| [44] | 1 | |
|---|
| [322] | 2 | import yaml, canonical |
|---|
| 3 | import pprint |
|---|
| [44] | 4 | |
|---|
| [322] | 5 | def _convert_structure(loader): |
|---|
| 6 | if loader.check_event(yaml.ScalarEvent): |
|---|
| 7 | event = loader.get_event() |
|---|
| 8 | if event.tag or event.anchor or event.value: |
|---|
| 9 | return True |
|---|
| [44] | 10 | else: |
|---|
| [322] | 11 | return None |
|---|
| 12 | elif loader.check_event(yaml.SequenceStartEvent): |
|---|
| 13 | loader.get_event() |
|---|
| 14 | sequence = [] |
|---|
| 15 | while not loader.check_event(yaml.SequenceEndEvent): |
|---|
| 16 | sequence.append(_convert_structure(loader)) |
|---|
| 17 | loader.get_event() |
|---|
| 18 | return sequence |
|---|
| 19 | elif loader.check_event(yaml.MappingStartEvent): |
|---|
| 20 | loader.get_event() |
|---|
| 21 | mapping = [] |
|---|
| 22 | while not loader.check_event(yaml.MappingEndEvent): |
|---|
| 23 | key = _convert_structure(loader) |
|---|
| 24 | value = _convert_structure(loader) |
|---|
| 25 | mapping.append((key, value)) |
|---|
| 26 | loader.get_event() |
|---|
| 27 | return mapping |
|---|
| 28 | elif loader.check_event(yaml.AliasEvent): |
|---|
| 29 | loader.get_event() |
|---|
| 30 | return '*' |
|---|
| 31 | else: |
|---|
| 32 | loader.get_event() |
|---|
| 33 | return '?' |
|---|
| [44] | 34 | |
|---|
| [322] | 35 | def test_structure(data_filename, structure_filename, verbose=False): |
|---|
| 36 | nodes1 = [] |
|---|
| [328] | 37 | nodes2 = eval(open(structure_filename, 'r').read()) |
|---|
| [322] | 38 | try: |
|---|
| 39 | loader = yaml.Loader(open(data_filename, 'rb')) |
|---|
| 40 | while loader.check_event(): |
|---|
| 41 | if loader.check_event(yaml.StreamStartEvent, yaml.StreamEndEvent, |
|---|
| 42 | yaml.DocumentStartEvent, yaml.DocumentEndEvent): |
|---|
| 43 | loader.get_event() |
|---|
| 44 | continue |
|---|
| 45 | nodes1.append(_convert_structure(loader)) |
|---|
| 46 | if len(nodes1) == 1: |
|---|
| 47 | nodes1 = nodes1[0] |
|---|
| 48 | assert nodes1 == nodes2, (nodes1, nodes2) |
|---|
| 49 | finally: |
|---|
| 50 | if verbose: |
|---|
| [328] | 51 | print("NODES1:") |
|---|
| [322] | 52 | pprint.pprint(nodes1) |
|---|
| [328] | 53 | print("NODES2:") |
|---|
| [322] | 54 | pprint.pprint(nodes2) |
|---|
| [44] | 55 | |
|---|
| [322] | 56 | test_structure.unittest = ['.data', '.structure'] |
|---|
| [44] | 57 | |
|---|
| [322] | 58 | def _compare_events(events1, events2, full=False): |
|---|
| 59 | assert len(events1) == len(events2), (len(events1), len(events2)) |
|---|
| 60 | for event1, event2 in zip(events1, events2): |
|---|
| 61 | assert event1.__class__ == event2.__class__, (event1, event2) |
|---|
| 62 | if isinstance(event1, yaml.AliasEvent) and full: |
|---|
| 63 | assert event1.anchor == event2.anchor, (event1, event2) |
|---|
| 64 | if isinstance(event1, (yaml.ScalarEvent, yaml.CollectionStartEvent)): |
|---|
| [328] | 65 | if (event1.tag not in [None, '!'] and event2.tag not in [None, '!']) or full: |
|---|
| [322] | 66 | assert event1.tag == event2.tag, (event1, event2) |
|---|
| 67 | if isinstance(event1, yaml.ScalarEvent): |
|---|
| 68 | assert event1.value == event2.value, (event1, event2) |
|---|
| [44] | 69 | |
|---|
| [322] | 70 | def test_parser(data_filename, canonical_filename, verbose=False): |
|---|
| 71 | events1 = None |
|---|
| 72 | events2 = None |
|---|
| 73 | try: |
|---|
| 74 | events1 = list(yaml.parse(open(data_filename, 'rb'))) |
|---|
| 75 | events2 = list(yaml.canonical_parse(open(canonical_filename, 'rb'))) |
|---|
| 76 | _compare_events(events1, events2) |
|---|
| 77 | finally: |
|---|
| 78 | if verbose: |
|---|
| [328] | 79 | print("EVENTS1:") |
|---|
| [322] | 80 | pprint.pprint(events1) |
|---|
| [328] | 81 | print("EVENTS2:") |
|---|
| [322] | 82 | pprint.pprint(events2) |
|---|
| [44] | 83 | |
|---|
| [322] | 84 | test_parser.unittest = ['.data', '.canonical'] |
|---|
| [44] | 85 | |
|---|
| [322] | 86 | def test_parser_on_canonical(canonical_filename, verbose=False): |
|---|
| 87 | events1 = None |
|---|
| 88 | events2 = None |
|---|
| 89 | try: |
|---|
| 90 | events1 = list(yaml.parse(open(canonical_filename, 'rb'))) |
|---|
| 91 | events2 = list(yaml.canonical_parse(open(canonical_filename, 'rb'))) |
|---|
| 92 | _compare_events(events1, events2, full=True) |
|---|
| 93 | finally: |
|---|
| 94 | if verbose: |
|---|
| [328] | 95 | print("EVENTS1:") |
|---|
| [322] | 96 | pprint.pprint(events1) |
|---|
| [328] | 97 | print("EVENTS2:") |
|---|
| [322] | 98 | pprint.pprint(events2) |
|---|
| [54] | 99 | |
|---|
| [322] | 100 | test_parser_on_canonical.unittest = ['.canonical'] |
|---|
| [54] | 101 | |
|---|
| [322] | 102 | def _compare_nodes(node1, node2): |
|---|
| 103 | assert node1.__class__ == node2.__class__, (node1, node2) |
|---|
| 104 | assert node1.tag == node2.tag, (node1, node2) |
|---|
| 105 | if isinstance(node1, yaml.ScalarNode): |
|---|
| 106 | assert node1.value == node2.value, (node1, node2) |
|---|
| 107 | else: |
|---|
| 108 | assert len(node1.value) == len(node2.value), (node1, node2) |
|---|
| 109 | for item1, item2 in zip(node1.value, node2.value): |
|---|
| 110 | if not isinstance(item1, tuple): |
|---|
| 111 | item1 = (item1,) |
|---|
| 112 | item2 = (item2,) |
|---|
| 113 | for subnode1, subnode2 in zip(item1, item2): |
|---|
| 114 | _compare_nodes(subnode1, subnode2) |
|---|
| [54] | 115 | |
|---|
| [322] | 116 | def test_composer(data_filename, canonical_filename, verbose=False): |
|---|
| 117 | nodes1 = None |
|---|
| 118 | nodes2 = None |
|---|
| 119 | try: |
|---|
| 120 | nodes1 = list(yaml.compose_all(open(data_filename, 'rb'))) |
|---|
| 121 | nodes2 = list(yaml.canonical_compose_all(open(canonical_filename, 'rb'))) |
|---|
| 122 | assert len(nodes1) == len(nodes2), (len(nodes1), len(nodes2)) |
|---|
| 123 | for node1, node2 in zip(nodes1, nodes2): |
|---|
| 124 | _compare_nodes(node1, node2) |
|---|
| 125 | finally: |
|---|
| 126 | if verbose: |
|---|
| [328] | 127 | print("NODES1:") |
|---|
| [322] | 128 | pprint.pprint(nodes1) |
|---|
| [328] | 129 | print("NODES2:") |
|---|
| [322] | 130 | pprint.pprint(nodes2) |
|---|
| [54] | 131 | |
|---|
| [322] | 132 | test_composer.unittest = ['.data', '.canonical'] |
|---|
| [55] | 133 | |
|---|
| [322] | 134 | def _make_loader(): |
|---|
| 135 | global MyLoader |
|---|
| [55] | 136 | |
|---|
| [322] | 137 | class MyLoader(yaml.Loader): |
|---|
| 138 | def construct_sequence(self, node): |
|---|
| 139 | return tuple(yaml.Loader.construct_sequence(self, node)) |
|---|
| 140 | def construct_mapping(self, node): |
|---|
| 141 | pairs = self.construct_pairs(node) |
|---|
| 142 | pairs.sort() |
|---|
| 143 | return pairs |
|---|
| 144 | def construct_undefined(self, node): |
|---|
| 145 | return self.construct_scalar(node) |
|---|
| [57] | 146 | |
|---|
| [328] | 147 | MyLoader.add_constructor('tag:yaml.org,2002:map', MyLoader.construct_mapping) |
|---|
| [322] | 148 | MyLoader.add_constructor(None, MyLoader.construct_undefined) |
|---|
| [136] | 149 | |
|---|
| [322] | 150 | def _make_canonical_loader(): |
|---|
| 151 | global MyCanonicalLoader |
|---|
| [146] | 152 | |
|---|
| [322] | 153 | class MyCanonicalLoader(yaml.CanonicalLoader): |
|---|
| 154 | def construct_sequence(self, node): |
|---|
| 155 | return tuple(yaml.CanonicalLoader.construct_sequence(self, node)) |
|---|
| 156 | def construct_mapping(self, node): |
|---|
| 157 | pairs = self.construct_pairs(node) |
|---|
| 158 | pairs.sort() |
|---|
| 159 | return pairs |
|---|
| 160 | def construct_undefined(self, node): |
|---|
| 161 | return self.construct_scalar(node) |
|---|
| [146] | 162 | |
|---|
| [328] | 163 | MyCanonicalLoader.add_constructor('tag:yaml.org,2002:map', MyCanonicalLoader.construct_mapping) |
|---|
| [322] | 164 | MyCanonicalLoader.add_constructor(None, MyCanonicalLoader.construct_undefined) |
|---|
| [146] | 165 | |
|---|
| [322] | 166 | def test_constructor(data_filename, canonical_filename, verbose=False): |
|---|
| 167 | _make_loader() |
|---|
| 168 | _make_canonical_loader() |
|---|
| 169 | native1 = None |
|---|
| 170 | native2 = None |
|---|
| 171 | try: |
|---|
| 172 | native1 = list(yaml.load_all(open(data_filename, 'rb'), Loader=MyLoader)) |
|---|
| 173 | native2 = list(yaml.load_all(open(canonical_filename, 'rb'), Loader=MyCanonicalLoader)) |
|---|
| 174 | assert native1 == native2, (native1, native2) |
|---|
| 175 | finally: |
|---|
| 176 | if verbose: |
|---|
| [328] | 177 | print("NATIVE1:") |
|---|
| [322] | 178 | pprint.pprint(native1) |
|---|
| [328] | 179 | print("NATIVE2:") |
|---|
| [322] | 180 | pprint.pprint(native2) |
|---|
| [146] | 181 | |
|---|
| [322] | 182 | test_constructor.unittest = ['.data', '.canonical'] |
|---|
| [136] | 183 | |
|---|
| [322] | 184 | if __name__ == '__main__': |
|---|
| 185 | import test_appliance |
|---|
| 186 | test_appliance.run(globals()) |
|---|
| [55] | 187 | |
|---|