source: pyyaml/trunk/tests/test_emitter.py @ 136

Revision 136, 4.0 KB checked in by xi, 8 years ago (diff)

Major refactoring.

Rev Line
[122]1
[127]2import test_appliance, sys, StringIO
[122]3
4from yaml import *
[129]5import yaml
[122]6
class TestEmitter(test_appliance.TestAppliance):
    """Round-trip tests for the emitter.

    Each test parses a YAML file into events, emits those events back to
    text, parses the emitted text again and checks that the two event
    streams agree.
    """

    def _testEmitterOnData(self, test_name, canonical_filename, data_filename):
        """Round-trip the ``.data`` file; ``canonical_filename`` is not used."""
        self._testEmitter(test_name, data_filename)

    def _testEmitterOnCanonicalNormally(self, test_name, canonical_filename):
        """Round-trip the ``.canonical`` file with ``canonical=False``."""
        self._testEmitter(test_name, canonical_filename, False)

    def _testEmitterOnCanonicalCanonically(self, test_name, canonical_filename):
        """Round-trip the ``.canonical`` file with ``canonical=True``."""
        self._testEmitter(test_name, canonical_filename, True)

    def _testEmitter(self, test_name, filename, canonical=None):
        """Parse ``filename``, emit its events, re-parse and compare.

        ``canonical`` is passed straight through to ``emit``.
        """
        # Close the input explicitly instead of relying on garbage
        # collection; list() drains the parser before the file is closed.
        source = file(filename, 'rb')
        try:
            events = list(parse(source))
        finally:
            source.close()
        # Debugging aid: uncomment to print the original and emitted documents.
        #self._dump(filename, events, canonical)
        stream = StringIO.StringIO()
        emit(events, stream, canonical=canonical)
        data = stream.getvalue()
        new_events = list(parse(data))
        # zip() below stops at the shorter sequence, so without this check
        # dropped or extra events would go unnoticed.
        self.failUnlessEqual(len(events), len(new_events))
        for event, new_event in zip(events, new_events):
            self.failUnlessEqual(event.__class__, new_event.__class__)
            if isinstance(event, NodeEvent):
                self.failUnlessEqual(event.anchor, new_event.anchor)
            if isinstance(event, CollectionStartEvent):
                self.failUnlessEqual(event.tag, new_event.tag)
            if isinstance(event, ScalarEvent):
                # Tags are only compared when neither side is implicit.
                if not event.implicit and not new_event.implicit:
                    self.failUnlessEqual(event.tag, new_event.tag)
                self.failUnlessEqual(event.value, new_event.value)

    def _dump(self, filename, events, canonical):
        """Print the original file followed by the emitted document."""
        # Single-argument print(...) behaves the same as the print statement
        # under Python 2.
        print("="*30)
        print("ORIGINAL DOCUMENT:")
        source = file(filename, 'rb')
        try:
            print(source.read())
        finally:
            source.close()
        print('-'*30)
        print("EMITTED DOCUMENT:")
        emit(events, sys.stdout, canonical=canonical)

# Register the test methods with the appliance; the trailing arguments are the
# file extensions handed to each test (presumably one generated test per
# matching fixture -- see test_appliance.add_tests).
TestEmitter.add_tests('testEmitterOnData', '.canonical', '.data')
TestEmitter.add_tests('testEmitterOnCanonicalNormally', '.canonical')
TestEmitter.add_tests('testEmitterOnCanonicalCanonically', '.canonical')
[122]48
class EventsLoader(Loader):
    """Loader that turns each node of an events file into a yaml event object."""

    def construct_event(self, node):
        """Build the event instance described by ``node``.

        The event class name is the node tag minus its first character, with
        ``'Event'`` appended; the class is looked up on the ``yaml`` module.
        Scalar nodes contribute no arguments, any other node is constructed
        as a mapping of keyword arguments.  Arguments absent from the mapping
        are filled in with defaults before the event class is called.
        """
        if not isinstance(node, ScalarNode):
            kwargs = self.construct_mapping(node)
        else:
            kwargs = {}
        event_name = str(node.tag[1:]) + 'Event'
        is_scalar = event_name == 'ScalarEvent'
        is_collection_start = event_name in ('SequenceStartEvent',
                                             'MappingStartEvent')
        # Alias, scalar and collection-start events all take an anchor.
        if event_name == 'AliasEvent' or is_scalar or is_collection_start:
            kwargs.setdefault('anchor', None)
        # Only scalar and collection-start events take a tag.
        if is_scalar or is_collection_start:
            kwargs.setdefault('tag', None)
        if is_scalar:
            kwargs.setdefault('implicit', False)
            kwargs.setdefault('value', '')
        event_class = getattr(yaml, event_name)
        return event_class(**kwargs)

# Registered under the tag ``None``; presumably this makes construct_event the
# fallback constructor for every tag -- confirm against Loader.add_constructor.
EventsLoader.add_constructor(None, EventsLoader.construct_event)
[129]68
class TestEmitterEvents(test_appliance.TestAppliance):
    """Emitter tests driven by ``.events`` files.

    An events file is loaded with ``EventsLoader`` into a list of event
    objects, the events are emitted to text, the text is parsed back and
    the resulting events are compared with the loaded ones.
    """

    def _testEmitterEvents(self, test_name, events_filename):
        """Load events from ``events_filename``, emit, re-parse and compare."""
        # Close the input explicitly instead of relying on garbage
        # collection; list() finishes consuming the loader before the close.
        source = file(events_filename, 'rb')
        try:
            events = list(load(source, Loader=EventsLoader))
        finally:
            source.close()
        # Debugging aid: uncomment to print the events file and the output.
        #self._dump(events_filename, events)
        stream = StringIO.StringIO()
        emit(events, stream)
        data = stream.getvalue()
        new_events = list(parse(data))
        # zip() below stops at the shorter sequence, so check lengths first.
        self.failUnlessEqual(len(events), len(new_events))
        for event, new_event in zip(events, new_events):
            self.failUnlessEqual(event.__class__, new_event.__class__)
            if isinstance(event, NodeEvent):
                self.failUnlessEqual(event.anchor, new_event.anchor)
            if isinstance(event, CollectionStartEvent):
                self.failUnlessEqual(event.tag, new_event.tag)
            if isinstance(event, ScalarEvent):
                # Accept the scalar if either its implicit flag or its tag
                # survived the round trip.
                self.failUnless(event.implicit == new_event.implicit
                        or event.tag == new_event.tag)
                self.failUnlessEqual(event.value, new_event.value)

    def _dump(self, events_filename, events):
        """Print the events file followed by the emitted output."""
        # Single-argument print(...) behaves the same as the print statement
        # under Python 2.
        print("="*30)
        print("EVENTS:")
        source = file(events_filename, 'rb')
        try:
            print(source.read())
        finally:
            source.close()
        print('-'*30)
        print("OUTPUT:")
        emit(events, sys.stdout)

# Register the test method with the appliance for '.events' fixtures.
TestEmitterEvents.add_tests('testEmitterEvents', '.events')
[129]99
Note: See TracBrowser for help on using the repository browser.