source: pyyaml/trunk/tests/test_appliance.py @ 312

Revision 312, 12.1 KB checked in by xi, 5 years ago (diff)

Fixed test errors for LibYAML bindings; added a test on emitting nodes in all possible styles.

Line 
1
2import unittest, os
3
4from yaml import *
5from yaml.composer import *
6from yaml.constructor import *
7from yaml.resolver import *
8
9class TestAppliance(unittest.TestCase):
10
11    DATA = 'tests/data'
12    SKIP_EXT = '.skip'
13
14    all_tests = {}
15    for filename in os.listdir(DATA):
16        if os.path.isfile(os.path.join(DATA, filename)):
17            root, ext = os.path.splitext(filename)
18            all_tests.setdefault(root, []).append(ext)
19
20    def add_tests(cls, method_name, *extensions):
21        for test in cls.all_tests:
22            available_extensions = cls.all_tests[test]
23            if cls.SKIP_EXT in available_extensions:
24                continue
25            for ext in extensions:
26                if ext not in available_extensions:
27                    break
28            else:
29                filenames = [os.path.join(cls.DATA, test+ext) for ext in extensions]
30                def test_method(self, test=test, filenames=filenames):
31                    getattr(self, '_'+method_name)(test, *filenames)
32                test = test.replace('-', '_').replace('.', '_')
33                try:
34                    test_method.__name__ = '%s_%s' % (method_name, test)
35                except TypeError:
36                    import new
37                    test_method = new.function(test_method.func_code, test_method.func_globals,
38                            '%s_%s' % (method_name, test), test_method.func_defaults,
39                            test_method.func_closure)
40                setattr(cls, test_method.__name__, test_method)
41    add_tests = classmethod(add_tests)
42
43class Error(Exception):
44    pass
45
46class CanonicalScanner:
47
48    def __init__(self, data):
49        self.data = unicode(data, 'utf-8')+u'\0'
50        self.index = 0
51        self.scan()
52
53    def check_token(self, *choices):
54        if self.tokens:
55            if not choices:
56                return True
57            for choice in choices:
58                if isinstance(self.tokens[0], choice):
59                    return True
60        return False
61
62    def peek_token(self):
63        if self.tokens:
64            return self.tokens[0]
65
66    def get_token(self, choice=None):
67        token = self.tokens.pop(0)
68        if choice and not isinstance(token, choice):
69            raise Error("unexpected token "+repr(token))
70        return token
71
72    def get_token_value(self):
73        token = self.get_token()
74        return token.value
75
76    def scan(self):
77        self.tokens = []
78        self.tokens.append(StreamStartToken(None, None))
79        while True:
80            self.find_token()
81            ch = self.data[self.index]
82            if ch == u'\0':
83                self.tokens.append(StreamEndToken(None, None))
84                break
85            elif ch == u'%':
86                self.tokens.append(self.scan_directive())
87            elif ch == u'-' and self.data[self.index:self.index+3] == u'---':
88                self.index += 3
89                self.tokens.append(DocumentStartToken(None, None))
90            elif ch == u'[':
91                self.index += 1
92                self.tokens.append(FlowSequenceStartToken(None, None))
93            elif ch == u'{':
94                self.index += 1
95                self.tokens.append(FlowMappingStartToken(None, None))
96            elif ch == u']':
97                self.index += 1
98                self.tokens.append(FlowSequenceEndToken(None, None))
99            elif ch == u'}':
100                self.index += 1
101                self.tokens.append(FlowMappingEndToken(None, None))
102            elif ch == u'?':
103                self.index += 1
104                self.tokens.append(KeyToken(None, None))
105            elif ch == u':':
106                self.index += 1
107                self.tokens.append(ValueToken(None, None))
108            elif ch == u',':
109                self.index += 1
110                self.tokens.append(FlowEntryToken(None, None))
111            elif ch == u'*' or ch == u'&':
112                self.tokens.append(self.scan_alias())
113            elif ch == u'!':
114                self.tokens.append(self.scan_tag())
115            elif ch == u'"':
116                self.tokens.append(self.scan_scalar())
117            else:
118                raise Error("invalid token")
119
120    DIRECTIVE = u'%YAML 1.1'
121
122    def scan_directive(self):
123        if self.data[self.index:self.index+len(self.DIRECTIVE)] == self.DIRECTIVE and \
124                self.data[self.index+len(self.DIRECTIVE)] in u' \n\0':
125            self.index += len(self.DIRECTIVE)
126            return DirectiveToken('YAML', (1, 1), None, None)
127
128    def scan_alias(self):
129        if self.data[self.index] == u'*':
130            TokenClass = AliasToken
131        else:
132            TokenClass = AnchorToken
133        self.index += 1
134        start = self.index
135        while self.data[self.index] not in u', \n\0':
136            self.index += 1
137        value = self.data[start:self.index]
138        return TokenClass(value, None, None)
139
140    def scan_tag(self):
141        self.index += 1
142        start = self.index
143        while self.data[self.index] not in u' \n\0':
144            self.index += 1
145        value = self.data[start:self.index]
146        if value[0] == u'!':
147            value = 'tag:yaml.org,2002:'+value[1:]
148        elif value[0] == u'<' and value[-1] == u'>':
149            value = value[1:-1]
150        else:
151            value = u'!'+value
152        return TagToken(value, None, None)
153
154    QUOTE_CODES = {
155        'x': 2,
156        'u': 4,
157        'U': 8,
158    }
159
160    QUOTE_REPLACES = {
161        u'\\': u'\\',
162        u'\"': u'\"',
163        u' ': u' ',
164        u'a': u'\x07',
165        u'b': u'\x08',
166        u'e': u'\x1B',
167        u'f': u'\x0C',
168        u'n': u'\x0A',
169        u'r': u'\x0D',
170        u't': u'\x09',
171        u'v': u'\x0B',
172        u'N': u'\u0085',
173        u'L': u'\u2028',
174        u'P': u'\u2029',
175        u'_': u'_',
176        u'0': u'\x00',
177
178    }
179
180    def scan_scalar(self):
181        self.index += 1
182        chunks = []
183        start = self.index
184        ignore_spaces = False
185        while self.data[self.index] != u'"':
186            if self.data[self.index] == u'\\':
187                ignore_spaces = False
188                chunks.append(self.data[start:self.index])
189                self.index += 1
190                ch = self.data[self.index]
191                self.index += 1
192                if ch == u'\n':
193                    ignore_spaces = True
194                elif ch in self.QUOTE_CODES:
195                    length = self.QUOTE_CODES[ch]
196                    code = int(self.data[self.index:self.index+length], 16)
197                    chunks.append(unichr(code))
198                    self.index += length
199                else:
200                    chunks.append(self.QUOTE_REPLACES[ch])
201                start = self.index
202            elif self.data[self.index] == u'\n':
203                chunks.append(self.data[start:self.index])
204                chunks.append(u' ')
205                self.index += 1
206                start = self.index
207                ignore_spaces = True
208            elif ignore_spaces and self.data[self.index] == u' ':
209                self.index += 1
210                start = self.index
211            else:
212                ignore_spaces = False
213                self.index += 1
214        chunks.append(self.data[start:self.index])
215        self.index += 1
216        return ScalarToken(u''.join(chunks), False, None, None)
217
218    def find_token(self):
219        found = False
220        while not found:
221            while self.data[self.index] in u' \t':
222                self.index += 1
223            if self.data[self.index] == u'#':
224                while self.data[self.index] != u'\n':
225                    self.index += 1
226            if self.data[self.index] == u'\n':
227                self.index += 1
228            else:
229                found = True
230
231class CanonicalParser:
232
233    def __init__(self):
234        self.events = []
235        self.parse()
236
237    # stream: STREAM-START document* STREAM-END
238    def parse_stream(self):
239        self.get_token(StreamStartToken)
240        self.events.append(StreamStartEvent(None, None))
241        while not self.check_token(StreamEndToken):
242            if self.check_token(DirectiveToken, DocumentStartToken):
243                self.parse_document()
244            else:
245                raise Error("document is expected, got "+repr(self.tokens[self.index]))
246        self.get_token(StreamEndToken)
247        self.events.append(StreamEndEvent(None, None))
248
249    # document: DIRECTIVE? DOCUMENT-START node
250    def parse_document(self):
251        node = None
252        if self.check_token(DirectiveToken):
253            self.get_token(DirectiveToken)
254        self.get_token(DocumentStartToken)
255        self.events.append(DocumentStartEvent(None, None))
256        self.parse_node()
257        self.events.append(DocumentEndEvent(None, None))
258
259    # node: ALIAS | ANCHOR? TAG? (SCALAR|sequence|mapping)
260    def parse_node(self):
261        if self.check_token(AliasToken):
262            self.events.append(AliasEvent(self.get_token_value(), None, None))
263        else:
264            anchor = None
265            if self.check_token(AnchorToken):
266                anchor = self.get_token_value()
267            tag = None
268            if self.check_token(TagToken):
269                tag = self.get_token_value()
270            if self.check_token(ScalarToken):
271                self.events.append(ScalarEvent(anchor, tag, (False, False), self.get_token_value(), None, None))
272            elif self.check_token(FlowSequenceStartToken):
273                self.events.append(SequenceStartEvent(anchor, tag, None, None))
274                self.parse_sequence()
275            elif self.check_token(FlowMappingStartToken):
276                self.events.append(MappingStartEvent(anchor, tag, None, None))
277                self.parse_mapping()
278            else:
279                raise Error("SCALAR, '[', or '{' is expected, got "+repr(self.tokens[self.index]))
280
281    # sequence: SEQUENCE-START (node (ENTRY node)*)? ENTRY? SEQUENCE-END
282    def parse_sequence(self):
283        self.get_token(FlowSequenceStartToken)
284        if not self.check_token(FlowSequenceEndToken):
285            self.parse_node()
286            while not self.check_token(FlowSequenceEndToken):
287                self.get_token(FlowEntryToken)
288                if not self.check_token(FlowSequenceEndToken):
289                    self.parse_node()
290        self.get_token(FlowSequenceEndToken)
291        self.events.append(SequenceEndEvent(None, None))
292
293    # mapping: MAPPING-START (map_entry (ENTRY map_entry)*)? ENTRY? MAPPING-END
294    def parse_mapping(self):
295        self.get_token(FlowMappingStartToken)
296        if not self.check_token(FlowMappingEndToken):
297            self.parse_map_entry()
298            while not self.check_token(FlowMappingEndToken):
299                self.get_token(FlowEntryToken)
300                if not self.check_token(FlowMappingEndToken):
301                    self.parse_map_entry()
302        self.get_token(FlowMappingEndToken)
303        self.events.append(MappingEndEvent(None, None))
304
305    # map_entry: KEY node VALUE node
306    def parse_map_entry(self):
307        self.get_token(KeyToken)
308        self.parse_node()
309        self.get_token(ValueToken)
310        self.parse_node()
311
312    def parse(self):
313        self.parse_stream()
314
315    def get_event(self):
316        return self.events.pop(0)
317
318    def check_event(self, *choices):
319        if self.events:
320            if not choices:
321                return True
322            for choice in choices:
323                if isinstance(self.events[0], choice):
324                    return True
325        return False
326
327    def peek_event(self):
328        return self.events[0]
329
330class CanonicalLoader(CanonicalScanner, CanonicalParser, Composer, Constructor, Resolver):
331
332    def __init__(self, stream):
333        if hasattr(stream, 'read'):
334            stream = stream.read()
335        CanonicalScanner.__init__(self, stream)
336        CanonicalParser.__init__(self)
337        Composer.__init__(self)
338        Constructor.__init__(self)
339        Resolver.__init__(self)
340
341def canonical_scan(stream):
342    return scan(stream, Loader=CanonicalLoader)
343
344def canonical_parse(stream):
345    return parse(stream, Loader=CanonicalLoader)
346
347def canonical_compose(stream):
348    return compose(stream, Loader=CanonicalLoader)
349
350def canonical_compose_all(stream):
351    return compose_all(stream, Loader=CanonicalLoader)
352
353def canonical_load(stream):
354    return load(stream, Loader=CanonicalLoader)
355
356def canonical_load_all(stream):
357    return load_all(stream, Loader=CanonicalLoader)
358
Note: See TracBrowser for help on using the repository browser.