| 1 | |
|---|
| 2 | import test_appliance, sys, StringIO |
|---|
| 3 | |
|---|
| 4 | from yaml import * |
|---|
| 5 | import yaml |
|---|
| 6 | |
|---|
class TestEmitter(test_appliance.TestAppliance):
    """Round-trip tests for the emitter driven by parsed files.

    A file is parsed into events, the events are emitted to text, the text
    is parsed again, and the two event lists are compared.
    """

    def _testEmitterOnData(self, test_name, canonical_filename, data_filename):
        """Round-trip `data_filename`; `canonical_filename` is not used."""
        self._testEmitter(test_name, data_filename)

    def _testEmitterOnCanonicalNormally(self, test_name, canonical_filename):
        """Round-trip the canonical file with `canonical` forced to False."""
        self._testEmitter(test_name, canonical_filename, False)

    def _testEmitterOnCanonicalCanonically(self, test_name, canonical_filename):
        """Round-trip the canonical file with `canonical` forced to True."""
        self._testEmitter(test_name, canonical_filename, True)

    def _testEmitter(self, test_name, filename, canonical=None):
        """Parse `filename`, emit its events, re-parse and compare.

        When `canonical` is not None it is stored on the first parsed event
        (`events[0].canonical`) before emitting.
        """
        # The parser is drained by list(), so the file can be closed right
        # after instead of being left to the garbage collector.
        stream = open(filename, 'rb')
        try:
            events = list(iter(Parser(Scanner(Reader(stream)))))
        finally:
            stream.close()
        if canonical is not None:
            events[0].canonical = canonical
        #self._dump(filename, events)
        writer = StringIO.StringIO()
        emitter = Emitter(writer)
        for event in events:
            emitter.emit(event)
        data = writer.getvalue()
        new_events = list(parse(data))
        # zip() stops at the shorter list, so without this check lost or
        # extra events would pass unnoticed (TestEmitterEvents checks it too).
        self.failUnlessEqual(len(events), len(new_events))
        for event, new_event in zip(events, new_events):
            self.failUnlessEqual(event.__class__, new_event.__class__)
            if isinstance(event, NodeEvent):
                self.failUnlessEqual(event.anchor, new_event.anchor)
            if isinstance(event, CollectionStartEvent):
                self.failUnlessEqual(event.tag, new_event.tag)
            if isinstance(event, ScalarEvent):
                #self.failUnlessEqual(event.implicit, new_event.implicit)
                # Tags are compared only when neither scalar is implicit.
                if not event.implicit and not new_event.implicit:
                    self.failUnlessEqual(event.tag, new_event.tag)
                self.failUnlessEqual(event.value, new_event.value)

    def _dump(self, filename, events):
        """Debug helper: print the file contents, then emit `events` to stdout."""
        writer = sys.stdout
        emitter = Emitter(writer)
        print("=" * 30)
        print("ORIGINAL DOCUMENT:")
        stream = open(filename, 'rb')
        try:
            print(stream.read())
        finally:
            stream.close()
        print('-' * 30)
        print("EMITTED DOCUMENT:")
        for event in events:
            emitter.emit(event)
|---|
| 51 | |
|---|
# Register the data-file round-trip test with the test appliance.
# NOTE(review): presumably add_tests builds one test per matching
# '.canonical'/'.data' file pair from _testEmitterOnData -- confirm in
# test_appliance.
TestEmitter.add_tests('testEmitterOnData', '.canonical', '.data')
# The two canonical-file variants below are currently disabled.
#TestEmitter.add_tests('testEmitterOnCanonicalNormally', '.canonical')
#TestEmitter.add_tests('testEmitterOnCanonicalCanonically', '.canonical')
|---|
| 55 | |
|---|
class EventsConstructor(Constructor):
    """Constructor whose nodes are turned into event objects from `yaml`."""

    def construct_event(self, node):
        """Build the event described by `node`.

        The event class is looked up on the `yaml` module under the name
        formed by the node tag without its first character plus 'Event'.
        A scalar node supplies no keyword arguments; any other node is
        constructed as a mapping and used as the keyword arguments.
        Missing 'anchor', 'tag' and 'value' entries get defaults depending
        on the event class.
        """
        kwargs = {}
        if not isinstance(node, ScalarNode):
            kwargs = self.construct_mapping(node)
        event_name = str(node.tag[1:]) + 'Event'

        # Collect the defaults that apply to this event class, in the same
        # order the arguments are filled in.
        defaults = []
        if event_name in ('AliasEvent', 'ScalarEvent',
                          'SequenceStartEvent', 'MappingStartEvent'):
            defaults.append(('anchor', None))
        if event_name in ('ScalarEvent', 'SequenceStartEvent',
                          'MappingStartEvent'):
            defaults.append(('tag', None))
        if event_name == 'ScalarEvent':
            defaults.append(('value', ''))
        for key, fallback in defaults:
            kwargs.setdefault(key, fallback)

        event_class = getattr(yaml, event_name)
        return event_class(**kwargs)
|---|
| 72 | |
|---|
# Register construct_event under the tag None.
# NOTE(review): presumably None makes it the fallback constructor for every
# tag -- confirm against Constructor.add_constructor.
EventsConstructor.add_constructor(None, EventsConstructor.construct_event)
|---|
| 74 | |
|---|
class TestEmitterEvents(test_appliance.TestAppliance):
    """Round-trip tests for the emitter driven by event description files.

    Events are loaded from a file with EventsConstructor, emitted to text,
    and the text is parsed back into events for comparison.
    """

    def _testEmitterEvents(self, test_name, events_filename):
        """Load events from `events_filename`, emit, re-parse and compare."""
        # load_document is drained by list(), so the file can be closed
        # right after instead of being left to the garbage collector.
        stream = open(events_filename, 'rb')
        try:
            events = list(load_document(stream, Constructor=EventsConstructor))
        finally:
            stream.close()
        #self._dump(events_filename, events)
        writer = StringIO.StringIO()
        emitter = Emitter(writer)
        for event in events:
            emitter.emit(event)
        data = writer.getvalue()
        new_events = list(parse(data))
        # Checked first because zip() below stops at the shorter list.
        self.failUnlessEqual(len(events), len(new_events))
        for event, new_event in zip(events, new_events):
            self.failUnlessEqual(event.__class__, new_event.__class__)
            if isinstance(event, NodeEvent):
                self.failUnlessEqual(event.anchor, new_event.anchor)
            if isinstance(event, CollectionStartEvent):
                self.failUnlessEqual(event.tag, new_event.tag)
            if isinstance(event, ScalarEvent):
                # Passes when either the implicit flags or the tags agree.
                self.failUnless(event.implicit == new_event.implicit
                        or event.tag == new_event.tag)
                self.failUnlessEqual(event.value, new_event.value)

    def _dump(self, events_filename, events):
        """Debug helper: print the events file, then emit `events` to stdout."""
        writer = sys.stdout
        emitter = Emitter(writer)
        print("=" * 30)
        print("EVENTS:")
        stream = open(events_filename, 'rb')
        try:
            print(stream.read())
        finally:
            stream.close()
        print('-' * 30)
        print("OUTPUT:")
        for event in events:
            emitter.emit(event)
|---|
| 108 | |
|---|
# Register the events-file round-trip test with the test appliance.
# NOTE(review): presumably add_tests builds one test per '.events' file
# from _testEmitterEvents -- confirm in test_appliance.
TestEmitterEvents.add_tests('testEmitterEvents', '.events')
|---|
| 110 | |
|---|