source: branches/pyyaml3000/tests/test_appliance.py @ 48

Revision 48, 11.8 KB checked in by xi, 8 years ago (diff)

Scanner is complete.

RevLine 
[39]1
2import unittest, os
3
4class TestAppliance(unittest.TestCase):
5
6    DATA = 'tests/data'
7
[45]8    all_tests = {}
[39]9    for filename in os.listdir(DATA):
10        if os.path.isfile(os.path.join(DATA, filename)):
11            root, ext = os.path.splitext(filename)
[45]12            all_tests.setdefault(root, []).append(ext)
[39]13
14    def add_tests(cls, method_name, *extensions):
[45]15        for test in cls.all_tests:
16            available_extensions = cls.all_tests[test]
[39]17            for ext in extensions:
18                if ext not in available_extensions:
19                    break
20            else:
21                filenames = [os.path.join(cls.DATA, test+ext) for ext in extensions]
22                def test_method(self, test=test, filenames=filenames):
23                    getattr(self, '_'+method_name)(test, *filenames)
24                test = test.replace('-', '_')
[45]25                try:
26                    test_method.__name__ = '%s_%s' % (method_name, test)
27                except TypeError:
28                    import new
29                    test_method = new.function(test_method.func_code, test_method.func_globals,
30                            '%s_%s' % (method_name, test), test_method.func_defaults,
31                            test_method.func_closure)
[39]32                setattr(cls, test_method.__name__, test_method)
33    add_tests = classmethod(add_tests)
34
[43]35class Node:
36    def __repr__(self):
37        args = []
38        for attribute in ['anchor', 'tag', 'value']:
39            if hasattr(self, attribute):
40                args.append(repr(getattr(self, attribute)))
[44]41        return "%s(%s)" % (self.__class__.__name__, ', '.join(args))
[43]42
43class AliasNode(Node):
44    def __init__(self, anchor):
45        self.anchor = anchor
46
47class ScalarNode(Node):
48    def __init__(self, anchor, tag, value):
49        self.anchor = anchor
50        self.tag = tag
51        self.value = value
52
53class SequenceNode(Node):
54    def __init__(self, anchor, tag, value):
55        self.anchor = anchor
56        self.tag = tag
57        self.value = value
58
59class MappingNode(Node):
60    def __init__(self, anchor, tag, value):
61        self.anchor = anchor
62        self.tag = tag
63        self.value = value
64
65class Token:
66    def __repr__(self):
67        args = []
68        if hasattr(self, 'value'):
69            args.append(repr(self.value))
70        return "%s(%s)" % (self.__class__.__name__, ''.join(args))
71
[47]72class StreamEndToken(Token):
[43]73    pass
74
75class DirectiveToken(Token):
76    pass
77
78class DocumentStartToken(Token):
79    pass
80
81class SequenceStartToken(Token):
82    pass
83
84class MappingStartToken(Token):
85    pass
86
87class SequenceEndToken(Token):
88    pass
89
90class MappingEndToken(Token):
91    pass
92
93class KeyToken(Token):
94    pass
95
96class ValueToken(Token):
97    pass
98
99class EntryToken(Token):
100    pass
101
102class AliasToken(Token):
103    def __init__(self, value):
104        self.value = value
105
106class AnchorToken(Token):
107    def __init__(self, value):
108        self.value = value
109
110class TagToken(Token):
111    def __init__(self, value):
112        self.value = value
113
114class ScalarToken(Token):
115    def __init__(self, value):
116        self.value = value
117
118class Error(Exception):
119    pass
120
121class CanonicalScanner:
122
123    def __init__(self, source, data):
124        self.source = source
125        self.data = unicode(data, 'utf-8')+u'\0'
126        self.index = 0
127
128    def scan(self):
129        #print self.data[self.index:]
130        tokens = []
131        while True:
132            self.find_token()
133            ch = self.data[self.index]
134            if ch == u'\0':
[47]135                tokens.append(StreamEndToken())
[43]136                break
137            elif ch == u'%':
138                tokens.append(self.scan_directive())
139            elif ch == u'-' and self.data[self.index:self.index+3] == u'---':
140                self.index += 3
141                tokens.append(DocumentStartToken())
142            elif ch == u'[':
143                self.index += 1
144                tokens.append(SequenceStartToken())
145            elif ch == u'{':
146                self.index += 1
147                tokens.append(MappingStartToken())
148            elif ch == u']':
149                self.index += 1
150                tokens.append(SequenceEndToken())
151            elif ch == u'}':
152                self.index += 1
153                tokens.append(MappingEndToken())
154            elif ch == u'?':
155                self.index += 1
156                tokens.append(KeyToken())
157            elif ch == u':':
158                self.index += 1
159                tokens.append(ValueToken())
160            elif ch == u',':
161                self.index += 1
162                tokens.append(EntryToken())
163            elif ch == u'*' or ch == u'&':
164                tokens.append(self.scan_alias())
165            elif ch == u'!':
166                tokens.append(self.scan_tag())
167            elif ch == u'"':
168                tokens.append(self.scan_scalar())
169            else:
170                raise Error("invalid token")
171        return tokens
172
173    DIRECTIVE = u'%YAML 1.1'
174
175    def scan_directive(self):
176        if self.data[self.index:self.index+len(self.DIRECTIVE)] == self.DIRECTIVE and \
177                self.data[self.index+len(self.DIRECTIVE)] in u' \n\0':
178            self.index += len(self.DIRECTIVE)
179            return DirectiveToken()
180
181    def scan_alias(self):
182        if self.data[self.index] == u'*':
183            TokenClass = AliasToken
184        else:
185            TokenClass = AnchorToken
186        self.index += 1
187        start = self.index
188        while self.data[self.index] not in u', \n\0':
189            self.index += 1
190        value = self.data[start:self.index]
191        return TokenClass(value)
192
193    def scan_tag(self):
194        self.index += 1
195        start = self.index
196        while self.data[self.index] not in u' \n\0':
197            self.index += 1
198        value = self.data[start:self.index]
199        if value[0] == u'!':
200            value = 'tag:yaml.org,2002:'+value[1:]
201        else:
202            value = value[1:-1]
203        return TagToken(value)
204
205    QUOTE_CODES = {
206        'x': 2,
207        'u': 4,
208        'U': 8,
209    }
210
211    QUOTE_REPLACES = {
212        u'\\': u'\\',
213        u'\"': u'\"',
214        u' ': u' ',
215        u'a': u'\x07',
216        u'b': u'\x08',
217        u'e': u'\x1B',
218        u'f': u'\x0C',
219        u'n': u'\x0A',
220        u'r': u'\x0D',
221        u't': u'\x09',
222        u'v': u'\x0B',
223        u'N': u'\u0085',
224        u'L': u'\u2028',
225        u'P': u'\u2029',
226        u'_': u'_',
227        u'0': u'\x00',
228
229    }
230
231    def scan_scalar(self):
232        self.index += 1
233        chunks = []
234        start = self.index
235        ignore_spaces = False
236        while self.data[self.index] != u'"':
237            if self.data[self.index] == u'\\':
238                ignore_spaces = False
239                chunks.append(self.data[start:self.index])
240                self.index += 1
241                ch = self.data[self.index]
242                self.index += 1
243                if ch == u'\n':
244                    ignore_spaces = True
245                elif ch in self.QUOTE_CODES:
246                    length = self.QUOTE_CODES[ch]
247                    code = int(self.data[self.index:self.index+length], 16)
248                    chunks.append(unichr(code))
249                    self.index += length
250                else:
251                    chunks.append(self.QUOTE_REPLACES[ch])
252                start = self.index
253            elif self.data[self.index] == u'\n':
[48]254                chunks.append(self.data[start:self.index])
[43]255                chunks.append(u' ')
256                self.index += 1
[48]257                start = self.index
[43]258                ignore_spaces = True
259            elif ignore_spaces and self.data[self.index] == u' ':
260                self.index += 1
261                start = self.index
262            else:
263                ignore_spaces = False
264                self.index += 1
265        chunks.append(self.data[start:self.index])
266        self.index += 1
267        return ScalarToken(u''.join(chunks))
268
269    def find_token(self):
270        found = False
271        while not found:
272            while self.data[self.index] in u' \t':
273                self.index += 1
274            if self.data[self.index] == u'#':
275                while self.data[self.index] != u'\n':
276                    self.index += 1
277            if self.data[self.index] == u'\n':
278                self.index += 1
279            else:
280                found = True
281
282class CanonicalParser:
283
284    def __init__(self, source, data):
285        self.scanner = CanonicalScanner(source, data)
286
287    # stream: document* END
288    def parse_stream(self):
289        documents = []
[47]290        while not self.test_token(StreamEndToken):
[43]291            if self.test_token(DirectiveToken, DocumentStartToken):
292                documents.append(self.parse_document())
293            else:
294                raise Error("document is expected, got "+repr(self.tokens[self.index]))
295        return documents
296
297    # document: DIRECTIVE? DOCUMENT-START node?
298    def parse_document(self):
299        node = None
300        if self.test_token(DirectiveToken):
301            self.consume_token(DirectiveToken)
302        self.consume_token(DocumentStartToken)
303        if self.test_token(TagToken, AliasToken, AnchorToken, TagToken,
304                SequenceStartToken, MappingStartToken, ScalarToken):
305            node = self.parse_node()
306        return node
307
308    # node: ALIAS | ANCHOR? TAG? (SCALAR|sequence|mapping)
309    def parse_node(self):
310        if self.test_token(AliasToken):
311            return AliasNode(self.get_value())
312        else:
313            anchor = None
314            if self.test_token(AnchorToken):
315                anchor = self.get_value()
316            tag = None
317            if self.test_token(TagToken):
318                tag = self.get_value()
319            if self.test_token(ScalarToken):
320                return ScalarNode(anchor, tag, self.get_value())
321            elif self.test_token(SequenceStartToken):
322                return SequenceNode(anchor, tag, self.parse_sequence())
323            elif self.test_token(MappingStartToken):
324                return MappingNode(anchor, tag, self.parse_mapping())
325            else:
326                raise Error("SCALAR, '[', or '{' is expected, got "+repr(self.tokens[self.index]))
327
328    # sequence: SEQUENCE-START (node (ENTRY node)*)? ENTRY? SEQUENCE-END
329    def parse_sequence(self):
330        values = []
331        self.consume_token(SequenceStartToken)
332        if not self.test_token(SequenceEndToken):
333            values.append(self.parse_node())
334            while not self.test_token(SequenceEndToken):
335                self.consume_token(EntryToken)
336                if not self.test_token(SequenceEndToken):
337                    values.append(self.parse_node())
338        self.consume_token(SequenceEndToken)
339        return values
340
341    # mapping: MAPPING-START (map_entry (ENTRY map_entry)*)? ENTRY? MAPPING-END
342    def parse_mapping(self):
343        values = []
344        self.consume_token(MappingStartToken)
345        if not self.test_token(MappingEndToken):
346            values.append(self.parse_map_entry())
347            while not self.test_token(MappingEndToken):
348                self.consume_token(EntryToken)
349                if not self.test_token(MappingEndToken):
350                    values.append(self.parse_map_entry())
351        self.consume_token(MappingEndToken)
352        return values
353
354    # map_entry: KEY node VALUE node
355    def parse_map_entry(self):
356        self.consume_token(KeyToken)
357        key = self.parse_node()
358        self.consume_token(ValueToken)
359        value = self.parse_node()
360        return (key, value)
361
362    def test_token(self, *choices):
363        for choice in choices:
364            if isinstance(self.tokens[self.index], choice):
365                return True
366        return False
367
368    def consume_token(self, cls):
369        if not isinstance(self.tokens[self.index], cls):
370            raise Error("unexpected token "+repr(self.tokens[self.index]))
371        self.index += 1
372
373    def get_value(self):
374        value = self.tokens[self.index].value
375        self.index += 1
376        return value
377
378    def parse(self):
379        self.tokens = self.scanner.scan()
380        self.index = 0
381        return self.parse_stream()
382
Note: See TracBrowser for help on using the repository browser.