source: pyyaml/trunk/tests/test_emitter.py @ 133

Revision 133, 4.4 KB checked in by xi, 8 years ago (diff)

Implement yaml.dump().

Line 
1
2import test_appliance, sys, StringIO
3
4from yaml import *
5import yaml
6
7class TestEmitter(test_appliance.TestAppliance):
8
9    def _testEmitterOnData(self, test_name, canonical_filename, data_filename):
10        self._testEmitter(test_name, data_filename)
11
12    def _testEmitterOnCanonicalNormally(self, test_name, canonical_filename):
13        self._testEmitter(test_name, canonical_filename, False)
14
15    def _testEmitterOnCanonicalCanonically(self, test_name, canonical_filename):
16        self._testEmitter(test_name, canonical_filename, True)
17
18    def _testEmitter(self, test_name, filename, canonical=None):
19        events = list(iter(Parser(Scanner(Reader(file(filename, 'rb'))))))
20        if canonical is not None:
21            events[0].canonical = canonical
22        #self._dump(filename, events)
23        writer = StringIO.StringIO()
24        emitter = Emitter(writer)
25        for event in events:
26            emitter.emit(event)
27        data = writer.getvalue()
28        new_events = list(parse(data))
29        for event, new_event in zip(events, new_events):
30            self.failUnlessEqual(event.__class__, new_event.__class__)
31            if isinstance(event, NodeEvent):
32                self.failUnlessEqual(event.anchor, new_event.anchor)
33            if isinstance(event, CollectionStartEvent):
34                self.failUnlessEqual(event.tag, new_event.tag)
35            if isinstance(event, ScalarEvent):
36                #self.failUnlessEqual(event.implicit, new_event.implicit)
37                if not event.implicit and not new_event.implicit:
38                    self.failUnlessEqual(event.tag, new_event.tag)
39                self.failUnlessEqual(event.value, new_event.value)
40
41    def _dump(self, filename, events):
42        writer = sys.stdout
43        emitter = Emitter(writer)
44        print "="*30
45        print "ORIGINAL DOCUMENT:"
46        print file(filename, 'rb').read()
47        print '-'*30
48        print "EMITTED DOCUMENT:"
49        for event in events:
50            emitter.emit(event)
51       
52TestEmitter.add_tests('testEmitterOnData', '.canonical', '.data')
53#TestEmitter.add_tests('testEmitterOnCanonicalNormally', '.canonical')
54#TestEmitter.add_tests('testEmitterOnCanonicalCanonically', '.canonical')
55
56class EventsConstructor(Constructor):
57
58    def construct_event(self, node):
59        if isinstance(node, ScalarNode):
60            mapping = {}
61        else:
62            mapping = self.construct_mapping(node)
63        class_name = str(node.tag[1:])+'Event'
64        if class_name in ['AliasEvent', 'ScalarEvent', 'SequenceStartEvent', 'MappingStartEvent']:
65            mapping.setdefault('anchor', None)
66        if class_name in ['ScalarEvent', 'SequenceStartEvent', 'MappingStartEvent']:
67            mapping.setdefault('tag', None)
68        if class_name == 'ScalarEvent':
69            mapping.setdefault('value', '')
70        value = getattr(yaml, class_name)(**mapping)
71        return value
72
73EventsConstructor.add_constructor(None, EventsConstructor.construct_event)
74
75class TestEmitterEvents(test_appliance.TestAppliance):
76
77    def _testEmitterEvents(self, test_name, events_filename):
78        events = list(load(file(events_filename, 'rb'), Constructor=EventsConstructor))
79        #self._dump(events_filename, events)
80        writer = StringIO.StringIO()
81        emitter = Emitter(writer)
82        for event in events:
83            emitter.emit(event)
84        data = writer.getvalue()
85        new_events = list(parse(data))
86        self.failUnlessEqual(len(events), len(new_events))
87        for event, new_event in zip(events, new_events):
88            self.failUnlessEqual(event.__class__, new_event.__class__)
89            if isinstance(event, NodeEvent):
90                self.failUnlessEqual(event.anchor, new_event.anchor)
91            if isinstance(event, CollectionStartEvent):
92                self.failUnlessEqual(event.tag, new_event.tag)
93            if isinstance(event, ScalarEvent):
94                self.failUnless(event.implicit == new_event.implicit
95                        or event.tag == new_event.tag)
96                self.failUnlessEqual(event.value, new_event.value)
97
98    def _dump(self, events_filename, events):
99        writer = sys.stdout
100        emitter = Emitter(writer)
101        print "="*30
102        print "EVENTS:"
103        print file(events_filename, 'rb').read()
104        print '-'*30
105        print "OUTPUT:"
106        for event in events:
107            emitter.emit(event)
108       
109TestEmitterEvents.add_tests('testEmitterEvents', '.events')
110
Note: See TracBrowser for help on using the repository browser.