| 1 | |
|---|
| 2 | import yaml, canonical |
|---|
| 3 | import pprint |
|---|
| 4 | |
|---|
| 5 | def _convert_structure(loader): |
|---|
| 6 | if loader.check_event(yaml.ScalarEvent): |
|---|
| 7 | event = loader.get_event() |
|---|
| 8 | if event.tag or event.anchor or event.value: |
|---|
| 9 | return True |
|---|
| 10 | else: |
|---|
| 11 | return None |
|---|
| 12 | elif loader.check_event(yaml.SequenceStartEvent): |
|---|
| 13 | loader.get_event() |
|---|
| 14 | sequence = [] |
|---|
| 15 | while not loader.check_event(yaml.SequenceEndEvent): |
|---|
| 16 | sequence.append(_convert_structure(loader)) |
|---|
| 17 | loader.get_event() |
|---|
| 18 | return sequence |
|---|
| 19 | elif loader.check_event(yaml.MappingStartEvent): |
|---|
| 20 | loader.get_event() |
|---|
| 21 | mapping = [] |
|---|
| 22 | while not loader.check_event(yaml.MappingEndEvent): |
|---|
| 23 | key = _convert_structure(loader) |
|---|
| 24 | value = _convert_structure(loader) |
|---|
| 25 | mapping.append((key, value)) |
|---|
| 26 | loader.get_event() |
|---|
| 27 | return mapping |
|---|
| 28 | elif loader.check_event(yaml.AliasEvent): |
|---|
| 29 | loader.get_event() |
|---|
| 30 | return '*' |
|---|
| 31 | else: |
|---|
| 32 | loader.get_event() |
|---|
| 33 | return '?' |
|---|
| 34 | |
|---|
| 35 | def test_structure(data_filename, structure_filename, verbose=False): |
|---|
| 36 | nodes1 = [] |
|---|
| 37 | nodes2 = eval(open(structure_filename, 'r').read()) |
|---|
| 38 | try: |
|---|
| 39 | loader = yaml.Loader(open(data_filename, 'rb')) |
|---|
| 40 | while loader.check_event(): |
|---|
| 41 | if loader.check_event(yaml.StreamStartEvent, yaml.StreamEndEvent, |
|---|
| 42 | yaml.DocumentStartEvent, yaml.DocumentEndEvent): |
|---|
| 43 | loader.get_event() |
|---|
| 44 | continue |
|---|
| 45 | nodes1.append(_convert_structure(loader)) |
|---|
| 46 | if len(nodes1) == 1: |
|---|
| 47 | nodes1 = nodes1[0] |
|---|
| 48 | assert nodes1 == nodes2, (nodes1, nodes2) |
|---|
| 49 | finally: |
|---|
| 50 | if verbose: |
|---|
| 51 | print("NODES1:") |
|---|
| 52 | pprint.pprint(nodes1) |
|---|
| 53 | print("NODES2:") |
|---|
| 54 | pprint.pprint(nodes2) |
|---|
| 55 | |
|---|
| 56 | test_structure.unittest = ['.data', '.structure'] |
|---|
| 57 | |
|---|
| 58 | def _compare_events(events1, events2, full=False): |
|---|
| 59 | assert len(events1) == len(events2), (len(events1), len(events2)) |
|---|
| 60 | for event1, event2 in zip(events1, events2): |
|---|
| 61 | assert event1.__class__ == event2.__class__, (event1, event2) |
|---|
| 62 | if isinstance(event1, yaml.AliasEvent) and full: |
|---|
| 63 | assert event1.anchor == event2.anchor, (event1, event2) |
|---|
| 64 | if isinstance(event1, (yaml.ScalarEvent, yaml.CollectionStartEvent)): |
|---|
| 65 | if (event1.tag not in [None, '!'] and event2.tag not in [None, '!']) or full: |
|---|
| 66 | assert event1.tag == event2.tag, (event1, event2) |
|---|
| 67 | if isinstance(event1, yaml.ScalarEvent): |
|---|
| 68 | assert event1.value == event2.value, (event1, event2) |
|---|
| 69 | |
|---|
| 70 | def test_parser(data_filename, canonical_filename, verbose=False): |
|---|
| 71 | events1 = None |
|---|
| 72 | events2 = None |
|---|
| 73 | try: |
|---|
| 74 | events1 = list(yaml.parse(open(data_filename, 'rb'))) |
|---|
| 75 | events2 = list(yaml.canonical_parse(open(canonical_filename, 'rb'))) |
|---|
| 76 | _compare_events(events1, events2) |
|---|
| 77 | finally: |
|---|
| 78 | if verbose: |
|---|
| 79 | print("EVENTS1:") |
|---|
| 80 | pprint.pprint(events1) |
|---|
| 81 | print("EVENTS2:") |
|---|
| 82 | pprint.pprint(events2) |
|---|
| 83 | |
|---|
| 84 | test_parser.unittest = ['.data', '.canonical'] |
|---|
| 85 | |
|---|
| 86 | def test_parser_on_canonical(canonical_filename, verbose=False): |
|---|
| 87 | events1 = None |
|---|
| 88 | events2 = None |
|---|
| 89 | try: |
|---|
| 90 | events1 = list(yaml.parse(open(canonical_filename, 'rb'))) |
|---|
| 91 | events2 = list(yaml.canonical_parse(open(canonical_filename, 'rb'))) |
|---|
| 92 | _compare_events(events1, events2, full=True) |
|---|
| 93 | finally: |
|---|
| 94 | if verbose: |
|---|
| 95 | print("EVENTS1:") |
|---|
| 96 | pprint.pprint(events1) |
|---|
| 97 | print("EVENTS2:") |
|---|
| 98 | pprint.pprint(events2) |
|---|
| 99 | |
|---|
| 100 | test_parser_on_canonical.unittest = ['.canonical'] |
|---|
| 101 | |
|---|
| 102 | def _compare_nodes(node1, node2): |
|---|
| 103 | assert node1.__class__ == node2.__class__, (node1, node2) |
|---|
| 104 | assert node1.tag == node2.tag, (node1, node2) |
|---|
| 105 | if isinstance(node1, yaml.ScalarNode): |
|---|
| 106 | assert node1.value == node2.value, (node1, node2) |
|---|
| 107 | else: |
|---|
| 108 | assert len(node1.value) == len(node2.value), (node1, node2) |
|---|
| 109 | for item1, item2 in zip(node1.value, node2.value): |
|---|
| 110 | if not isinstance(item1, tuple): |
|---|
| 111 | item1 = (item1,) |
|---|
| 112 | item2 = (item2,) |
|---|
| 113 | for subnode1, subnode2 in zip(item1, item2): |
|---|
| 114 | _compare_nodes(subnode1, subnode2) |
|---|
| 115 | |
|---|
| 116 | def test_composer(data_filename, canonical_filename, verbose=False): |
|---|
| 117 | nodes1 = None |
|---|
| 118 | nodes2 = None |
|---|
| 119 | try: |
|---|
| 120 | nodes1 = list(yaml.compose_all(open(data_filename, 'rb'))) |
|---|
| 121 | nodes2 = list(yaml.canonical_compose_all(open(canonical_filename, 'rb'))) |
|---|
| 122 | assert len(nodes1) == len(nodes2), (len(nodes1), len(nodes2)) |
|---|
| 123 | for node1, node2 in zip(nodes1, nodes2): |
|---|
| 124 | _compare_nodes(node1, node2) |
|---|
| 125 | finally: |
|---|
| 126 | if verbose: |
|---|
| 127 | print("NODES1:") |
|---|
| 128 | pprint.pprint(nodes1) |
|---|
| 129 | print("NODES2:") |
|---|
| 130 | pprint.pprint(nodes2) |
|---|
| 131 | |
|---|
| 132 | test_composer.unittest = ['.data', '.canonical'] |
|---|
| 133 | |
|---|
| 134 | def _make_loader(): |
|---|
| 135 | global MyLoader |
|---|
| 136 | |
|---|
| 137 | class MyLoader(yaml.Loader): |
|---|
| 138 | def construct_sequence(self, node): |
|---|
| 139 | return tuple(yaml.Loader.construct_sequence(self, node)) |
|---|
| 140 | def construct_mapping(self, node): |
|---|
| 141 | pairs = self.construct_pairs(node) |
|---|
| 142 | pairs.sort(key=(lambda i: str(i))) |
|---|
| 143 | return pairs |
|---|
| 144 | def construct_undefined(self, node): |
|---|
| 145 | return self.construct_scalar(node) |
|---|
| 146 | |
|---|
| 147 | MyLoader.add_constructor('tag:yaml.org,2002:map', MyLoader.construct_mapping) |
|---|
| 148 | MyLoader.add_constructor(None, MyLoader.construct_undefined) |
|---|
| 149 | |
|---|
| 150 | def _make_canonical_loader(): |
|---|
| 151 | global MyCanonicalLoader |
|---|
| 152 | |
|---|
| 153 | class MyCanonicalLoader(yaml.CanonicalLoader): |
|---|
| 154 | def construct_sequence(self, node): |
|---|
| 155 | return tuple(yaml.CanonicalLoader.construct_sequence(self, node)) |
|---|
| 156 | def construct_mapping(self, node): |
|---|
| 157 | pairs = self.construct_pairs(node) |
|---|
| 158 | pairs.sort(key=(lambda i: str(i))) |
|---|
| 159 | return pairs |
|---|
| 160 | def construct_undefined(self, node): |
|---|
| 161 | return self.construct_scalar(node) |
|---|
| 162 | |
|---|
| 163 | MyCanonicalLoader.add_constructor('tag:yaml.org,2002:map', MyCanonicalLoader.construct_mapping) |
|---|
| 164 | MyCanonicalLoader.add_constructor(None, MyCanonicalLoader.construct_undefined) |
|---|
| 165 | |
|---|
| 166 | def test_constructor(data_filename, canonical_filename, verbose=False): |
|---|
| 167 | _make_loader() |
|---|
| 168 | _make_canonical_loader() |
|---|
| 169 | native1 = None |
|---|
| 170 | native2 = None |
|---|
| 171 | try: |
|---|
| 172 | native1 = list(yaml.load_all(open(data_filename, 'rb'), Loader=MyLoader)) |
|---|
| 173 | native2 = list(yaml.load_all(open(canonical_filename, 'rb'), Loader=MyCanonicalLoader)) |
|---|
| 174 | assert native1 == native2, (native1, native2) |
|---|
| 175 | finally: |
|---|
| 176 | if verbose: |
|---|
| 177 | print("NATIVE1:") |
|---|
| 178 | pprint.pprint(native1) |
|---|
| 179 | print("NATIVE2:") |
|---|
| 180 | pprint.pprint(native2) |
|---|
| 181 | |
|---|
| 182 | test_constructor.unittest = ['.data', '.canonical'] |
|---|
| 183 | |
|---|
| 184 | if __name__ == '__main__': |
|---|
| 185 | import test_appliance |
|---|
| 186 | test_appliance.run(globals()) |
|---|
| 187 | |
|---|