source: branches/pyyaml3000/tests/test_appliance.py @ 51

Revision 51, 10.3 KB checked in by xi, 8 years ago (diff)

Parser is done. Add iterator interfaces for Scanner and Parser.

Line 
1
2import unittest, os
3
4from yaml.tokens import *
5from yaml.events import *
6
7class TestAppliance(unittest.TestCase):
8
9    DATA = 'tests/data'
10
11    all_tests = {}
12    for filename in os.listdir(DATA):
13        if os.path.isfile(os.path.join(DATA, filename)):
14            root, ext = os.path.splitext(filename)
15            all_tests.setdefault(root, []).append(ext)
16
17    def add_tests(cls, method_name, *extensions):
18        for test in cls.all_tests:
19            available_extensions = cls.all_tests[test]
20            for ext in extensions:
21                if ext not in available_extensions:
22                    break
23            else:
24                filenames = [os.path.join(cls.DATA, test+ext) for ext in extensions]
25                def test_method(self, test=test, filenames=filenames):
26                    getattr(self, '_'+method_name)(test, *filenames)
27                test = test.replace('-', '_')
28                try:
29                    test_method.__name__ = '%s_%s' % (method_name, test)
30                except TypeError:
31                    import new
32                    test_method = new.function(test_method.func_code, test_method.func_globals,
33                            '%s_%s' % (method_name, test), test_method.func_defaults,
34                            test_method.func_closure)
35                setattr(cls, test_method.__name__, test_method)
36    add_tests = classmethod(add_tests)
37
38class Error(Exception):
39    pass
40
41class CanonicalScanner:
42
43    def __init__(self, data):
44        self.data = unicode(data, 'utf-8')+u'\0'
45        self.index = 0
46
47    def scan(self):
48        #print self.data[self.index:]
49        tokens = []
50        while True:
51            self.find_token()
52            ch = self.data[self.index]
53            if ch == u'\0':
54                tokens.append(StreamEndToken(None, None))
55                break
56            elif ch == u'%':
57                tokens.append(self.scan_directive())
58            elif ch == u'-' and self.data[self.index:self.index+3] == u'---':
59                self.index += 3
60                tokens.append(DocumentStartToken(None, None))
61            elif ch == u'[':
62                self.index += 1
63                tokens.append(FlowSequenceStartToken(None, None))
64            elif ch == u'{':
65                self.index += 1
66                tokens.append(FlowMappingStartToken(None, None))
67            elif ch == u']':
68                self.index += 1
69                tokens.append(FlowSequenceEndToken(None, None))
70            elif ch == u'}':
71                self.index += 1
72                tokens.append(FlowMappingEndToken(None, None))
73            elif ch == u'?':
74                self.index += 1
75                tokens.append(KeyToken(None, None))
76            elif ch == u':':
77                self.index += 1
78                tokens.append(ValueToken(None, None))
79            elif ch == u',':
80                self.index += 1
81                tokens.append(FlowEntryToken(None, None))
82            elif ch == u'*' or ch == u'&':
83                tokens.append(self.scan_alias())
84            elif ch == u'!':
85                tokens.append(self.scan_tag())
86            elif ch == u'"':
87                tokens.append(self.scan_scalar())
88            else:
89                raise Error("invalid token")
90        return tokens
91
92    DIRECTIVE = u'%YAML 1.1'
93
94    def scan_directive(self):
95        if self.data[self.index:self.index+len(self.DIRECTIVE)] == self.DIRECTIVE and \
96                self.data[self.index+len(self.DIRECTIVE)] in u' \n\0':
97            self.index += len(self.DIRECTIVE)
98            return DirectiveToken('YAML', (1, 1), None, None)
99
100    def scan_alias(self):
101        if self.data[self.index] == u'*':
102            TokenClass = AliasToken
103        else:
104            TokenClass = AnchorToken
105        self.index += 1
106        start = self.index
107        while self.data[self.index] not in u', \n\0':
108            self.index += 1
109        value = self.data[start:self.index]
110        return TokenClass(value, None, None)
111
112    def scan_tag(self):
113        self.index += 1
114        start = self.index
115        while self.data[self.index] not in u' \n\0':
116            self.index += 1
117        value = self.data[start:self.index]
118        if value[0] == u'!':
119            value = 'tag:yaml.org,2002:'+value[1:]
120        elif value[0] == u'<' and value[-1] == u'>':
121            value = value[1:-1]
122        else:
123            value = u'!'+value
124        return TagToken(value, None, None)
125
126    QUOTE_CODES = {
127        'x': 2,
128        'u': 4,
129        'U': 8,
130    }
131
132    QUOTE_REPLACES = {
133        u'\\': u'\\',
134        u'\"': u'\"',
135        u' ': u' ',
136        u'a': u'\x07',
137        u'b': u'\x08',
138        u'e': u'\x1B',
139        u'f': u'\x0C',
140        u'n': u'\x0A',
141        u'r': u'\x0D',
142        u't': u'\x09',
143        u'v': u'\x0B',
144        u'N': u'\u0085',
145        u'L': u'\u2028',
146        u'P': u'\u2029',
147        u'_': u'_',
148        u'0': u'\x00',
149
150    }
151
152    def scan_scalar(self):
153        self.index += 1
154        chunks = []
155        start = self.index
156        ignore_spaces = False
157        while self.data[self.index] != u'"':
158            if self.data[self.index] == u'\\':
159                ignore_spaces = False
160                chunks.append(self.data[start:self.index])
161                self.index += 1
162                ch = self.data[self.index]
163                self.index += 1
164                if ch == u'\n':
165                    ignore_spaces = True
166                elif ch in self.QUOTE_CODES:
167                    length = self.QUOTE_CODES[ch]
168                    code = int(self.data[self.index:self.index+length], 16)
169                    chunks.append(unichr(code))
170                    self.index += length
171                else:
172                    chunks.append(self.QUOTE_REPLACES[ch])
173                start = self.index
174            elif self.data[self.index] == u'\n':
175                chunks.append(self.data[start:self.index])
176                chunks.append(u' ')
177                self.index += 1
178                start = self.index
179                ignore_spaces = True
180            elif ignore_spaces and self.data[self.index] == u' ':
181                self.index += 1
182                start = self.index
183            else:
184                ignore_spaces = False
185                self.index += 1
186        chunks.append(self.data[start:self.index])
187        self.index += 1
188        return ScalarToken(u''.join(chunks), False, None, None)
189
190    def find_token(self):
191        found = False
192        while not found:
193            while self.data[self.index] in u' \t':
194                self.index += 1
195            if self.data[self.index] == u'#':
196                while self.data[self.index] != u'\n':
197                    self.index += 1
198            if self.data[self.index] == u'\n':
199                self.index += 1
200            else:
201                found = True
202
203class CanonicalParser:
204
205    def __init__(self, data):
206        self.scanner = CanonicalScanner(data)
207        self.events = []
208
209    # stream: document* END
210    def parse_stream(self):
211        while not self.test_token(StreamEndToken):
212            if self.test_token(DirectiveToken, DocumentStartToken):
213                self.parse_document()
214            else:
215                raise Error("document is expected, got "+repr(self.tokens[self.index]))
216        self.events.append(StreamEndEvent(None, None))
217
218    # document: DIRECTIVE? DOCUMENT-START node
219    def parse_document(self):
220        node = None
221        if self.test_token(DirectiveToken):
222            self.consume_token(DirectiveToken)
223        self.consume_token(DocumentStartToken)
224        self.parse_node()
225
226    # node: ALIAS | ANCHOR? TAG? (SCALAR|sequence|mapping)
227    def parse_node(self):
228        if self.test_token(AliasToken):
229            self.events.append(AliasEvent(self.get_value(), None, None))
230        else:
231            anchor = None
232            if self.test_token(AnchorToken):
233                anchor = self.get_value()
234            tag = u'!'
235            if self.test_token(TagToken):
236                tag = self.get_value()
237            if self.test_token(ScalarToken):
238                self.events.append(ScalarEvent(anchor, tag, self.get_value(), None, None))
239            elif self.test_token(FlowSequenceStartToken):
240                self.events.append(SequenceEvent(anchor, tag, None, None))
241                self.parse_sequence()
242            elif self.test_token(FlowMappingStartToken):
243                self.events.append(MappingEvent(anchor, tag, None, None))
244                self.parse_mapping()
245            else:
246                raise Error("SCALAR, '[', or '{' is expected, got "+repr(self.tokens[self.index]))
247
248    # sequence: SEQUENCE-START (node (ENTRY node)*)? ENTRY? SEQUENCE-END
249    def parse_sequence(self):
250        self.consume_token(FlowSequenceStartToken)
251        if not self.test_token(FlowSequenceEndToken):
252            self.parse_node()
253            while not self.test_token(FlowSequenceEndToken):
254                self.consume_token(FlowEntryToken)
255                if not self.test_token(FlowSequenceEndToken):
256                    self.parse_node()
257        self.consume_token(FlowSequenceEndToken)
258        self.events.append(CollectionEndEvent(None, None))
259
260    # mapping: MAPPING-START (map_entry (ENTRY map_entry)*)? ENTRY? MAPPING-END
261    def parse_mapping(self):
262        self.consume_token(FlowMappingStartToken)
263        if not self.test_token(FlowMappingEndToken):
264            self.parse_map_entry()
265            while not self.test_token(FlowMappingEndToken):
266                self.consume_token(FlowEntryToken)
267                if not self.test_token(FlowMappingEndToken):
268                    self.parse_map_entry()
269        self.consume_token(FlowMappingEndToken)
270        self.events.append(CollectionEndEvent(None, None))
271
272    # map_entry: KEY node VALUE node
273    def parse_map_entry(self):
274        self.consume_token(KeyToken)
275        self.parse_node()
276        self.consume_token(ValueToken)
277        self.parse_node()
278
279    def test_token(self, *choices):
280        for choice in choices:
281            if isinstance(self.tokens[self.index], choice):
282                return True
283        return False
284
285    def consume_token(self, cls):
286        if not isinstance(self.tokens[self.index], cls):
287            raise Error("unexpected token "+repr(self.tokens[self.index]))
288        self.index += 1
289
290    def get_value(self):
291        value = self.tokens[self.index].value
292        self.index += 1
293        return value
294
295    def parse(self):
296        self.tokens = self.scanner.scan()
297        self.index = 0
298        self.parse_stream()
299        return self.events
300
Note: See TracBrowser for help on using the repository browser.