source: pyyaml/trunk/tests/test_emitter.py @ 137

Revision 137, 4.2 KB checked in by xi, 9 years ago (diff)

Refactor resolver.

RevLine 
[122]1
[127]2import test_appliance, sys, StringIO
[122]3
4from yaml import *
[129]5import yaml
[122]6
[132]7class TestEmitter(test_appliance.TestAppliance):
[122]8
[132]9    def _testEmitterOnData(self, test_name, canonical_filename, data_filename):
10        self._testEmitter(test_name, data_filename)
11
12    def _testEmitterOnCanonicalNormally(self, test_name, canonical_filename):
13        self._testEmitter(test_name, canonical_filename, False)
14
15    def _testEmitterOnCanonicalCanonically(self, test_name, canonical_filename):
16        self._testEmitter(test_name, canonical_filename, True)
17
18    def _testEmitter(self, test_name, filename, canonical=None):
[136]19        events = list(parse(file(filename, 'rb')))
20        #self._dump(filename, events, canonical)
21        stream = StringIO.StringIO()
22        emit(events, stream, canonical=canonical)
23        data = stream.getvalue()
[129]24        new_events = list(parse(data))
25        for event, new_event in zip(events, new_events):
26            self.failUnlessEqual(event.__class__, new_event.__class__)
[132]27            if isinstance(event, NodeEvent):
28                self.failUnlessEqual(event.anchor, new_event.anchor)
29            if isinstance(event, CollectionStartEvent):
30                self.failUnlessEqual(event.tag, new_event.tag)
31            if isinstance(event, ScalarEvent):
32                #self.failUnlessEqual(event.implicit, new_event.implicit)
[137]33                if True not in event.implicit+new_event.implicit:
[132]34                    self.failUnlessEqual(event.tag, new_event.tag)
35                self.failUnlessEqual(event.value, new_event.value)
[122]36
[136]37    def _dump(self, filename, events, canonical):
[132]38        print "="*30
39        print "ORIGINAL DOCUMENT:"
40        print file(filename, 'rb').read()
41        print '-'*30
42        print "EMITTED DOCUMENT:"
[136]43        emit(events, sys.stdout, canonical=canonical)
[132]44       
45TestEmitter.add_tests('testEmitterOnData', '.canonical', '.data')
[136]46TestEmitter.add_tests('testEmitterOnCanonicalNormally', '.canonical')
47TestEmitter.add_tests('testEmitterOnCanonicalCanonically', '.canonical')
[122]48
[136]49class EventsLoader(Loader):
[129]50
51    def construct_event(self, node):
52        if isinstance(node, ScalarNode):
53            mapping = {}
54        else:
55            mapping = self.construct_mapping(node)
56        class_name = str(node.tag[1:])+'Event'
[130]57        if class_name in ['AliasEvent', 'ScalarEvent', 'SequenceStartEvent', 'MappingStartEvent']:
[129]58            mapping.setdefault('anchor', None)
[130]59        if class_name in ['ScalarEvent', 'SequenceStartEvent', 'MappingStartEvent']:
[129]60            mapping.setdefault('tag', None)
[137]61        if class_name in ['SequenceStartEvent', 'MappingStartEvent']:
62            mapping.setdefault('implicit', True)
[129]63        if class_name == 'ScalarEvent':
[137]64            mapping.setdefault('implicit', (False, True))
[129]65            mapping.setdefault('value', '')
66        value = getattr(yaml, class_name)(**mapping)
67        return value
68
[136]69EventsLoader.add_constructor(None, EventsLoader.construct_event)
[129]70
[132]71class TestEmitterEvents(test_appliance.TestAppliance):
[129]72
[132]73    def _testEmitterEvents(self, test_name, events_filename):
[136]74        events = list(load(file(events_filename, 'rb'), Loader=EventsLoader))
[132]75        #self._dump(events_filename, events)
[136]76        stream = StringIO.StringIO()
77        emit(events, stream)
78        data = stream.getvalue()
[129]79        new_events = list(parse(data))
80        self.failUnlessEqual(len(events), len(new_events))
81        for event, new_event in zip(events, new_events):
82            self.failUnlessEqual(event.__class__, new_event.__class__)
[132]83            if isinstance(event, NodeEvent):
84                self.failUnlessEqual(event.anchor, new_event.anchor)
85            if isinstance(event, CollectionStartEvent):
86                self.failUnlessEqual(event.tag, new_event.tag)
87            if isinstance(event, ScalarEvent):
88                self.failUnless(event.implicit == new_event.implicit
89                        or event.tag == new_event.tag)
90                self.failUnlessEqual(event.value, new_event.value)
[129]91
92    def _dump(self, events_filename, events):
93        print "="*30
94        print "EVENTS:"
95        print file(events_filename, 'rb').read()
96        print '-'*30
97        print "OUTPUT:"
[136]98        emit(events, sys.stdout)
[129]99       
[132]100TestEmitterEvents.add_tests('testEmitterEvents', '.events')
[129]101
Note: See TracBrowser for help on using the repository browser.