source: pyyaml/trunk/tests/test_appliance.py @ 136

Revision 136, 11.9 KB checked in by xi, 8 years ago (diff)

Major refactoring.

RevLine 
[39]1
2import unittest, os
3
[136]4from yaml import *
[51]5
[39]6class TestAppliance(unittest.TestCase):
7
8    DATA = 'tests/data'
9
[45]10    all_tests = {}
[39]11    for filename in os.listdir(DATA):
12        if os.path.isfile(os.path.join(DATA, filename)):
13            root, ext = os.path.splitext(filename)
[45]14            all_tests.setdefault(root, []).append(ext)
[39]15
16    def add_tests(cls, method_name, *extensions):
[45]17        for test in cls.all_tests:
18            available_extensions = cls.all_tests[test]
[39]19            for ext in extensions:
20                if ext not in available_extensions:
21                    break
22            else:
23                filenames = [os.path.join(cls.DATA, test+ext) for ext in extensions]
24                def test_method(self, test=test, filenames=filenames):
25                    getattr(self, '_'+method_name)(test, *filenames)
26                test = test.replace('-', '_')
[45]27                try:
28                    test_method.__name__ = '%s_%s' % (method_name, test)
29                except TypeError:
30                    import new
31                    test_method = new.function(test_method.func_code, test_method.func_globals,
32                            '%s_%s' % (method_name, test), test_method.func_defaults,
33                            test_method.func_closure)
[39]34                setattr(cls, test_method.__name__, test_method)
35    add_tests = classmethod(add_tests)
36
[43]37class Error(Exception):
38    pass
39
40class CanonicalScanner:
41
[51]42    def __init__(self, data):
[43]43        self.data = unicode(data, 'utf-8')+u'\0'
44        self.index = 0
[136]45        self.scan()
[43]46
[136]47    def check_token(self, *choices):
48        if self.tokens:
49            if not choices:
50                return True
51            for choice in choices:
52                if isinstance(self.tokens[0], choice):
53                    return True
54        return False
55
56    def peek_token(self):
57        if self.tokens:
58            return self.tokens[0]
59
60    def get_token(self, choice=None):
61        token = self.tokens.pop(0)
62        if choice and not isinstance(token, choice):
63            raise Error("unexpected token "+repr(token))
64        return token
65
66    def get_token_value(self):
67        token = self.get_token()
68        return token.value
69
[43]70    def scan(self):
[136]71        self.tokens = []
72        self.tokens.append(StreamStartToken(None, None))
[43]73        while True:
74            self.find_token()
75            ch = self.data[self.index]
76            if ch == u'\0':
[136]77                self.tokens.append(StreamEndToken(None, None))
[43]78                break
79            elif ch == u'%':
[136]80                self.tokens.append(self.scan_directive())
[43]81            elif ch == u'-' and self.data[self.index:self.index+3] == u'---':
82                self.index += 3
[136]83                self.tokens.append(DocumentStartToken(None, None))
[43]84            elif ch == u'[':
85                self.index += 1
[136]86                self.tokens.append(FlowSequenceStartToken(None, None))
[43]87            elif ch == u'{':
88                self.index += 1
[136]89                self.tokens.append(FlowMappingStartToken(None, None))
[43]90            elif ch == u']':
91                self.index += 1
[136]92                self.tokens.append(FlowSequenceEndToken(None, None))
[43]93            elif ch == u'}':
94                self.index += 1
[136]95                self.tokens.append(FlowMappingEndToken(None, None))
[43]96            elif ch == u'?':
97                self.index += 1
[136]98                self.tokens.append(KeyToken(None, None))
[43]99            elif ch == u':':
100                self.index += 1
[136]101                self.tokens.append(ValueToken(None, None))
[43]102            elif ch == u',':
103                self.index += 1
[136]104                self.tokens.append(FlowEntryToken(None, None))
[43]105            elif ch == u'*' or ch == u'&':
[136]106                self.tokens.append(self.scan_alias())
[43]107            elif ch == u'!':
[136]108                self.tokens.append(self.scan_tag())
[43]109            elif ch == u'"':
[136]110                self.tokens.append(self.scan_scalar())
[43]111            else:
112                raise Error("invalid token")
113
114    DIRECTIVE = u'%YAML 1.1'
115
116    def scan_directive(self):
117        if self.data[self.index:self.index+len(self.DIRECTIVE)] == self.DIRECTIVE and \
118                self.data[self.index+len(self.DIRECTIVE)] in u' \n\0':
119            self.index += len(self.DIRECTIVE)
[51]120            return DirectiveToken('YAML', (1, 1), None, None)
[43]121
122    def scan_alias(self):
123        if self.data[self.index] == u'*':
124            TokenClass = AliasToken
125        else:
126            TokenClass = AnchorToken
127        self.index += 1
128        start = self.index
129        while self.data[self.index] not in u', \n\0':
130            self.index += 1
131        value = self.data[start:self.index]
[51]132        return TokenClass(value, None, None)
[43]133
134    def scan_tag(self):
135        self.index += 1
136        start = self.index
137        while self.data[self.index] not in u' \n\0':
138            self.index += 1
139        value = self.data[start:self.index]
140        if value[0] == u'!':
141            value = 'tag:yaml.org,2002:'+value[1:]
[51]142        elif value[0] == u'<' and value[-1] == u'>':
[43]143            value = value[1:-1]
[51]144        else:
145            value = u'!'+value
146        return TagToken(value, None, None)
[43]147
148    QUOTE_CODES = {
149        'x': 2,
150        'u': 4,
151        'U': 8,
152    }
153
154    QUOTE_REPLACES = {
155        u'\\': u'\\',
156        u'\"': u'\"',
157        u' ': u' ',
158        u'a': u'\x07',
159        u'b': u'\x08',
160        u'e': u'\x1B',
161        u'f': u'\x0C',
162        u'n': u'\x0A',
163        u'r': u'\x0D',
164        u't': u'\x09',
165        u'v': u'\x0B',
166        u'N': u'\u0085',
167        u'L': u'\u2028',
168        u'P': u'\u2029',
169        u'_': u'_',
170        u'0': u'\x00',
171
172    }
173
174    def scan_scalar(self):
175        self.index += 1
176        chunks = []
177        start = self.index
178        ignore_spaces = False
179        while self.data[self.index] != u'"':
180            if self.data[self.index] == u'\\':
181                ignore_spaces = False
182                chunks.append(self.data[start:self.index])
183                self.index += 1
184                ch = self.data[self.index]
185                self.index += 1
186                if ch == u'\n':
187                    ignore_spaces = True
188                elif ch in self.QUOTE_CODES:
189                    length = self.QUOTE_CODES[ch]
190                    code = int(self.data[self.index:self.index+length], 16)
191                    chunks.append(unichr(code))
192                    self.index += length
193                else:
194                    chunks.append(self.QUOTE_REPLACES[ch])
195                start = self.index
196            elif self.data[self.index] == u'\n':
[48]197                chunks.append(self.data[start:self.index])
[43]198                chunks.append(u' ')
199                self.index += 1
[48]200                start = self.index
[43]201                ignore_spaces = True
202            elif ignore_spaces and self.data[self.index] == u' ':
203                self.index += 1
204                start = self.index
205            else:
206                ignore_spaces = False
207                self.index += 1
208        chunks.append(self.data[start:self.index])
209        self.index += 1
[51]210        return ScalarToken(u''.join(chunks), False, None, None)
[43]211
212    def find_token(self):
213        found = False
214        while not found:
215            while self.data[self.index] in u' \t':
216                self.index += 1
217            if self.data[self.index] == u'#':
218                while self.data[self.index] != u'\n':
219                    self.index += 1
220            if self.data[self.index] == u'\n':
221                self.index += 1
222            else:
223                found = True
224
225class CanonicalParser:
226
[136]227    def __init__(self):
[51]228        self.events = []
[136]229        self.parse()
[43]230
[118]231    # stream: STREAM-START document* STREAM-END
[43]232    def parse_stream(self):
[136]233        self.get_token(StreamStartToken)
[118]234        self.events.append(StreamStartEvent(None, None))
[136]235        while not self.check_token(StreamEndToken):
236            if self.check_token(DirectiveToken, DocumentStartToken):
[51]237                self.parse_document()
[43]238            else:
239                raise Error("document is expected, got "+repr(self.tokens[self.index]))
[136]240        self.get_token(StreamEndToken)
[51]241        self.events.append(StreamEndEvent(None, None))
[43]242
[51]243    # document: DIRECTIVE? DOCUMENT-START node
[43]244    def parse_document(self):
245        node = None
[136]246        if self.check_token(DirectiveToken):
247            self.get_token(DirectiveToken)
248        self.get_token(DocumentStartToken)
[118]249        self.events.append(DocumentStartEvent(None, None))
[51]250        self.parse_node()
[118]251        self.events.append(DocumentEndEvent(None, None))
[43]252
253    # node: ALIAS | ANCHOR? TAG? (SCALAR|sequence|mapping)
254    def parse_node(self):
[136]255        if self.check_token(AliasToken):
256            self.events.append(AliasEvent(self.get_token_value(), None, None))
[43]257        else:
258            anchor = None
[136]259            if self.check_token(AnchorToken):
260                anchor = self.get_token_value()
[132]261            tag = None
[136]262            if self.check_token(TagToken):
263                tag = self.get_token_value()
264            if self.check_token(ScalarToken):
265                self.events.append(ScalarEvent(anchor, tag, False, self.get_token_value(), None, None))
266            elif self.check_token(FlowSequenceStartToken):
[130]267                self.events.append(SequenceStartEvent(anchor, tag, None, None))
[51]268                self.parse_sequence()
[136]269            elif self.check_token(FlowMappingStartToken):
[130]270                self.events.append(MappingStartEvent(anchor, tag, None, None))
[51]271                self.parse_mapping()
[43]272            else:
273                raise Error("SCALAR, '[', or '{' is expected, got "+repr(self.tokens[self.index]))
274
275    # sequence: SEQUENCE-START (node (ENTRY node)*)? ENTRY? SEQUENCE-END
276    def parse_sequence(self):
[136]277        self.get_token(FlowSequenceStartToken)
278        if not self.check_token(FlowSequenceEndToken):
[51]279            self.parse_node()
[136]280            while not self.check_token(FlowSequenceEndToken):
281                self.get_token(FlowEntryToken)
282                if not self.check_token(FlowSequenceEndToken):
[51]283                    self.parse_node()
[136]284        self.get_token(FlowSequenceEndToken)
[130]285        self.events.append(SequenceEndEvent(None, None))
[43]286
287    # mapping: MAPPING-START (map_entry (ENTRY map_entry)*)? ENTRY? MAPPING-END
288    def parse_mapping(self):
[136]289        self.get_token(FlowMappingStartToken)
290        if not self.check_token(FlowMappingEndToken):
[51]291            self.parse_map_entry()
[136]292            while not self.check_token(FlowMappingEndToken):
293                self.get_token(FlowEntryToken)
294                if not self.check_token(FlowMappingEndToken):
[51]295                    self.parse_map_entry()
[136]296        self.get_token(FlowMappingEndToken)
[130]297        self.events.append(MappingEndEvent(None, None))
[43]298
299    # map_entry: KEY node VALUE node
300    def parse_map_entry(self):
[136]301        self.get_token(KeyToken)
[51]302        self.parse_node()
[136]303        self.get_token(ValueToken)
[51]304        self.parse_node()
[43]305
306    def parse(self):
[51]307        self.parse_stream()
[43]308
[136]309    def get_event(self):
[54]310        return self.events.pop(0)
311
[136]312    def check_event(self, *choices):
313        if self.events:
314            if not choices:
[54]315                return True
[136]316            for choice in choices:
317                if isinstance(self.events[0], choice):
318                    return True
[54]319        return False
320
[136]321    def peek_event(self):
[54]322        return self.events[0]
323
[136]324class CanonicalLoader(CanonicalScanner, CanonicalParser, Composer, Constructor, Detector):
325
326    def __init__(self, stream):
327        if hasattr(stream, 'read'):
328            stream = stream.read()
329        CanonicalScanner.__init__(self, stream)
330        CanonicalParser.__init__(self)
331        Composer.__init__(self)
332        Constructor.__init__(self)
333        Detector.__init__(self)
334
335def canonical_scan(stream):
336    return scan(stream, Loader=CanonicalLoader)
337
338def canonical_parse(stream):
339    return parse(stream, Loader=CanonicalLoader)
340
341def canonical_compose(stream):
342    return compose(stream, Loader=CanonicalLoader)
343
344def canonical_compose_all(stream):
345    return compose_all(stream, Loader=CanonicalLoader)
346
347def canonical_load(stream):
348    return load(stream, Loader=CanonicalLoader)
349
350def canonical_load_all(stream):
351    return load_all(stream, Loader=CanonicalLoader)
352
Note: See TracBrowser for help on using the repository browser.