source: pyyaml/trunk/tests/lib/test_yaml_ext.py @ 333

Revision 333, 10.4 KB checked in by xi, 6 years ago (diff)

Handle the encoding of input and output streams in a uniform way.

Rev  Line
[195]1
2import _yaml, yaml
[322]3import types, pprint
[195]4
# Stash the classes currently bound to the plain yaml names under Py-prefixed
# aliases; _tear_down() reads these aliases to undo what _set_up() replaces.
for _class_name in ('BaseLoader', 'SafeLoader', 'Loader',
                    'BaseDumper', 'SafeDumper', 'Dumper'):
    setattr(yaml, 'Py' + _class_name, getattr(yaml, _class_name))
del _class_name
[312]11
# Saved so that _tear_down() can put the original yaml.scan back.
old_scan = yaml.scan


def new_scan(stream, Loader=yaml.CLoader):
    """Call the saved yaml.scan, defaulting the loader to yaml.CLoader."""
    tokens = old_scan(stream, Loader)
    return tokens
[322]15
# Saved so that _tear_down() can put the original yaml.parse back.
old_parse = yaml.parse


def new_parse(stream, Loader=yaml.CLoader):
    """Call the saved yaml.parse, defaulting the loader to yaml.CLoader."""
    events = old_parse(stream, Loader)
    return events
[322]19
# Saved so that _tear_down() can put the original yaml.compose back.
old_compose = yaml.compose


def new_compose(stream, Loader=yaml.CLoader):
    """Call the saved yaml.compose, defaulting the loader to yaml.CLoader."""
    composed = old_compose(stream, Loader)
    return composed
[322]23
# Saved so that _tear_down() can put the original yaml.compose_all back.
old_compose_all = yaml.compose_all


def new_compose_all(stream, Loader=yaml.CLoader):
    """Call the saved yaml.compose_all, defaulting the loader to yaml.CLoader."""
    composed = old_compose_all(stream, Loader)
    return composed
[322]27
# Saved so that _tear_down() can put the original yaml.load back; the
# safe_load replacement in this file also delegates to it.
old_load = yaml.load


def new_load(stream, Loader=yaml.CLoader):
    """Call the saved yaml.load, defaulting the loader to yaml.CLoader."""
    loaded = old_load(stream, Loader)
    return loaded
31
# Saved so that _tear_down() can put the original yaml.load_all back; the
# safe_load_all replacement in this file also delegates to it.
old_load_all = yaml.load_all


def new_load_all(stream, Loader=yaml.CLoader):
    """Call the saved yaml.load_all, defaulting the loader to yaml.CLoader."""
    loaded = old_load_all(stream, Loader)
    return loaded
[322]35
# Saved only so that _tear_down() can put the original yaml.safe_load back.
old_safe_load = yaml.safe_load


def new_safe_load(stream):
    """Load *stream* through the saved yaml.load using yaml.CSafeLoader."""
    # Goes through old_load (not old_safe_load) because the loader class has
    # to be passed explicitly.
    loaded = old_load(stream, yaml.CSafeLoader)
    return loaded
39
# Saved only so that _tear_down() can put the original yaml.safe_load_all back.
old_safe_load_all = yaml.safe_load_all


def new_safe_load_all(stream):
    """Load *stream* through the saved yaml.load_all using yaml.CSafeLoader."""
    # Goes through old_load_all (not old_safe_load_all) because the loader
    # class has to be passed explicitly.
    loaded = old_load_all(stream, yaml.CSafeLoader)
    return loaded
43
# Saved so that _tear_down() can put the original yaml.emit back.
old_emit = yaml.emit


def new_emit(events, stream=None, Dumper=yaml.CDumper, **kwds):
    """Call the saved yaml.emit, defaulting the dumper to yaml.CDumper.

    Extra keyword arguments are forwarded unchanged.
    """
    emitted = old_emit(events, stream, Dumper, **kwds)
    return emitted
[322]47
# Saved so that _tear_down() can put the original yaml.serialize back.
old_serialize = yaml.serialize


def new_serialize(node, stream=None, Dumper=yaml.CDumper, **kwds):
    """Call the saved yaml.serialize, defaulting the dumper to yaml.CDumper.

    ``stream`` defaults to None, like in the sibling wrappers new_emit,
    new_serialize_all and new_dump, so a caller that omits the stream is not
    rejected by this replacement.  Extra keyword arguments are forwarded
    unchanged.
    """
    return old_serialize(node, stream, Dumper, **kwds)
51
# Saved so that _tear_down() can put the original yaml.serialize_all back.
old_serialize_all = yaml.serialize_all


def new_serialize_all(nodes, stream=None, Dumper=yaml.CDumper, **kwds):
    """Call the saved yaml.serialize_all, defaulting the dumper to yaml.CDumper.

    Extra keyword arguments are forwarded unchanged.
    """
    serialized = old_serialize_all(nodes, stream, Dumper, **kwds)
    return serialized
[322]55
# Saved so that _tear_down() can put the original yaml.dump back; the
# safe_dump replacement in this file also delegates to it.
old_dump = yaml.dump


def new_dump(data, stream=None, Dumper=yaml.CDumper, **kwds):
    """Call the saved yaml.dump, defaulting the dumper to yaml.CDumper.

    Extra keyword arguments are forwarded unchanged.
    """
    dumped = old_dump(data, stream, Dumper, **kwds)
    return dumped
59
# Saved so that _tear_down() can put the original yaml.dump_all back; the
# safe_dump_all replacement in this file also delegates to it.
old_dump_all = yaml.dump_all


def new_dump_all(documents, stream=None, Dumper=yaml.CDumper, **kwds):
    """Call the saved yaml.dump_all, defaulting the dumper to yaml.CDumper.

    Extra keyword arguments are forwarded unchanged.
    """
    dumped = old_dump_all(documents, stream, Dumper, **kwds)
    return dumped
[223]63
# Saved only so that _tear_down() can put the original yaml.safe_dump back.
old_safe_dump = yaml.safe_dump


def new_safe_dump(data, stream=None, **kwds):
    """Dump *data* through the saved yaml.dump using yaml.CSafeDumper.

    Extra keyword arguments are forwarded unchanged.
    """
    # Goes through old_dump (not old_safe_dump) because the dumper class has
    # to be passed explicitly.
    dumped = old_dump(data, stream, yaml.CSafeDumper, **kwds)
    return dumped
[195]67
# Saved only so that _tear_down() can put the original yaml.safe_dump_all back.
old_safe_dump_all = yaml.safe_dump_all


def new_safe_dump_all(documents, stream=None, **kwds):
    """Dump *documents* through the saved yaml.dump_all using yaml.CSafeDumper.

    Extra keyword arguments are forwarded unchanged.
    """
    # Goes through old_dump_all (not old_safe_dump_all) because the dumper
    # class has to be passed explicitly.
    dumped = old_dump_all(documents, stream, yaml.CSafeDumper, **kwds)
    return dumped
71
def _set_up():
    """Rebind the public yaml names to their C-backed replacements.

    The six loader/dumper class names are pointed at the corresponding
    ``C``-prefixed classes, and the module-level functions are pointed at the
    ``new_*`` wrappers defined in this file.  _tear_down() reverses this.
    """
    # Classes first, in the same order as the functions below are patched.
    for class_name in ('BaseLoader', 'SafeLoader', 'Loader',
                       'BaseDumper', 'SafeDumper', 'Dumper'):
        setattr(yaml, class_name, getattr(yaml, 'C' + class_name))

    function_patches = [
        ('scan', new_scan),
        ('parse', new_parse),
        ('compose', new_compose),
        ('compose_all', new_compose_all),
        ('load', new_load),
        ('load_all', new_load_all),
        ('safe_load', new_safe_load),
        ('safe_load_all', new_safe_load_all),
        ('emit', new_emit),
        ('serialize', new_serialize),
        ('serialize_all', new_serialize_all),
        ('dump', new_dump),
        ('dump_all', new_dump_all),
        ('safe_dump', new_safe_dump),
        ('safe_dump_all', new_safe_dump_all),
    ]
    for function_name, replacement in function_patches:
        setattr(yaml, function_name, replacement)
94
def _tear_down():
    """Undo _set_up(): restore the saved classes and functions on yaml.

    The six loader/dumper class names are pointed back at the ``Py``-prefixed
    aliases, and the module-level functions at the ``old_*`` references saved
    in this file.
    """
    for class_name in ('BaseLoader', 'SafeLoader', 'Loader',
                       'BaseDumper', 'SafeDumper', 'Dumper'):
        setattr(yaml, class_name, getattr(yaml, 'Py' + class_name))

    saved_functions = [
        ('scan', old_scan),
        ('parse', old_parse),
        ('compose', old_compose),
        ('compose_all', old_compose_all),
        ('load', old_load),
        ('load_all', old_load_all),
        ('safe_load', old_safe_load),
        ('safe_load_all', old_safe_load_all),
        ('emit', old_emit),
        ('serialize', old_serialize),
        ('serialize_all', old_serialize_all),
        ('dump', old_dump),
        ('dump_all', old_dump_all),
        ('safe_dump', old_safe_dump),
        ('safe_dump_all', old_safe_dump_all),
    ]
    for function_name, original in saved_functions:
        setattr(yaml, function_name, original)
117
def test_c_version(verbose=False):
    """Check that _yaml.get_version() agrees with _yaml.get_version_string().

    The version value is formatted as "%s.%s.%s" and must equal the version
    string.  With *verbose* set, both values are printed first.
    """
    version = _yaml.get_version()
    version_string = _yaml.get_version_string()
    if verbose:
        # Single-argument print(...) produces the same output as the
        # Python 2 print statement.
        print(version)
        print(version_string)
    formatted = "%s.%s.%s" % version
    assert formatted == version_string, (version, version_string)
124
def _compare_scanners(py_data, c_data, verbose):
    """Assert that yaml.PyLoader and yaml.CLoader scan to matching tokens.

    *py_data* is scanned with yaml.PyLoader and *c_data* with yaml.CLoader.
    The two token lists must have the same length, and each pair of tokens
    must have the same class, the same ``value`` (when the Python token has
    one) and, except for yaml.StreamEndToken, the same start and end marks
    (index, line, column).  With *verbose* set, both token lists are
    pretty-printed afterwards, whether or not the comparison succeeded.
    """
    def position(mark):
        # Reduce a mark to the three fields that are compared.
        return (mark.index, mark.line, mark.column)

    py_tokens = list(yaml.scan(py_data, Loader=yaml.PyLoader))
    c_tokens = []
    try:
        # Collected one at a time so that the tokens produced before a
        # failure are still available to the verbose dump below.
        for token in yaml.scan(c_data, Loader=yaml.CLoader):
            c_tokens.append(token)
        py_count = len(py_tokens)
        c_count = len(c_tokens)
        assert py_count == c_count, (py_count, c_count)
        for py_token, c_token in zip(py_tokens, c_tokens):
            assert py_token.__class__ == c_token.__class__, (py_token, c_token)
            if hasattr(py_token, 'value'):
                assert py_token.value == c_token.value, (py_token, c_token)
            if not isinstance(py_token, yaml.StreamEndToken):
                py_start = position(py_token.start_mark)
                py_end = position(py_token.end_mark)
                c_start = position(c_token.start_mark)
                c_end = position(c_token.end_mark)
                assert py_start == c_start, (py_start, c_start)
                assert py_end == c_end, (py_end, c_end)
    finally:
        if verbose:
            print("PY_TOKENS:")
            pprint.pprint(py_tokens)
            print("C_TOKENS:")
            pprint.pprint(c_tokens)
150
def test_c_scanner(data_filename, canonical_filename, verbose=False):
    """Compare the Python and C scanners on a data file and its canonical form.

    Each file is checked twice, in this order: first as a pair of open binary
    streams (one per scanner), then as the byte string read from the file.
    Every file object opened here is closed again, even when a comparison
    fails.
    """
    for filename in (data_filename, canonical_filename):
        # Stream input: each scanner consumes its own file object.
        py_stream = open(filename, 'rb')
        try:
            c_stream = open(filename, 'rb')
            try:
                _compare_scanners(py_stream, c_stream, verbose)
            finally:
                c_stream.close()
        finally:
            py_stream.close()

        # String input: the content is immutable, so one read serves both.
        stream = open(filename, 'rb')
        try:
            data = stream.read()
        finally:
            stream.close()
        _compare_scanners(data, data, verbose)

test_c_scanner.unittest = ['.data', '.canonical']
test_c_scanner.skip = ['.skip-ext']
163
def _compare_parsers(py_data, c_data, verbose):
    """Assert that yaml.PyLoader and yaml.CLoader parse to matching events.

    *py_data* is parsed with yaml.PyLoader and *c_data* with yaml.CLoader.
    The two event lists must have the same length, and each pair of events
    must agree on every attribute listed in ``compared`` (an attribute that
    is missing counts as None).  With *verbose* set, both event lists are
    pretty-printed afterwards, whether or not the comparison succeeded.
    """
    compared = ['__class__', 'anchor', 'tag', 'implicit',
                'value', 'explicit', 'version', 'tags']
    py_events = list(yaml.parse(py_data, Loader=yaml.PyLoader))
    c_events = []
    try:
        # Collected one at a time so that the events produced before a
        # failure are still available to the verbose dump below.
        for event in yaml.parse(c_data, Loader=yaml.CLoader):
            c_events.append(event)
        py_count = len(py_events)
        c_count = len(c_events)
        assert py_count == c_count, (py_count, c_count)
        for py_event, c_event in zip(py_events, c_events):
            for attribute in compared:
                expected = getattr(py_event, attribute, None)
                actual = getattr(c_event, attribute, None)
                assert expected == actual, (py_event, c_event, attribute)
    finally:
        if verbose:
            print("PY_EVENTS:")
            pprint.pprint(py_events)
            print("C_EVENTS:")
            pprint.pprint(c_events)
183
def test_c_parser(data_filename, canonical_filename, verbose=False):
    """Compare the Python and C parsers on a data file and its canonical form.

    Each file is checked twice, in this order: first as a pair of open binary
    streams (one per parser), then as the byte string read from the file.
    Every file object opened here is closed again, even when a comparison
    fails.
    """
    for filename in (data_filename, canonical_filename):
        # Stream input: each parser consumes its own file object.
        py_stream = open(filename, 'rb')
        try:
            c_stream = open(filename, 'rb')
            try:
                _compare_parsers(py_stream, c_stream, verbose)
            finally:
                c_stream.close()
        finally:
            py_stream.close()

        # String input: the content is immutable, so one read serves both.
        stream = open(filename, 'rb')
        try:
            data = stream.read()
        finally:
            stream.close()
        _compare_parsers(data, data, verbose)

test_c_parser.unittest = ['.data', '.canonical']
test_c_parser.skip = ['.skip-ext']
196
def _compare_emitters(data, verbose):
    """Round-trip *data* through yaml.CDumper and check the events survive.

    *data* is parsed with yaml.PyLoader, the events are emitted with
    yaml.CDumper, and the emitted text is parsed again with both
    yaml.PyLoader and yaml.CLoader.  Both re-parsed event lists must match
    the original events in length and in every attribute listed in
    ``compared``, with two exceptions spelled out in the loop.  With
    *verbose* set, the emitted text and all three event lists are printed.
    """
    compared = ['__class__', 'anchor', 'tag', 'implicit',
                'value', 'explicit', 'version', 'tags']
    no_tag = [None, u'!']

    events = list(yaml.parse(data, Loader=yaml.PyLoader))
    c_data = yaml.emit(events, Dumper=yaml.CDumper)
    if verbose:
        print(c_data)
    py_events = list(yaml.parse(c_data, Loader=yaml.PyLoader))
    c_events = list(yaml.parse(c_data, Loader=yaml.CLoader))
    try:
        event_count = len(events)
        assert event_count == len(py_events), (event_count, len(py_events))
        assert event_count == len(c_events), (event_count, len(c_events))
        for event, py_event, c_event in zip(events, py_events, c_events):
            for attribute in compared:
                value = getattr(event, attribute, None)
                py_value = getattr(py_event, attribute, None)
                c_value = getattr(c_event, attribute, None)
                if attribute == 'tag':
                    # None and u'!' are treated as interchangeable tags.
                    if value in no_tag and py_value in no_tag \
                            and c_value in no_tag:
                        continue
                if attribute == 'explicit':
                    # Skipped whenever either re-parsed event is explicit.
                    if py_value or c_value:
                        continue
                assert value == py_value, (event, py_event, attribute)
                assert value == c_value, (event, c_event, attribute)
    finally:
        if verbose:
            print("EVENTS:")
            pprint.pprint(events)
            print("PY_EVENTS:")
            pprint.pprint(py_events)
            print("C_EVENTS:")
            pprint.pprint(c_events)
228
def test_c_emitter(data_filename, canonical_filename, verbose=False):
    """Run the C emitter round-trip check on a data file and its canonical form.

    Each file is read in binary mode and its content handed to
    _compare_emitters().  The file object is closed as soon as the content has
    been read.
    """
    for filename in (data_filename, canonical_filename):
        stream = open(filename, 'rb')
        try:
            data = stream.read()
        finally:
            stream.close()
        _compare_emitters(data, verbose)

test_c_emitter.unittest = ['.data', '.canonical']
test_c_emitter.skip = ['.skip-ext']
235
def wrap_ext_function(function):
    """Return a copy of test *function* that runs between _set_up/_tear_down.

    The wrapper calls _set_up(), runs *function* with the given arguments and
    returns its result, and always calls _tear_down() afterwards.  It is
    tagged for the test runner with:

    * ``unittest_name`` -- the original function name plus ``_ext``;
    * ``unittest`` -- copied from *function* (which must define it);
    * ``skip`` -- the original ``skip`` list (empty if absent) plus
      ``'.skip-ext'``.
    """
    # __name__ is the portable spelling of func_name.
    ext_name = '%s_ext' % function.__name__

    def wrapper(*args, **kwds):
        _set_up()
        try:
            return function(*args, **kwds)
        finally:
            _tear_down()

    try:
        wrapper.__name__ = ext_name
    except TypeError:
        # Best effort: presumably guards interpreters where a function's name
        # is read-only.  unittest_name below carries the name regardless.
        pass
    wrapper.unittest_name = ext_name
    wrapper.unittest = function.unittest
    wrapper.skip = getattr(function, 'skip', [])+['.skip-ext']
    return wrapper
251
def wrap_ext(collections):
    """Create ``*_ext`` variants of the test functions found in *collections*.

    *collections* is a single collection or a list of them; a collection is
    either a dict or an object whose ``vars()`` are used.  Every plain
    function that has a ``unittest`` attribute is wrapped with
    wrap_ext_function(), visiting names in sorted order within each
    collection.  The wrappers are then stored in this module's globals under
    their ``unittest_name``, which must not be taken already.
    """
    if not isinstance(collections, list):
        collections = [collections]

    wrapped = []
    for collection in collections:
        namespace = collection
        if not isinstance(namespace, dict):
            namespace = vars(namespace)
        names = list(namespace)
        names.sort()
        for name in names:
            candidate = namespace[name]
            if not isinstance(candidate, types.FunctionType):
                continue
            if not hasattr(candidate, 'unittest'):
                continue
            wrapped.append(wrap_ext_function(candidate))

    module_globals = globals()
    for function in wrapped:
        assert function.unittest_name not in module_globals
        module_globals[function.unittest_name] = function
[322]268
# Pull in the other test modules and register an *_ext variant of each of
# their test functions in this module's namespace.
import test_tokens
import test_structure
import test_errors
import test_resolver
import test_constructor
import test_emitter
import test_representer
import test_recursive
import test_input_output

wrap_ext([
    test_tokens,
    test_structure,
    test_errors,
    test_resolver,
    test_constructor,
    test_emitter,
    test_representer,
    test_recursive,
    test_input_output,
])
[322]273
# When executed as a script, hand every name defined in this module
# (including the generated *_ext wrappers) to the shared test runner.
if __name__ == '__main__':
    # Imported here so the runner is only loaded for script execution.
    import test_appliance
    test_appliance.run(globals())
[195]277
Note: See TracBrowser for help on using the repository browser.