source: pyyaml/trunk/tests/test_appliance.py @ 137

Revision 137, 11.9 KB checked in by xi, 9 years ago (diff)

Refactor resolver.

Line 
1
2import unittest, os
3
4from yaml import *
5
6class TestAppliance(unittest.TestCase):
7
8    DATA = 'tests/data'
9
10    all_tests = {}
11    for filename in os.listdir(DATA):
12        if os.path.isfile(os.path.join(DATA, filename)):
13            root, ext = os.path.splitext(filename)
14            all_tests.setdefault(root, []).append(ext)
15
16    def add_tests(cls, method_name, *extensions):
17        for test in cls.all_tests:
18            available_extensions = cls.all_tests[test]
19            for ext in extensions:
20                if ext not in available_extensions:
21                    break
22            else:
23                filenames = [os.path.join(cls.DATA, test+ext) for ext in extensions]
24                def test_method(self, test=test, filenames=filenames):
25                    getattr(self, '_'+method_name)(test, *filenames)
26                test = test.replace('-', '_')
27                try:
28                    test_method.__name__ = '%s_%s' % (method_name, test)
29                except TypeError:
30                    import new
31                    test_method = new.function(test_method.func_code, test_method.func_globals,
32                            '%s_%s' % (method_name, test), test_method.func_defaults,
33                            test_method.func_closure)
34                setattr(cls, test_method.__name__, test_method)
35    add_tests = classmethod(add_tests)
36
37class Error(Exception):
38    pass
39
40class CanonicalScanner:
41
42    def __init__(self, data):
43        self.data = unicode(data, 'utf-8')+u'\0'
44        self.index = 0
45        self.scan()
46
47    def check_token(self, *choices):
48        if self.tokens:
49            if not choices:
50                return True
51            for choice in choices:
52                if isinstance(self.tokens[0], choice):
53                    return True
54        return False
55
56    def peek_token(self):
57        if self.tokens:
58            return self.tokens[0]
59
60    def get_token(self, choice=None):
61        token = self.tokens.pop(0)
62        if choice and not isinstance(token, choice):
63            raise Error("unexpected token "+repr(token))
64        return token
65
66    def get_token_value(self):
67        token = self.get_token()
68        return token.value
69
70    def scan(self):
71        self.tokens = []
72        self.tokens.append(StreamStartToken(None, None))
73        while True:
74            self.find_token()
75            ch = self.data[self.index]
76            if ch == u'\0':
77                self.tokens.append(StreamEndToken(None, None))
78                break
79            elif ch == u'%':
80                self.tokens.append(self.scan_directive())
81            elif ch == u'-' and self.data[self.index:self.index+3] == u'---':
82                self.index += 3
83                self.tokens.append(DocumentStartToken(None, None))
84            elif ch == u'[':
85                self.index += 1
86                self.tokens.append(FlowSequenceStartToken(None, None))
87            elif ch == u'{':
88                self.index += 1
89                self.tokens.append(FlowMappingStartToken(None, None))
90            elif ch == u']':
91                self.index += 1
92                self.tokens.append(FlowSequenceEndToken(None, None))
93            elif ch == u'}':
94                self.index += 1
95                self.tokens.append(FlowMappingEndToken(None, None))
96            elif ch == u'?':
97                self.index += 1
98                self.tokens.append(KeyToken(None, None))
99            elif ch == u':':
100                self.index += 1
101                self.tokens.append(ValueToken(None, None))
102            elif ch == u',':
103                self.index += 1
104                self.tokens.append(FlowEntryToken(None, None))
105            elif ch == u'*' or ch == u'&':
106                self.tokens.append(self.scan_alias())
107            elif ch == u'!':
108                self.tokens.append(self.scan_tag())
109            elif ch == u'"':
110                self.tokens.append(self.scan_scalar())
111            else:
112                raise Error("invalid token")
113
114    DIRECTIVE = u'%YAML 1.1'
115
116    def scan_directive(self):
117        if self.data[self.index:self.index+len(self.DIRECTIVE)] == self.DIRECTIVE and \
118                self.data[self.index+len(self.DIRECTIVE)] in u' \n\0':
119            self.index += len(self.DIRECTIVE)
120            return DirectiveToken('YAML', (1, 1), None, None)
121
122    def scan_alias(self):
123        if self.data[self.index] == u'*':
124            TokenClass = AliasToken
125        else:
126            TokenClass = AnchorToken
127        self.index += 1
128        start = self.index
129        while self.data[self.index] not in u', \n\0':
130            self.index += 1
131        value = self.data[start:self.index]
132        return TokenClass(value, None, None)
133
134    def scan_tag(self):
135        self.index += 1
136        start = self.index
137        while self.data[self.index] not in u' \n\0':
138            self.index += 1
139        value = self.data[start:self.index]
140        if value[0] == u'!':
141            value = 'tag:yaml.org,2002:'+value[1:]
142        elif value[0] == u'<' and value[-1] == u'>':
143            value = value[1:-1]
144        else:
145            value = u'!'+value
146        return TagToken(value, None, None)
147
148    QUOTE_CODES = {
149        'x': 2,
150        'u': 4,
151        'U': 8,
152    }
153
154    QUOTE_REPLACES = {
155        u'\\': u'\\',
156        u'\"': u'\"',
157        u' ': u' ',
158        u'a': u'\x07',
159        u'b': u'\x08',
160        u'e': u'\x1B',
161        u'f': u'\x0C',
162        u'n': u'\x0A',
163        u'r': u'\x0D',
164        u't': u'\x09',
165        u'v': u'\x0B',
166        u'N': u'\u0085',
167        u'L': u'\u2028',
168        u'P': u'\u2029',
169        u'_': u'_',
170        u'0': u'\x00',
171
172    }
173
174    def scan_scalar(self):
175        self.index += 1
176        chunks = []
177        start = self.index
178        ignore_spaces = False
179        while self.data[self.index] != u'"':
180            if self.data[self.index] == u'\\':
181                ignore_spaces = False
182                chunks.append(self.data[start:self.index])
183                self.index += 1
184                ch = self.data[self.index]
185                self.index += 1
186                if ch == u'\n':
187                    ignore_spaces = True
188                elif ch in self.QUOTE_CODES:
189                    length = self.QUOTE_CODES[ch]
190                    code = int(self.data[self.index:self.index+length], 16)
191                    chunks.append(unichr(code))
192                    self.index += length
193                else:
194                    chunks.append(self.QUOTE_REPLACES[ch])
195                start = self.index
196            elif self.data[self.index] == u'\n':
197                chunks.append(self.data[start:self.index])
198                chunks.append(u' ')
199                self.index += 1
200                start = self.index
201                ignore_spaces = True
202            elif ignore_spaces and self.data[self.index] == u' ':
203                self.index += 1
204                start = self.index
205            else:
206                ignore_spaces = False
207                self.index += 1
208        chunks.append(self.data[start:self.index])
209        self.index += 1
210        return ScalarToken(u''.join(chunks), False, None, None)
211
212    def find_token(self):
213        found = False
214        while not found:
215            while self.data[self.index] in u' \t':
216                self.index += 1
217            if self.data[self.index] == u'#':
218                while self.data[self.index] != u'\n':
219                    self.index += 1
220            if self.data[self.index] == u'\n':
221                self.index += 1
222            else:
223                found = True
224
225class CanonicalParser:
226
227    def __init__(self):
228        self.events = []
229        self.parse()
230
231    # stream: STREAM-START document* STREAM-END
232    def parse_stream(self):
233        self.get_token(StreamStartToken)
234        self.events.append(StreamStartEvent(None, None))
235        while not self.check_token(StreamEndToken):
236            if self.check_token(DirectiveToken, DocumentStartToken):
237                self.parse_document()
238            else:
239                raise Error("document is expected, got "+repr(self.tokens[self.index]))
240        self.get_token(StreamEndToken)
241        self.events.append(StreamEndEvent(None, None))
242
243    # document: DIRECTIVE? DOCUMENT-START node
244    def parse_document(self):
245        node = None
246        if self.check_token(DirectiveToken):
247            self.get_token(DirectiveToken)
248        self.get_token(DocumentStartToken)
249        self.events.append(DocumentStartEvent(None, None))
250        self.parse_node()
251        self.events.append(DocumentEndEvent(None, None))
252
253    # node: ALIAS | ANCHOR? TAG? (SCALAR|sequence|mapping)
254    def parse_node(self):
255        if self.check_token(AliasToken):
256            self.events.append(AliasEvent(self.get_token_value(), None, None))
257        else:
258            anchor = None
259            if self.check_token(AnchorToken):
260                anchor = self.get_token_value()
261            tag = None
262            if self.check_token(TagToken):
263                tag = self.get_token_value()
264            if self.check_token(ScalarToken):
265                self.events.append(ScalarEvent(anchor, tag, (False, False), self.get_token_value(), None, None))
266            elif self.check_token(FlowSequenceStartToken):
267                self.events.append(SequenceStartEvent(anchor, tag, None, None))
268                self.parse_sequence()
269            elif self.check_token(FlowMappingStartToken):
270                self.events.append(MappingStartEvent(anchor, tag, None, None))
271                self.parse_mapping()
272            else:
273                raise Error("SCALAR, '[', or '{' is expected, got "+repr(self.tokens[self.index]))
274
275    # sequence: SEQUENCE-START (node (ENTRY node)*)? ENTRY? SEQUENCE-END
276    def parse_sequence(self):
277        self.get_token(FlowSequenceStartToken)
278        if not self.check_token(FlowSequenceEndToken):
279            self.parse_node()
280            while not self.check_token(FlowSequenceEndToken):
281                self.get_token(FlowEntryToken)
282                if not self.check_token(FlowSequenceEndToken):
283                    self.parse_node()
284        self.get_token(FlowSequenceEndToken)
285        self.events.append(SequenceEndEvent(None, None))
286
287    # mapping: MAPPING-START (map_entry (ENTRY map_entry)*)? ENTRY? MAPPING-END
288    def parse_mapping(self):
289        self.get_token(FlowMappingStartToken)
290        if not self.check_token(FlowMappingEndToken):
291            self.parse_map_entry()
292            while not self.check_token(FlowMappingEndToken):
293                self.get_token(FlowEntryToken)
294                if not self.check_token(FlowMappingEndToken):
295                    self.parse_map_entry()
296        self.get_token(FlowMappingEndToken)
297        self.events.append(MappingEndEvent(None, None))
298
299    # map_entry: KEY node VALUE node
300    def parse_map_entry(self):
301        self.get_token(KeyToken)
302        self.parse_node()
303        self.get_token(ValueToken)
304        self.parse_node()
305
306    def parse(self):
307        self.parse_stream()
308
309    def get_event(self):
310        return self.events.pop(0)
311
312    def check_event(self, *choices):
313        if self.events:
314            if not choices:
315                return True
316            for choice in choices:
317                if isinstance(self.events[0], choice):
318                    return True
319        return False
320
321    def peek_event(self):
322        return self.events[0]
323
324class CanonicalLoader(CanonicalScanner, CanonicalParser, Composer, Constructor, Resolver):
325
326    def __init__(self, stream):
327        if hasattr(stream, 'read'):
328            stream = stream.read()
329        CanonicalScanner.__init__(self, stream)
330        CanonicalParser.__init__(self)
331        Composer.__init__(self)
332        Constructor.__init__(self)
333        Resolver.__init__(self)
334
335def canonical_scan(stream):
336    return scan(stream, Loader=CanonicalLoader)
337
338def canonical_parse(stream):
339    return parse(stream, Loader=CanonicalLoader)
340
341def canonical_compose(stream):
342    return compose(stream, Loader=CanonicalLoader)
343
344def canonical_compose_all(stream):
345    return compose_all(stream, Loader=CanonicalLoader)
346
347def canonical_load(stream):
348    return load(stream, Loader=CanonicalLoader)
349
350def canonical_load_all(stream):
351    return load_all(stream, Loader=CanonicalLoader)
352
Note: See TracBrowser for help on using the repository browser.