Changeset 322 for pyyaml/trunk/tests/test_yaml_ext.py
- Timestamp:
- 12/28/08 15:16:50 (4 years ago)
- File:
-
- 1 edited
-
pyyaml/trunk/tests/test_yaml_ext.py (modified) (1 diff)
Legend:
- Unmodified
- Added
- Removed
-
pyyaml/trunk/tests/test_yaml_ext.py
r312 r322 1 2 import unittest, test_appliance3 1 4 2 import _yaml, yaml 5 6 test_appliance.TestAppliance.SKIP_EXT = '.skip-ext' 7 8 class TestCVersion(unittest.TestCase): 9 10 def testCVersion(self): 11 self.failUnlessEqual("%s.%s.%s" % _yaml.get_version(), _yaml.get_version_string()) 12 13 class TestCLoader(test_appliance.TestAppliance): 14 15 def _testCScannerFileInput(self, test_name, data_filename, canonical_filename): 16 self._testCScanner(test_name, data_filename, canonical_filename, True) 17 18 def _testCScanner(self, test_name, data_filename, canonical_filename, file_input=False, Loader=yaml.Loader): 19 if file_input: 20 data = file(data_filename, 'r') 21 else: 22 data = file(data_filename, 'r').read() 23 tokens = list(yaml.scan(data, Loader=Loader)) 24 ext_tokens = [] 3 import types, pprint 4 5 yaml.PyBaseLoader = yaml.BaseLoader 6 yaml.PySafeLoader = yaml.SafeLoader 7 yaml.PyLoader = yaml.Loader 8 yaml.PyBaseDumper = yaml.BaseDumper 9 yaml.PySafeDumper = yaml.SafeDumper 10 yaml.PyDumper = yaml.Dumper 11 12 old_scan = yaml.scan 13 def new_scan(stream, Loader=yaml.CLoader): 14 return old_scan(stream, Loader) 15 16 old_parse = yaml.parse 17 def new_parse(stream, Loader=yaml.CLoader): 18 return old_parse(stream, Loader) 19 20 old_compose = yaml.compose 21 def new_compose(stream, Loader=yaml.CLoader): 22 return old_compose(stream, Loader) 23 24 old_compose_all = yaml.compose_all 25 def new_compose_all(stream, Loader=yaml.CLoader): 26 return old_compose_all(stream, Loader) 27 28 old_load = yaml.load 29 def new_load(stream, Loader=yaml.CLoader): 30 return old_load(stream, Loader) 31 32 old_load_all = yaml.load_all 33 def new_load_all(stream, Loader=yaml.CLoader): 34 return old_load_all(stream, Loader) 35 36 old_safe_load = yaml.safe_load 37 def new_safe_load(stream): 38 return old_load(stream, yaml.CSafeLoader) 39 40 old_safe_load_all = yaml.safe_load_all 41 def new_safe_load_all(stream): 42 return old_load_all(stream, yaml.CSafeLoader) 43 44 old_emit = yaml.emit 45 def new_emit(events, stream=None, Dumper=yaml.CDumper, **kwds): 46 return old_emit(events, stream, Dumper, **kwds) 47 48 old_serialize = yaml.serialize 49 def new_serialize(node, stream, Dumper=yaml.CDumper, **kwds): 50 return old_serialize(node, stream, Dumper, **kwds) 51 52 old_serialize_all = yaml.serialize_all 53 def new_serialize_all(nodes, stream=None, Dumper=yaml.CDumper, **kwds): 54 return old_serialize_all(nodes, stream, Dumper, **kwds) 55 56 old_dump = yaml.dump 57 def new_dump(data, stream=None, Dumper=yaml.CDumper, **kwds): 58 return old_dump(data, stream, Dumper, **kwds) 59 60 old_dump_all = yaml.dump_all 61 def new_dump_all(documents, stream=None, Dumper=yaml.CDumper, **kwds): 62 return old_dump_all(documents, stream, Dumper, **kwds) 63 64 old_safe_dump = yaml.safe_dump 65 def new_safe_dump(data, stream=None, **kwds): 66 return old_dump(data, stream, yaml.CSafeDumper, **kwds) 67 68 old_safe_dump_all = yaml.safe_dump_all 69 def new_safe_dump_all(documents, stream=None, **kwds): 70 return old_dump_all(documents, stream, yaml.CSafeDumper, **kwds) 71 72 def _set_up(): 73 yaml.BaseLoader = yaml.CBaseLoader 74 yaml.SafeLoader = yaml.CSafeLoader 75 yaml.Loader = yaml.CLoader 76 yaml.BaseDumper = yaml.CBaseDumper 77 yaml.SafeDumper = yaml.CSafeDumper 78 yaml.Dumper = yaml.CDumper 79 yaml.scan = new_scan 80 yaml.parse = new_parse 81 yaml.compose = new_compose 82 yaml.compose_all = new_compose_all 83 yaml.load = new_load 84 yaml.load_all = new_load_all 85 yaml.safe_load = new_safe_load 86 yaml.safe_load_all = new_safe_load_all 87 yaml.emit = new_emit 88 yaml.serialize = new_serialize 89 yaml.serialize_all = new_serialize_all 90 yaml.dump = new_dump 91 yaml.dump_all = new_dump_all 92 yaml.safe_dump = new_safe_dump 93 yaml.safe_dump_all = new_safe_dump_all 94 95 def _tear_down(): 96 yaml.BaseLoader = yaml.PyBaseLoader 97 yaml.SafeLoader = yaml.PySafeLoader 98 yaml.Loader = yaml.PyLoader 99 yaml.BaseDumper = yaml.PyBaseDumper 100 yaml.SafeDumper = yaml.PySafeDumper 101 yaml.Dumper = yaml.PyDumper 102 yaml.scan = old_scan 103 yaml.parse = old_parse 104 yaml.compose = old_compose 105 yaml.compose_all = old_compose_all 106 yaml.load = old_load 107 yaml.load_all = old_load_all 108 yaml.safe_load = old_safe_load 109 yaml.safe_load_all = old_safe_load_all 110 yaml.emit = old_emit 111 yaml.serialize = old_serialize 112 yaml.serialize_all = old_serialize_all 113 yaml.dump = old_dump 114 yaml.dump_all = old_dump_all 115 yaml.safe_dump = old_safe_dump 116 yaml.safe_dump_all = old_safe_dump_all 117 118 def test_c_version(verbose=False): 119 if verbose: 120 print _yaml.get_version() 121 print _yaml.get_version_string() 122 assert ("%s.%s.%s" % _yaml.get_version()) == _yaml.get_version_string(), \ 123 (_yaml.get_version(), _yaml.get_version_string()) 124 125 def _compare_scanners(py_data, c_data, verbose): 126 py_tokens = list(yaml.scan(py_data, Loader=yaml.PyLoader)) 127 c_tokens = [] 128 try: 129 for token in yaml.scan(c_data, Loader=yaml.CLoader): 130 c_tokens.append(token) 131 assert len(py_tokens) == len(c_tokens), (len(py_tokens), len(c_tokens)) 132 for py_token, c_token in zip(py_tokens, c_tokens): 133 assert py_token.__class__ == c_token.__class__, (py_token, c_token) 134 if hasattr(py_token, 'value'): 135 assert py_token.value == c_token.value, (py_token, c_token) 136 if isinstance(py_token, yaml.StreamEndToken): 137 continue 138 py_start = (py_token.start_mark.index, py_token.start_mark.line, py_token.start_mark.column) 139 py_end = (py_token.end_mark.index, py_token.end_mark.line, py_token.end_mark.column) 140 c_start = (c_token.start_mark.index, c_token.start_mark.line, c_token.start_mark.column) 141 c_end = (c_token.end_mark.index, c_token.end_mark.line, c_token.end_mark.column) 142 assert py_start == c_start, (py_start, c_start) 143 assert py_end == c_end, (py_end, c_end) 144 finally: 145 if verbose: 146 print "PY_TOKENS:" 147 pprint.pprint(py_tokens) 148 print "C_TOKENS:" 149 pprint.pprint(c_tokens) 150 151 def test_c_scanner(data_filename, canonical_filename, verbose=False): 152 _compare_scanners(open(data_filename, 'rb'), 153 open(data_filename, 'rb'), verbose) 154 _compare_scanners(open(data_filename, 'rb').read(), 155 open(data_filename, 'rb').read(), verbose) 156 _compare_scanners(open(canonical_filename, 'rb'), 157 open(canonical_filename, 'rb'), verbose) 158 _compare_scanners(open(canonical_filename, 'rb').read(), 159 open(canonical_filename, 'rb').read(), verbose) 160 161 test_c_scanner.unittest = ['.data', '.canonical'] 162 test_c_scanner.skip = ['.skip-ext'] 163 164 def _compare_parsers(py_data, c_data, verbose): 165 py_events = list(yaml.parse(py_data, Loader=yaml.PyLoader)) 166 c_events = [] 167 try: 168 for event in yaml.parse(c_data, Loader=yaml.CLoader): 169 c_events.append(event) 170 assert len(py_events) == len(c_events), (len(py_events), len(c_events)) 171 for py_event, c_event in zip(py_events, c_events): 172 for attribute in ['__class__', 'anchor', 'tag', 'implicit', 173 'value', 'explicit', 'version', 'tags']: 174 py_value = getattr(py_event, attribute, None) 175 c_value = getattr(c_event, attribute, None) 176 assert py_value == c_value, (py_event, c_event, attribute) 177 finally: 178 if verbose: 179 print "PY_EVENTS:" 180 pprint.pprint(py_events) 181 print "C_EVENTS:" 182 pprint.pprint(c_events) 183 184 def test_c_parser(data_filename, canonical_filename, verbose=False): 185 _compare_parsers(open(data_filename, 'rb'), 186 open(data_filename, 'rb'), verbose) 187 _compare_parsers(open(data_filename, 'rb').read(), 188 open(data_filename, 'rb').read(), verbose) 189 _compare_parsers(open(canonical_filename, 'rb'), 190 open(canonical_filename, 'rb'), verbose) 191 _compare_parsers(open(canonical_filename, 'rb').read(), 192 open(canonical_filename, 'rb').read(), verbose) 193 194 test_c_parser.unittest = ['.data', '.canonical'] 195 test_c_parser.skip = ['.skip-ext'] 196 197 def _compare_emitters(data, verbose): 198 events = list(yaml.parse(data, Loader=yaml.PyLoader)) 199 c_data = yaml.emit(events, Dumper=yaml.CDumper) 200 if verbose: 201 print c_data 202 py_events = list(yaml.parse(c_data, Loader=yaml.PyLoader)) 203 c_events = list(yaml.parse(c_data, Loader=yaml.CLoader)) 204 try: 205 assert len(events) == len(py_events), (len(events), len(py_events)) 206 assert len(events) == len(c_events), (len(events), len(c_events)) 207 for event, py_event, c_event in zip(events, py_events, c_events): 208 for attribute in ['__class__', 'anchor', 'tag', 'implicit', 209 'value', 'explicit', 'version', 'tags']: 210 value = getattr(event, attribute, None) 211 py_value = getattr(py_event, attribute, None) 212 c_value = getattr(c_event, attribute, None) 213 if attribute == 'tag' and value in [None, u'!'] \ 214 and py_value in [None, u'!'] and c_value in [None, u'!']: 215 continue 216 if attribute == 'explicit' and (py_value or c_value): 217 continue 218 assert value == py_value, (event, py_event, attribute) 219 assert value == c_value, (event, c_event, attribute) 220 finally: 221 if verbose: 222 print "EVENTS:" 223 pprint.pprint(events) 224 print "PY_EVENTS:" 225 pprint.pprint(py_events) 226 print "C_EVENTS:" 227 pprint.pprint(c_events) 228 229 def test_c_emitter(data_filename, canonical_filename, verbose=False): 230 _compare_emitters(open(data_filename, 'rb').read(), verbose) 231 _compare_emitters(open(canonical_filename, 'rb').read(), verbose) 232 233 test_c_emitter.unittest = ['.data', '.canonical'] 234 test_c_emitter.skip = ['.skip-ext'] 235 236 def wrap_ext_function(function): 237 def wrapper(*args, **kwds): 238 _set_up() 25 239 try: 26 if file_input: 27 data = file(data_filename, 'r') 28 for token in yaml.scan(data, Loader=yaml.CLoader): 29 ext_tokens.append(token) 30 self.failUnlessEqual(len(tokens), len(ext_tokens)) 31 for token, ext_token in zip(tokens, ext_tokens): 32 self.failUnlessEqual(token.__class__, ext_token.__class__) 33 if not isinstance(token, yaml.StreamEndToken): 34 self.failUnlessEqual((token.start_mark.index, token.start_mark.line, token.start_mark.column), 35 (ext_token.start_mark.index, ext_token.start_mark.line, ext_token.start_mark.column)) 36 self.failUnlessEqual((token.end_mark.index, token.end_mark.line, token.end_mark.column), 37 (ext_token.end_mark.index, ext_token.end_mark.line, ext_token.end_mark.column)) 38 if hasattr(token, 'value'): 39 self.failUnlessEqual(token.value, ext_token.value) 40 except: 41 print 42 print "DATA:" 43 print file(data_filename, 'rb').read() 44 print "TOKENS:", tokens 45 print "EXT_TOKENS:", ext_tokens 46 raise 47 48 def _testCParser(self, test_name, data_filename, canonical_filename, Loader=yaml.Loader): 49 data = file(data_filename, 'r').read() 50 events = list(yaml.parse(data, Loader=Loader)) 51 ext_events = [] 52 try: 53 for event in yaml.parse(data, Loader=yaml.CLoader): 54 ext_events.append(event) 55 #print "EVENT:", event 56 self.failUnlessEqual(len(events), len(ext_events)) 57 for event, ext_event in zip(events, ext_events): 58 self.failUnlessEqual(event.__class__, ext_event.__class__) 59 if hasattr(event, 'anchor'): 60 self.failUnlessEqual(event.anchor, ext_event.anchor) 61 if hasattr(event, 'tag'): 62 self.failUnlessEqual(event.tag, ext_event.tag) 63 if hasattr(event, 'implicit'): 64 self.failUnlessEqual(event.implicit, ext_event.implicit) 65 if hasattr(event, 'value'): 66 self.failUnlessEqual(event.value, ext_event.value) 67 if hasattr(event, 'explicit'): 68 self.failUnlessEqual(event.explicit, ext_event.explicit) 69 if hasattr(event, 'version'): 70 self.failUnlessEqual(event.version, ext_event.version) 71 if hasattr(event, 'tags'): 72 self.failUnlessEqual(event.tags, ext_event.tags) 73 except: 74 print 75 print "DATA:" 76 print file(data_filename, 'rb').read() 77 print "EVENTS:", events 78 print "EXT_EVENTS:", ext_events 79 raise 80 81 TestCLoader.add_tests('testCScanner', '.data', '.canonical') 82 TestCLoader.add_tests('testCScannerFileInput', '.data', '.canonical') 83 TestCLoader.add_tests('testCParser', '.data', '.canonical') 84 85 class TestCEmitter(test_appliance.TestAppliance): 86 87 def _testCEmitter(self, test_name, data_filename, canonical_filename, Loader=yaml.Loader): 88 data1 = file(data_filename, 'r').read() 89 events = list(yaml.parse(data1, Loader=Loader)) 90 data2 = yaml.emit(events, Dumper=yaml.CDumper) 91 ext_events = [] 92 try: 93 for event in yaml.parse(data2): 94 ext_events.append(event) 95 self.failUnlessEqual(len(events), len(ext_events)) 96 for event, ext_event in zip(events, ext_events): 97 self.failUnlessEqual(event.__class__, ext_event.__class__) 98 if hasattr(event, 'anchor'): 99 self.failUnlessEqual(event.anchor, ext_event.anchor) 100 if hasattr(event, 'tag'): 101 if not (event.tag in ['!', None] and ext_event.tag in ['!', None]): 102 self.failUnlessEqual(event.tag, ext_event.tag) 103 if hasattr(event, 'implicit'): 104 self.failUnlessEqual(event.implicit, ext_event.implicit) 105 if hasattr(event, 'value'): 106 self.failUnlessEqual(event.value, ext_event.value) 107 if hasattr(event, 'explicit') and event.explicit: 108 self.failUnlessEqual(event.explicit, ext_event.explicit) 109 if hasattr(event, 'version'): 110 self.failUnlessEqual(event.version, ext_event.version) 111 if hasattr(event, 'tags'): 112 self.failUnlessEqual(event.tags, ext_event.tags) 113 except: 114 print 115 print "DATA1:" 116 print data1 117 print "DATA2:" 118 print data2 119 print "EVENTS:", events 120 print "EXT_EVENTS:", ext_events 121 raise 122 123 TestCEmitter.add_tests('testCEmitter', '.data', '.canonical') 124 125 yaml.BaseLoader = yaml.CBaseLoader 126 yaml.SafeLoader = yaml.CSafeLoader 127 yaml.Loader = yaml.CLoader 128 yaml.BaseDumper = yaml.CBaseDumper 129 yaml.SafeDumper = yaml.CSafeDumper 130 yaml.Dumper = yaml.CDumper 131 old_scan = yaml.scan 132 def scan(stream, Loader=yaml.CLoader): 133 return old_scan(stream, Loader) 134 yaml.scan = scan 135 old_parse = yaml.parse 136 def parse(stream, Loader=yaml.CLoader): 137 return old_parse(stream, Loader) 138 yaml.parse = parse 139 old_compose = yaml.compose 140 def compose(stream, Loader=yaml.CLoader): 141 return old_compose(stream, Loader) 142 yaml.compose = compose 143 old_compose_all = yaml.compose_all 144 def compose_all(stream, Loader=yaml.CLoader): 145 return old_compose_all(stream, Loader) 146 yaml.compose_all = compose_all 147 old_load_all = yaml.load_all 148 def load_all(stream, Loader=yaml.CLoader): 149 return old_load_all(stream, Loader) 150 yaml.load_all = load_all 151 old_load = yaml.load 152 def load(stream, Loader=yaml.CLoader): 153 return old_load(stream, Loader) 154 yaml.load = load 155 def safe_load_all(stream): 156 return yaml.load_all(stream, yaml.CSafeLoader) 157 yaml.safe_load_all = safe_load_all 158 def safe_load(stream): 159 return yaml.load(stream, yaml.CSafeLoader) 160 yaml.safe_load = safe_load 161 old_emit = yaml.emit 162 def emit(events, stream=None, Dumper=yaml.CDumper, **kwds): 163 return old_emit(events, stream, Dumper, **kwds) 164 yaml.emit = emit 165 old_serialize_all = yaml.serialize_all 166 def serialize_all(nodes, stream=None, Dumper=yaml.CDumper, **kwds): 167 return old_serialize_all(nodes, stream, Dumper, **kwds) 168 yaml.serialize_all = serialize_all 169 old_serialize = yaml.serialize 170 def serialize(node, stream, Dumper=yaml.CDumper, **kwds): 171 return old_serialize(node, stream, Dumper, **kwds) 172 yaml.serialize = serialize 173 old_dump_all = yaml.dump_all 174 def dump_all(documents, stream=None, Dumper=yaml.CDumper, **kwds): 175 return old_dump_all(documents, stream, Dumper, **kwds) 176 yaml.dump_all = dump_all 177 old_dump = yaml.dump 178 def dump(data, stream=None, Dumper=yaml.CDumper, **kwds): 179 return old_dump(data, stream, Dumper, **kwds) 180 yaml.dump = dump 181 def safe_dump_all(documents, stream=None, **kwds): 182 return yaml.dump_all(documents, stream, yaml.CSafeDumper, **kwds) 183 yaml.safe_dump_all = safe_dump_all 184 def safe_dump(data, stream=None, **kwds): 185 return yaml.dump(data, stream, yaml.CSafeDumper, **kwds) 186 yaml.safe_dump = safe_dump 187 188 from test_yaml import * 189 190 def main(module='__main__'): 191 unittest.main(module) 240 function(*args, **kwds) 241 finally: 242 _tear_down() 243 wrapper.func_name = '%s_ext' % function.func_name 244 wrapper.unittest = function.unittest 245 wrapper.skip = getattr(function, 'skip', [])+['.skip-ext'] 246 return wrapper 247 248 def wrap_ext(collections): 249 functions = [] 250 if not isinstance(collections, list): 251 collections = [collections] 252 for collection in collections: 253 if not isinstance(collection, dict): 254 collection = vars(collection) 255 keys = collection.keys() 256 keys.sort() 257 for key in keys: 258 value = collection[key] 259 if isinstance(value, types.FunctionType) and hasattr(value, 'unittest'): 260 functions.append(wrap_ext_function(value)) 261 for function in functions: 262 assert function.func_name not in globals() 263 globals()[function.func_name] = function 264 265 import test_tokens, test_structure, test_errors, test_resolver, test_constructor, \ 266 test_emitter, test_representer, test_recursive 267 wrap_ext([test_tokens, test_structure, test_errors, test_resolver, test_constructor, 268 test_emitter, test_representer, test_recursive]) 192 269 193 270 if __name__ == '__main__': 194 main() 195 271 import test_appliance 272 test_appliance.run(globals()) 273
Note: See TracChangeset
for help on using the changeset viewer.
