| [122] | 1 | |
|---|
| [127] | 2 | import test_appliance, sys, StringIO |
|---|
| [122] | 3 | |
|---|
| 4 | from yaml import * |
|---|
| [129] | 5 | import yaml |
|---|
| [122] | 6 | |
|---|
class TestEmitter(test_appliance.TestAppliance):
    """Round-trip tests for the emitter.

    Each test parses a file into events, emits those events into an
    in-memory stream, parses the emitted text again and checks that the
    second event sequence matches the first one.
    """

    def _testEmitterOnData(self, test_name, canonical_filename, data_filename):
        """Round-trip the data file with the default emitter settings.

        canonical_filename is accepted but not used by this test.
        """
        self._testEmitter(test_name, data_filename)

    def _testEmitterOnCanonicalNormally(self, test_name, canonical_filename):
        """Round-trip the canonical file with canonical=False."""
        self._testEmitter(test_name, canonical_filename, False)

    def _testEmitterOnCanonicalCanonically(self, test_name, canonical_filename):
        """Round-trip the canonical file with canonical=True."""
        self._testEmitter(test_name, canonical_filename, True)

    def _testEmitter(self, test_name, filename, canonical=None):
        """Parse filename, emit its events and compare them after re-parsing.

        test_name is not used here. canonical is forwarded unchanged to
        emit(). Fails the test if the number of events differs or if any
        pair of corresponding events differs in class, anchor, tag
        (where compared) or scalar value.
        """
        # Close the input explicitly instead of relying on garbage collection.
        source = open(filename, 'rb')
        try:
            events = list(parse(source))
        finally:
            source.close()
        #self._dump(filename, events, canonical)
        stream = StringIO.StringIO()
        emit(events, stream, canonical=canonical)
        data = stream.getvalue()
        new_events = list(parse(data))
        # zip() stops at the shorter sequence, so without this check any
        # missing or extra trailing events would go unnoticed.
        self.failUnlessEqual(len(events), len(new_events))
        for event, new_event in zip(events, new_events):
            self.failUnlessEqual(event.__class__, new_event.__class__)
            if isinstance(event, NodeEvent):
                self.failUnlessEqual(event.anchor, new_event.anchor)
            if isinstance(event, CollectionStartEvent):
                self.failUnlessEqual(event.tag, new_event.tag)
            if isinstance(event, ScalarEvent):
                #self.failUnlessEqual(event.implicit, new_event.implicit)
                # Tags are compared only when neither event is implicit.
                if not event.implicit and not new_event.implicit:
                    self.failUnlessEqual(event.tag, new_event.tag)
                self.failUnlessEqual(event.value, new_event.value)

    def _dump(self, filename, events, canonical):
        """Debugging aid: print the original file and the emitted output.

        Not called by default; see the commented-out call in _testEmitter.
        """
        # A parenthesised single-argument print behaves exactly like the
        # print statement on Python 2.
        print("=" * 30)
        print("ORIGINAL DOCUMENT:")
        source = open(filename, 'rb')
        try:
            print(source.read())
        finally:
            source.close()
        print('-' * 30)
        print("EMITTED DOCUMENT:")
        emit(events, sys.stdout, canonical=canonical)

# Register the test methods through add_tests (inherited from
# test_appliance.TestAppliance, not shown here; presumably it creates one
# test per set of data files with the given extensions -- confirm there).
TestEmitter.add_tests('testEmitterOnData', '.canonical', '.data')
TestEmitter.add_tests('testEmitterOnCanonicalNormally', '.canonical')
TestEmitter.add_tests('testEmitterOnCanonicalCanonically', '.canonical')
|---|
| [122] | 48 | |
|---|
class EventsLoader(Loader):
    """Loader that builds yaml event objects from tagged nodes."""

    def construct_event(self, node):
        """Build the event named by node.tag from the node's content.

        The event class name is the tag without its first character plus
        the suffix 'Event'; it is looked up on the yaml module.  A scalar
        node contributes no keyword arguments; any other node is read as a
        mapping of keyword arguments.  Arguments missing from the mapping
        are filled in with defaults before the event class is called.
        """
        if isinstance(node, ScalarNode):
            kwargs = {}
        else:
            kwargs = self.construct_mapping(node)

        event_name = str(node.tag[1:]) + 'Event'
        is_scalar = event_name == 'ScalarEvent'
        starts_collection = event_name in ('SequenceStartEvent',
                                           'MappingStartEvent')

        # Defaults for arguments the event description left out.
        if is_scalar or starts_collection or event_name == 'AliasEvent':
            kwargs.setdefault('anchor', None)
        if is_scalar or starts_collection:
            kwargs.setdefault('tag', None)
        if is_scalar:
            kwargs.setdefault('implicit', False)
            kwargs.setdefault('value', '')

        event_class = getattr(yaml, event_name)
        return event_class(**kwargs)

# Registered under the tag None; presumably this makes construct_event the
# constructor for every tag without its own constructor -- confirm against
# the PyYAML add_constructor documentation.
EventsLoader.add_constructor(None, EventsLoader.construct_event)
|---|
| [129] | 68 | |
|---|
class TestEmitterEvents(test_appliance.TestAppliance):
    """Emitter tests driven by event descriptions.

    The events are loaded from a file with EventsLoader, emitted into an
    in-memory stream, and the emitted text is parsed back and compared
    with the original events.
    """

    def _testEmitterEvents(self, test_name, events_filename):
        """Emit the events described in events_filename and verify them.

        test_name is not used here.  Fails the test if the number of
        events differs or if any pair of corresponding events differs in
        class, anchor, collection tag or scalar value.
        """
        # Close the input explicitly instead of relying on garbage collection.
        source = open(events_filename, 'rb')
        try:
            events = list(load(source, Loader=EventsLoader))
        finally:
            source.close()
        #self._dump(events_filename, events)
        stream = StringIO.StringIO()
        emit(events, stream)
        data = stream.getvalue()
        new_events = list(parse(data))
        self.failUnlessEqual(len(events), len(new_events))
        for event, new_event in zip(events, new_events):
            self.failUnlessEqual(event.__class__, new_event.__class__)
            if isinstance(event, NodeEvent):
                self.failUnlessEqual(event.anchor, new_event.anchor)
            if isinstance(event, CollectionStartEvent):
                self.failUnlessEqual(event.tag, new_event.tag)
            if isinstance(event, ScalarEvent):
                # A scalar passes if either its implicit flag or its tag
                # is unchanged; the value must always match.
                self.failUnless(event.implicit == new_event.implicit
                        or event.tag == new_event.tag)
                self.failUnlessEqual(event.value, new_event.value)

    def _dump(self, events_filename, events):
        """Debugging aid: print the events file and the emitted output.

        Not called by default; see the commented-out call in
        _testEmitterEvents.
        """
        # A parenthesised single-argument print behaves exactly like the
        # print statement on Python 2.
        print("=" * 30)
        print("EVENTS:")
        source = open(events_filename, 'rb')
        try:
            print(source.read())
        finally:
            source.close()
        print('-' * 30)
        print("OUTPUT:")
        emit(events, sys.stdout)

# Register the test method through add_tests (inherited from
# test_appliance.TestAppliance, not shown here; presumably it creates one
# test per data file with the '.events' extension -- confirm there).
TestEmitterEvents.add_tests('testEmitterEvents', '.events')
|---|
| [129] | 99 | |
|---|