source: pyyaml/trunk/tests/test_appliance.py @ 312

Revision 312, 12.1 KB checked in by xi, 5 years ago (diff)

Fixed test errors for LibYAML bindings; added a test on emitting nodes in all possible styles.

RevLine 
[39]1
2import unittest, os
3
[136]4from yaml import *
[146]5from yaml.composer import *
6from yaml.constructor import *
7from yaml.resolver import *
[51]8
[39]9class TestAppliance(unittest.TestCase):
10
11    DATA = 'tests/data'
[312]12    SKIP_EXT = '.skip'
[39]13
[45]14    all_tests = {}
[39]15    for filename in os.listdir(DATA):
16        if os.path.isfile(os.path.join(DATA, filename)):
17            root, ext = os.path.splitext(filename)
[45]18            all_tests.setdefault(root, []).append(ext)
[39]19
20    def add_tests(cls, method_name, *extensions):
[45]21        for test in cls.all_tests:
22            available_extensions = cls.all_tests[test]
[312]23            if cls.SKIP_EXT in available_extensions:
24                continue
[39]25            for ext in extensions:
26                if ext not in available_extensions:
27                    break
28            else:
29                filenames = [os.path.join(cls.DATA, test+ext) for ext in extensions]
30                def test_method(self, test=test, filenames=filenames):
31                    getattr(self, '_'+method_name)(test, *filenames)
[150]32                test = test.replace('-', '_').replace('.', '_')
[45]33                try:
34                    test_method.__name__ = '%s_%s' % (method_name, test)
35                except TypeError:
36                    import new
37                    test_method = new.function(test_method.func_code, test_method.func_globals,
38                            '%s_%s' % (method_name, test), test_method.func_defaults,
39                            test_method.func_closure)
[39]40                setattr(cls, test_method.__name__, test_method)
41    add_tests = classmethod(add_tests)
42
[43]43class Error(Exception):
44    pass
45
46class CanonicalScanner:
47
[51]48    def __init__(self, data):
[43]49        self.data = unicode(data, 'utf-8')+u'\0'
50        self.index = 0
[136]51        self.scan()
[43]52
[136]53    def check_token(self, *choices):
54        if self.tokens:
55            if not choices:
56                return True
57            for choice in choices:
58                if isinstance(self.tokens[0], choice):
59                    return True
60        return False
61
62    def peek_token(self):
63        if self.tokens:
64            return self.tokens[0]
65
66    def get_token(self, choice=None):
67        token = self.tokens.pop(0)
68        if choice and not isinstance(token, choice):
69            raise Error("unexpected token "+repr(token))
70        return token
71
72    def get_token_value(self):
73        token = self.get_token()
74        return token.value
75
[43]76    def scan(self):
[136]77        self.tokens = []
78        self.tokens.append(StreamStartToken(None, None))
[43]79        while True:
80            self.find_token()
81            ch = self.data[self.index]
82            if ch == u'\0':
[136]83                self.tokens.append(StreamEndToken(None, None))
[43]84                break
85            elif ch == u'%':
[136]86                self.tokens.append(self.scan_directive())
[43]87            elif ch == u'-' and self.data[self.index:self.index+3] == u'---':
88                self.index += 3
[136]89                self.tokens.append(DocumentStartToken(None, None))
[43]90            elif ch == u'[':
91                self.index += 1
[136]92                self.tokens.append(FlowSequenceStartToken(None, None))
[43]93            elif ch == u'{':
94                self.index += 1
[136]95                self.tokens.append(FlowMappingStartToken(None, None))
[43]96            elif ch == u']':
97                self.index += 1
[136]98                self.tokens.append(FlowSequenceEndToken(None, None))
[43]99            elif ch == u'}':
100                self.index += 1
[136]101                self.tokens.append(FlowMappingEndToken(None, None))
[43]102            elif ch == u'?':
103                self.index += 1
[136]104                self.tokens.append(KeyToken(None, None))
[43]105            elif ch == u':':
106                self.index += 1
[136]107                self.tokens.append(ValueToken(None, None))
[43]108            elif ch == u',':
109                self.index += 1
[136]110                self.tokens.append(FlowEntryToken(None, None))
[43]111            elif ch == u'*' or ch == u'&':
[136]112                self.tokens.append(self.scan_alias())
[43]113            elif ch == u'!':
[136]114                self.tokens.append(self.scan_tag())
[43]115            elif ch == u'"':
[136]116                self.tokens.append(self.scan_scalar())
[43]117            else:
118                raise Error("invalid token")
119
120    DIRECTIVE = u'%YAML 1.1'
121
122    def scan_directive(self):
123        if self.data[self.index:self.index+len(self.DIRECTIVE)] == self.DIRECTIVE and \
124                self.data[self.index+len(self.DIRECTIVE)] in u' \n\0':
125            self.index += len(self.DIRECTIVE)
[51]126            return DirectiveToken('YAML', (1, 1), None, None)
[43]127
128    def scan_alias(self):
129        if self.data[self.index] == u'*':
130            TokenClass = AliasToken
131        else:
132            TokenClass = AnchorToken
133        self.index += 1
134        start = self.index
135        while self.data[self.index] not in u', \n\0':
136            self.index += 1
137        value = self.data[start:self.index]
[51]138        return TokenClass(value, None, None)
[43]139
140    def scan_tag(self):
141        self.index += 1
142        start = self.index
143        while self.data[self.index] not in u' \n\0':
144            self.index += 1
145        value = self.data[start:self.index]
146        if value[0] == u'!':
147            value = 'tag:yaml.org,2002:'+value[1:]
[51]148        elif value[0] == u'<' and value[-1] == u'>':
[43]149            value = value[1:-1]
[51]150        else:
151            value = u'!'+value
152        return TagToken(value, None, None)
[43]153
154    QUOTE_CODES = {
155        'x': 2,
156        'u': 4,
157        'U': 8,
158    }
159
160    QUOTE_REPLACES = {
161        u'\\': u'\\',
162        u'\"': u'\"',
163        u' ': u' ',
164        u'a': u'\x07',
165        u'b': u'\x08',
166        u'e': u'\x1B',
167        u'f': u'\x0C',
168        u'n': u'\x0A',
169        u'r': u'\x0D',
170        u't': u'\x09',
171        u'v': u'\x0B',
172        u'N': u'\u0085',
173        u'L': u'\u2028',
174        u'P': u'\u2029',
175        u'_': u'_',
176        u'0': u'\x00',
177
178    }
179
180    def scan_scalar(self):
181        self.index += 1
182        chunks = []
183        start = self.index
184        ignore_spaces = False
185        while self.data[self.index] != u'"':
186            if self.data[self.index] == u'\\':
187                ignore_spaces = False
188                chunks.append(self.data[start:self.index])
189                self.index += 1
190                ch = self.data[self.index]
191                self.index += 1
192                if ch == u'\n':
193                    ignore_spaces = True
194                elif ch in self.QUOTE_CODES:
195                    length = self.QUOTE_CODES[ch]
196                    code = int(self.data[self.index:self.index+length], 16)
197                    chunks.append(unichr(code))
198                    self.index += length
199                else:
200                    chunks.append(self.QUOTE_REPLACES[ch])
201                start = self.index
202            elif self.data[self.index] == u'\n':
[48]203                chunks.append(self.data[start:self.index])
[43]204                chunks.append(u' ')
205                self.index += 1
[48]206                start = self.index
[43]207                ignore_spaces = True
208            elif ignore_spaces and self.data[self.index] == u' ':
209                self.index += 1
210                start = self.index
211            else:
212                ignore_spaces = False
213                self.index += 1
214        chunks.append(self.data[start:self.index])
215        self.index += 1
[51]216        return ScalarToken(u''.join(chunks), False, None, None)
[43]217
218    def find_token(self):
219        found = False
220        while not found:
221            while self.data[self.index] in u' \t':
222                self.index += 1
223            if self.data[self.index] == u'#':
224                while self.data[self.index] != u'\n':
225                    self.index += 1
226            if self.data[self.index] == u'\n':
227                self.index += 1
228            else:
229                found = True
230
231class CanonicalParser:
232
[136]233    def __init__(self):
[51]234        self.events = []
[136]235        self.parse()
[43]236
[118]237    # stream: STREAM-START document* STREAM-END
[43]238    def parse_stream(self):
[136]239        self.get_token(StreamStartToken)
[118]240        self.events.append(StreamStartEvent(None, None))
[136]241        while not self.check_token(StreamEndToken):
242            if self.check_token(DirectiveToken, DocumentStartToken):
[51]243                self.parse_document()
[43]244            else:
245                raise Error("document is expected, got "+repr(self.tokens[self.index]))
[136]246        self.get_token(StreamEndToken)
[51]247        self.events.append(StreamEndEvent(None, None))
[43]248
[51]249    # document: DIRECTIVE? DOCUMENT-START node
[43]250    def parse_document(self):
251        node = None
[136]252        if self.check_token(DirectiveToken):
253            self.get_token(DirectiveToken)
254        self.get_token(DocumentStartToken)
[118]255        self.events.append(DocumentStartEvent(None, None))
[51]256        self.parse_node()
[118]257        self.events.append(DocumentEndEvent(None, None))
[43]258
259    # node: ALIAS | ANCHOR? TAG? (SCALAR|sequence|mapping)
260    def parse_node(self):
[136]261        if self.check_token(AliasToken):
262            self.events.append(AliasEvent(self.get_token_value(), None, None))
[43]263        else:
264            anchor = None
[136]265            if self.check_token(AnchorToken):
266                anchor = self.get_token_value()
[132]267            tag = None
[136]268            if self.check_token(TagToken):
269                tag = self.get_token_value()
270            if self.check_token(ScalarToken):
[137]271                self.events.append(ScalarEvent(anchor, tag, (False, False), self.get_token_value(), None, None))
[136]272            elif self.check_token(FlowSequenceStartToken):
[130]273                self.events.append(SequenceStartEvent(anchor, tag, None, None))
[51]274                self.parse_sequence()
[136]275            elif self.check_token(FlowMappingStartToken):
[130]276                self.events.append(MappingStartEvent(anchor, tag, None, None))
[51]277                self.parse_mapping()
[43]278            else:
279                raise Error("SCALAR, '[', or '{' is expected, got "+repr(self.tokens[self.index]))
280
281    # sequence: SEQUENCE-START (node (ENTRY node)*)? ENTRY? SEQUENCE-END
282    def parse_sequence(self):
[136]283        self.get_token(FlowSequenceStartToken)
284        if not self.check_token(FlowSequenceEndToken):
[51]285            self.parse_node()
[136]286            while not self.check_token(FlowSequenceEndToken):
287                self.get_token(FlowEntryToken)
288                if not self.check_token(FlowSequenceEndToken):
[51]289                    self.parse_node()
[136]290        self.get_token(FlowSequenceEndToken)
[130]291        self.events.append(SequenceEndEvent(None, None))
[43]292
293    # mapping: MAPPING-START (map_entry (ENTRY map_entry)*)? ENTRY? MAPPING-END
294    def parse_mapping(self):
[136]295        self.get_token(FlowMappingStartToken)
296        if not self.check_token(FlowMappingEndToken):
[51]297            self.parse_map_entry()
[136]298            while not self.check_token(FlowMappingEndToken):
299                self.get_token(FlowEntryToken)
300                if not self.check_token(FlowMappingEndToken):
[51]301                    self.parse_map_entry()
[136]302        self.get_token(FlowMappingEndToken)
[130]303        self.events.append(MappingEndEvent(None, None))
[43]304
305    # map_entry: KEY node VALUE node
306    def parse_map_entry(self):
[136]307        self.get_token(KeyToken)
[51]308        self.parse_node()
[136]309        self.get_token(ValueToken)
[51]310        self.parse_node()
[43]311
312    def parse(self):
[51]313        self.parse_stream()
[43]314
[136]315    def get_event(self):
[54]316        return self.events.pop(0)
317
[136]318    def check_event(self, *choices):
319        if self.events:
320            if not choices:
[54]321                return True
[136]322            for choice in choices:
323                if isinstance(self.events[0], choice):
324                    return True
[54]325        return False
326
[136]327    def peek_event(self):
[54]328        return self.events[0]
329
[137]330class CanonicalLoader(CanonicalScanner, CanonicalParser, Composer, Constructor, Resolver):
[136]331
332    def __init__(self, stream):
333        if hasattr(stream, 'read'):
334            stream = stream.read()
335        CanonicalScanner.__init__(self, stream)
336        CanonicalParser.__init__(self)
337        Composer.__init__(self)
338        Constructor.__init__(self)
[137]339        Resolver.__init__(self)
[136]340
341def canonical_scan(stream):
342    return scan(stream, Loader=CanonicalLoader)
343
344def canonical_parse(stream):
345    return parse(stream, Loader=CanonicalLoader)
346
347def canonical_compose(stream):
348    return compose(stream, Loader=CanonicalLoader)
349
350def canonical_compose_all(stream):
351    return compose_all(stream, Loader=CanonicalLoader)
352
353def canonical_load(stream):
354    return load(stream, Loader=CanonicalLoader)
355
356def canonical_load_all(stream):
357    return load_all(stream, Loader=CanonicalLoader)
358
Note: See TracBrowser for help on using the repository browser.