source: branches/pyyaml3000/tests/test_appliance.py @ 54

Revision 54, 10.6 KB checked in by xi, 8 years ago (diff)

Fix bugs in Resolver and add more tests.

RevLine 
[39]1
2import unittest, os
3
[51]4from yaml.tokens import *
5from yaml.events import *
6
[39]7class TestAppliance(unittest.TestCase):
8
9    DATA = 'tests/data'
10
[45]11    all_tests = {}
[39]12    for filename in os.listdir(DATA):
13        if os.path.isfile(os.path.join(DATA, filename)):
14            root, ext = os.path.splitext(filename)
[45]15            all_tests.setdefault(root, []).append(ext)
[39]16
17    def add_tests(cls, method_name, *extensions):
[45]18        for test in cls.all_tests:
19            available_extensions = cls.all_tests[test]
[39]20            for ext in extensions:
21                if ext not in available_extensions:
22                    break
23            else:
24                filenames = [os.path.join(cls.DATA, test+ext) for ext in extensions]
25                def test_method(self, test=test, filenames=filenames):
26                    getattr(self, '_'+method_name)(test, *filenames)
27                test = test.replace('-', '_')
[45]28                try:
29                    test_method.__name__ = '%s_%s' % (method_name, test)
30                except TypeError:
31                    import new
32                    test_method = new.function(test_method.func_code, test_method.func_globals,
33                            '%s_%s' % (method_name, test), test_method.func_defaults,
34                            test_method.func_closure)
[39]35                setattr(cls, test_method.__name__, test_method)
36    add_tests = classmethod(add_tests)
37
[43]38class Error(Exception):
39    pass
40
41class CanonicalScanner:
42
[51]43    def __init__(self, data):
[43]44        self.data = unicode(data, 'utf-8')+u'\0'
45        self.index = 0
46
47    def scan(self):
48        #print self.data[self.index:]
49        tokens = []
50        while True:
51            self.find_token()
52            ch = self.data[self.index]
53            if ch == u'\0':
[51]54                tokens.append(StreamEndToken(None, None))
[43]55                break
56            elif ch == u'%':
57                tokens.append(self.scan_directive())
58            elif ch == u'-' and self.data[self.index:self.index+3] == u'---':
59                self.index += 3
[51]60                tokens.append(DocumentStartToken(None, None))
[43]61            elif ch == u'[':
62                self.index += 1
[51]63                tokens.append(FlowSequenceStartToken(None, None))
[43]64            elif ch == u'{':
65                self.index += 1
[51]66                tokens.append(FlowMappingStartToken(None, None))
[43]67            elif ch == u']':
68                self.index += 1
[51]69                tokens.append(FlowSequenceEndToken(None, None))
[43]70            elif ch == u'}':
71                self.index += 1
[51]72                tokens.append(FlowMappingEndToken(None, None))
[43]73            elif ch == u'?':
74                self.index += 1
[51]75                tokens.append(KeyToken(None, None))
[43]76            elif ch == u':':
77                self.index += 1
[51]78                tokens.append(ValueToken(None, None))
[43]79            elif ch == u',':
80                self.index += 1
[51]81                tokens.append(FlowEntryToken(None, None))
[43]82            elif ch == u'*' or ch == u'&':
83                tokens.append(self.scan_alias())
84            elif ch == u'!':
85                tokens.append(self.scan_tag())
86            elif ch == u'"':
87                tokens.append(self.scan_scalar())
88            else:
89                raise Error("invalid token")
90        return tokens
91
92    DIRECTIVE = u'%YAML 1.1'
93
94    def scan_directive(self):
95        if self.data[self.index:self.index+len(self.DIRECTIVE)] == self.DIRECTIVE and \
96                self.data[self.index+len(self.DIRECTIVE)] in u' \n\0':
97            self.index += len(self.DIRECTIVE)
[51]98            return DirectiveToken('YAML', (1, 1), None, None)
[43]99
100    def scan_alias(self):
101        if self.data[self.index] == u'*':
102            TokenClass = AliasToken
103        else:
104            TokenClass = AnchorToken
105        self.index += 1
106        start = self.index
107        while self.data[self.index] not in u', \n\0':
108            self.index += 1
109        value = self.data[start:self.index]
[51]110        return TokenClass(value, None, None)
[43]111
112    def scan_tag(self):
113        self.index += 1
114        start = self.index
115        while self.data[self.index] not in u' \n\0':
116            self.index += 1
117        value = self.data[start:self.index]
118        if value[0] == u'!':
119            value = 'tag:yaml.org,2002:'+value[1:]
[51]120        elif value[0] == u'<' and value[-1] == u'>':
[43]121            value = value[1:-1]
[51]122        else:
123            value = u'!'+value
124        return TagToken(value, None, None)
[43]125
126    QUOTE_CODES = {
127        'x': 2,
128        'u': 4,
129        'U': 8,
130    }
131
132    QUOTE_REPLACES = {
133        u'\\': u'\\',
134        u'\"': u'\"',
135        u' ': u' ',
136        u'a': u'\x07',
137        u'b': u'\x08',
138        u'e': u'\x1B',
139        u'f': u'\x0C',
140        u'n': u'\x0A',
141        u'r': u'\x0D',
142        u't': u'\x09',
143        u'v': u'\x0B',
144        u'N': u'\u0085',
145        u'L': u'\u2028',
146        u'P': u'\u2029',
147        u'_': u'_',
148        u'0': u'\x00',
149
150    }
151
152    def scan_scalar(self):
153        self.index += 1
154        chunks = []
155        start = self.index
156        ignore_spaces = False
157        while self.data[self.index] != u'"':
158            if self.data[self.index] == u'\\':
159                ignore_spaces = False
160                chunks.append(self.data[start:self.index])
161                self.index += 1
162                ch = self.data[self.index]
163                self.index += 1
164                if ch == u'\n':
165                    ignore_spaces = True
166                elif ch in self.QUOTE_CODES:
167                    length = self.QUOTE_CODES[ch]
168                    code = int(self.data[self.index:self.index+length], 16)
169                    chunks.append(unichr(code))
170                    self.index += length
171                else:
172                    chunks.append(self.QUOTE_REPLACES[ch])
173                start = self.index
174            elif self.data[self.index] == u'\n':
[48]175                chunks.append(self.data[start:self.index])
[43]176                chunks.append(u' ')
177                self.index += 1
[48]178                start = self.index
[43]179                ignore_spaces = True
180            elif ignore_spaces and self.data[self.index] == u' ':
181                self.index += 1
182                start = self.index
183            else:
184                ignore_spaces = False
185                self.index += 1
186        chunks.append(self.data[start:self.index])
187        self.index += 1
[51]188        return ScalarToken(u''.join(chunks), False, None, None)
[43]189
190    def find_token(self):
191        found = False
192        while not found:
193            while self.data[self.index] in u' \t':
194                self.index += 1
195            if self.data[self.index] == u'#':
196                while self.data[self.index] != u'\n':
197                    self.index += 1
198            if self.data[self.index] == u'\n':
199                self.index += 1
200            else:
201                found = True
202
203class CanonicalParser:
204
[51]205    def __init__(self, data):
206        self.scanner = CanonicalScanner(data)
207        self.events = []
[43]208
209    # stream: document* END
210    def parse_stream(self):
[47]211        while not self.test_token(StreamEndToken):
[43]212            if self.test_token(DirectiveToken, DocumentStartToken):
[51]213                self.parse_document()
[43]214            else:
215                raise Error("document is expected, got "+repr(self.tokens[self.index]))
[51]216        self.events.append(StreamEndEvent(None, None))
[43]217
[51]218    # document: DIRECTIVE? DOCUMENT-START node
[43]219    def parse_document(self):
220        node = None
221        if self.test_token(DirectiveToken):
222            self.consume_token(DirectiveToken)
223        self.consume_token(DocumentStartToken)
[51]224        self.parse_node()
[43]225
226    # node: ALIAS | ANCHOR? TAG? (SCALAR|sequence|mapping)
227    def parse_node(self):
228        if self.test_token(AliasToken):
[51]229            self.events.append(AliasEvent(self.get_value(), None, None))
[43]230        else:
231            anchor = None
232            if self.test_token(AnchorToken):
233                anchor = self.get_value()
[51]234            tag = u'!'
[43]235            if self.test_token(TagToken):
236                tag = self.get_value()
237            if self.test_token(ScalarToken):
[51]238                self.events.append(ScalarEvent(anchor, tag, self.get_value(), None, None))
239            elif self.test_token(FlowSequenceStartToken):
240                self.events.append(SequenceEvent(anchor, tag, None, None))
241                self.parse_sequence()
242            elif self.test_token(FlowMappingStartToken):
243                self.events.append(MappingEvent(anchor, tag, None, None))
244                self.parse_mapping()
[43]245            else:
246                raise Error("SCALAR, '[', or '{' is expected, got "+repr(self.tokens[self.index]))
247
248    # sequence: SEQUENCE-START (node (ENTRY node)*)? ENTRY? SEQUENCE-END
249    def parse_sequence(self):
[51]250        self.consume_token(FlowSequenceStartToken)
251        if not self.test_token(FlowSequenceEndToken):
252            self.parse_node()
253            while not self.test_token(FlowSequenceEndToken):
254                self.consume_token(FlowEntryToken)
255                if not self.test_token(FlowSequenceEndToken):
256                    self.parse_node()
257        self.consume_token(FlowSequenceEndToken)
258        self.events.append(CollectionEndEvent(None, None))
[43]259
260    # mapping: MAPPING-START (map_entry (ENTRY map_entry)*)? ENTRY? MAPPING-END
261    def parse_mapping(self):
[51]262        self.consume_token(FlowMappingStartToken)
263        if not self.test_token(FlowMappingEndToken):
264            self.parse_map_entry()
265            while not self.test_token(FlowMappingEndToken):
266                self.consume_token(FlowEntryToken)
267                if not self.test_token(FlowMappingEndToken):
268                    self.parse_map_entry()
269        self.consume_token(FlowMappingEndToken)
270        self.events.append(CollectionEndEvent(None, None))
[43]271
272    # map_entry: KEY node VALUE node
273    def parse_map_entry(self):
274        self.consume_token(KeyToken)
[51]275        self.parse_node()
[43]276        self.consume_token(ValueToken)
[51]277        self.parse_node()
[43]278
279    def test_token(self, *choices):
280        for choice in choices:
281            if isinstance(self.tokens[self.index], choice):
282                return True
283        return False
284
285    def consume_token(self, cls):
286        if not isinstance(self.tokens[self.index], cls):
287            raise Error("unexpected token "+repr(self.tokens[self.index]))
288        self.index += 1
289
290    def get_value(self):
291        value = self.tokens[self.index].value
292        self.index += 1
293        return value
294
295    def parse(self):
296        self.tokens = self.scanner.scan()
297        self.index = 0
[51]298        self.parse_stream()
299        return self.events
[43]300
[54]301    def get(self):
302        return self.events.pop(0)
303
304    def check(self, *choices):
305        for choice in choices:
306            if isinstance(self.events[0], choice):
307                return True
308        return False
309
310    def peek(self):
311        return self.events[0]
312
Note: See TracBrowser for help on using the repository browser.