source: pyyaml/trunk/tests/test_appliance.py @ 150

Revision 150, 12.0 KB checked in by xi, 9 years ago (diff)

Prepare setup.py for release. Fix #7.

RevLine 
[39]1
2import unittest, os
3
[136]4from yaml import *
[146]5from yaml.composer import *
6from yaml.constructor import *
7from yaml.resolver import *
[51]8
[39]9class TestAppliance(unittest.TestCase):
10
11    DATA = 'tests/data'
12
[45]13    all_tests = {}
[39]14    for filename in os.listdir(DATA):
15        if os.path.isfile(os.path.join(DATA, filename)):
16            root, ext = os.path.splitext(filename)
[45]17            all_tests.setdefault(root, []).append(ext)
[39]18
19    def add_tests(cls, method_name, *extensions):
[45]20        for test in cls.all_tests:
21            available_extensions = cls.all_tests[test]
[39]22            for ext in extensions:
23                if ext not in available_extensions:
24                    break
25            else:
26                filenames = [os.path.join(cls.DATA, test+ext) for ext in extensions]
27                def test_method(self, test=test, filenames=filenames):
28                    getattr(self, '_'+method_name)(test, *filenames)
[150]29                test = test.replace('-', '_').replace('.', '_')
[45]30                try:
31                    test_method.__name__ = '%s_%s' % (method_name, test)
32                except TypeError:
33                    import new
34                    test_method = new.function(test_method.func_code, test_method.func_globals,
35                            '%s_%s' % (method_name, test), test_method.func_defaults,
36                            test_method.func_closure)
[39]37                setattr(cls, test_method.__name__, test_method)
38    add_tests = classmethod(add_tests)
39
[43]40class Error(Exception):
41    pass
42
43class CanonicalScanner:
44
[51]45    def __init__(self, data):
[43]46        self.data = unicode(data, 'utf-8')+u'\0'
47        self.index = 0
[136]48        self.scan()
[43]49
[136]50    def check_token(self, *choices):
51        if self.tokens:
52            if not choices:
53                return True
54            for choice in choices:
55                if isinstance(self.tokens[0], choice):
56                    return True
57        return False
58
59    def peek_token(self):
60        if self.tokens:
61            return self.tokens[0]
62
63    def get_token(self, choice=None):
64        token = self.tokens.pop(0)
65        if choice and not isinstance(token, choice):
66            raise Error("unexpected token "+repr(token))
67        return token
68
69    def get_token_value(self):
70        token = self.get_token()
71        return token.value
72
[43]73    def scan(self):
[136]74        self.tokens = []
75        self.tokens.append(StreamStartToken(None, None))
[43]76        while True:
77            self.find_token()
78            ch = self.data[self.index]
79            if ch == u'\0':
[136]80                self.tokens.append(StreamEndToken(None, None))
[43]81                break
82            elif ch == u'%':
[136]83                self.tokens.append(self.scan_directive())
[43]84            elif ch == u'-' and self.data[self.index:self.index+3] == u'---':
85                self.index += 3
[136]86                self.tokens.append(DocumentStartToken(None, None))
[43]87            elif ch == u'[':
88                self.index += 1
[136]89                self.tokens.append(FlowSequenceStartToken(None, None))
[43]90            elif ch == u'{':
91                self.index += 1
[136]92                self.tokens.append(FlowMappingStartToken(None, None))
[43]93            elif ch == u']':
94                self.index += 1
[136]95                self.tokens.append(FlowSequenceEndToken(None, None))
[43]96            elif ch == u'}':
97                self.index += 1
[136]98                self.tokens.append(FlowMappingEndToken(None, None))
[43]99            elif ch == u'?':
100                self.index += 1
[136]101                self.tokens.append(KeyToken(None, None))
[43]102            elif ch == u':':
103                self.index += 1
[136]104                self.tokens.append(ValueToken(None, None))
[43]105            elif ch == u',':
106                self.index += 1
[136]107                self.tokens.append(FlowEntryToken(None, None))
[43]108            elif ch == u'*' or ch == u'&':
[136]109                self.tokens.append(self.scan_alias())
[43]110            elif ch == u'!':
[136]111                self.tokens.append(self.scan_tag())
[43]112            elif ch == u'"':
[136]113                self.tokens.append(self.scan_scalar())
[43]114            else:
115                raise Error("invalid token")
116
117    DIRECTIVE = u'%YAML 1.1'
118
119    def scan_directive(self):
120        if self.data[self.index:self.index+len(self.DIRECTIVE)] == self.DIRECTIVE and \
121                self.data[self.index+len(self.DIRECTIVE)] in u' \n\0':
122            self.index += len(self.DIRECTIVE)
[51]123            return DirectiveToken('YAML', (1, 1), None, None)
[43]124
125    def scan_alias(self):
126        if self.data[self.index] == u'*':
127            TokenClass = AliasToken
128        else:
129            TokenClass = AnchorToken
130        self.index += 1
131        start = self.index
132        while self.data[self.index] not in u', \n\0':
133            self.index += 1
134        value = self.data[start:self.index]
[51]135        return TokenClass(value, None, None)
[43]136
137    def scan_tag(self):
138        self.index += 1
139        start = self.index
140        while self.data[self.index] not in u' \n\0':
141            self.index += 1
142        value = self.data[start:self.index]
143        if value[0] == u'!':
144            value = 'tag:yaml.org,2002:'+value[1:]
[51]145        elif value[0] == u'<' and value[-1] == u'>':
[43]146            value = value[1:-1]
[51]147        else:
148            value = u'!'+value
149        return TagToken(value, None, None)
[43]150
151    QUOTE_CODES = {
152        'x': 2,
153        'u': 4,
154        'U': 8,
155    }
156
157    QUOTE_REPLACES = {
158        u'\\': u'\\',
159        u'\"': u'\"',
160        u' ': u' ',
161        u'a': u'\x07',
162        u'b': u'\x08',
163        u'e': u'\x1B',
164        u'f': u'\x0C',
165        u'n': u'\x0A',
166        u'r': u'\x0D',
167        u't': u'\x09',
168        u'v': u'\x0B',
169        u'N': u'\u0085',
170        u'L': u'\u2028',
171        u'P': u'\u2029',
172        u'_': u'_',
173        u'0': u'\x00',
174
175    }
176
177    def scan_scalar(self):
178        self.index += 1
179        chunks = []
180        start = self.index
181        ignore_spaces = False
182        while self.data[self.index] != u'"':
183            if self.data[self.index] == u'\\':
184                ignore_spaces = False
185                chunks.append(self.data[start:self.index])
186                self.index += 1
187                ch = self.data[self.index]
188                self.index += 1
189                if ch == u'\n':
190                    ignore_spaces = True
191                elif ch in self.QUOTE_CODES:
192                    length = self.QUOTE_CODES[ch]
193                    code = int(self.data[self.index:self.index+length], 16)
194                    chunks.append(unichr(code))
195                    self.index += length
196                else:
197                    chunks.append(self.QUOTE_REPLACES[ch])
198                start = self.index
199            elif self.data[self.index] == u'\n':
[48]200                chunks.append(self.data[start:self.index])
[43]201                chunks.append(u' ')
202                self.index += 1
[48]203                start = self.index
[43]204                ignore_spaces = True
205            elif ignore_spaces and self.data[self.index] == u' ':
206                self.index += 1
207                start = self.index
208            else:
209                ignore_spaces = False
210                self.index += 1
211        chunks.append(self.data[start:self.index])
212        self.index += 1
[51]213        return ScalarToken(u''.join(chunks), False, None, None)
[43]214
215    def find_token(self):
216        found = False
217        while not found:
218            while self.data[self.index] in u' \t':
219                self.index += 1
220            if self.data[self.index] == u'#':
221                while self.data[self.index] != u'\n':
222                    self.index += 1
223            if self.data[self.index] == u'\n':
224                self.index += 1
225            else:
226                found = True
227
228class CanonicalParser:
229
[136]230    def __init__(self):
[51]231        self.events = []
[136]232        self.parse()
[43]233
[118]234    # stream: STREAM-START document* STREAM-END
[43]235    def parse_stream(self):
[136]236        self.get_token(StreamStartToken)
[118]237        self.events.append(StreamStartEvent(None, None))
[136]238        while not self.check_token(StreamEndToken):
239            if self.check_token(DirectiveToken, DocumentStartToken):
[51]240                self.parse_document()
[43]241            else:
242                raise Error("document is expected, got "+repr(self.tokens[self.index]))
[136]243        self.get_token(StreamEndToken)
[51]244        self.events.append(StreamEndEvent(None, None))
[43]245
[51]246    # document: DIRECTIVE? DOCUMENT-START node
[43]247    def parse_document(self):
248        node = None
[136]249        if self.check_token(DirectiveToken):
250            self.get_token(DirectiveToken)
251        self.get_token(DocumentStartToken)
[118]252        self.events.append(DocumentStartEvent(None, None))
[51]253        self.parse_node()
[118]254        self.events.append(DocumentEndEvent(None, None))
[43]255
256    # node: ALIAS | ANCHOR? TAG? (SCALAR|sequence|mapping)
257    def parse_node(self):
[136]258        if self.check_token(AliasToken):
259            self.events.append(AliasEvent(self.get_token_value(), None, None))
[43]260        else:
261            anchor = None
[136]262            if self.check_token(AnchorToken):
263                anchor = self.get_token_value()
[132]264            tag = None
[136]265            if self.check_token(TagToken):
266                tag = self.get_token_value()
267            if self.check_token(ScalarToken):
[137]268                self.events.append(ScalarEvent(anchor, tag, (False, False), self.get_token_value(), None, None))
[136]269            elif self.check_token(FlowSequenceStartToken):
[130]270                self.events.append(SequenceStartEvent(anchor, tag, None, None))
[51]271                self.parse_sequence()
[136]272            elif self.check_token(FlowMappingStartToken):
[130]273                self.events.append(MappingStartEvent(anchor, tag, None, None))
[51]274                self.parse_mapping()
[43]275            else:
276                raise Error("SCALAR, '[', or '{' is expected, got "+repr(self.tokens[self.index]))
277
278    # sequence: SEQUENCE-START (node (ENTRY node)*)? ENTRY? SEQUENCE-END
279    def parse_sequence(self):
[136]280        self.get_token(FlowSequenceStartToken)
281        if not self.check_token(FlowSequenceEndToken):
[51]282            self.parse_node()
[136]283            while not self.check_token(FlowSequenceEndToken):
284                self.get_token(FlowEntryToken)
285                if not self.check_token(FlowSequenceEndToken):
[51]286                    self.parse_node()
[136]287        self.get_token(FlowSequenceEndToken)
[130]288        self.events.append(SequenceEndEvent(None, None))
[43]289
290    # mapping: MAPPING-START (map_entry (ENTRY map_entry)*)? ENTRY? MAPPING-END
291    def parse_mapping(self):
[136]292        self.get_token(FlowMappingStartToken)
293        if not self.check_token(FlowMappingEndToken):
[51]294            self.parse_map_entry()
[136]295            while not self.check_token(FlowMappingEndToken):
296                self.get_token(FlowEntryToken)
297                if not self.check_token(FlowMappingEndToken):
[51]298                    self.parse_map_entry()
[136]299        self.get_token(FlowMappingEndToken)
[130]300        self.events.append(MappingEndEvent(None, None))
[43]301
302    # map_entry: KEY node VALUE node
303    def parse_map_entry(self):
[136]304        self.get_token(KeyToken)
[51]305        self.parse_node()
[136]306        self.get_token(ValueToken)
[51]307        self.parse_node()
[43]308
309    def parse(self):
[51]310        self.parse_stream()
[43]311
[136]312    def get_event(self):
[54]313        return self.events.pop(0)
314
[136]315    def check_event(self, *choices):
316        if self.events:
317            if not choices:
[54]318                return True
[136]319            for choice in choices:
320                if isinstance(self.events[0], choice):
321                    return True
[54]322        return False
323
[136]324    def peek_event(self):
[54]325        return self.events[0]
326
[137]327class CanonicalLoader(CanonicalScanner, CanonicalParser, Composer, Constructor, Resolver):
[136]328
329    def __init__(self, stream):
330        if hasattr(stream, 'read'):
331            stream = stream.read()
332        CanonicalScanner.__init__(self, stream)
333        CanonicalParser.__init__(self)
334        Composer.__init__(self)
335        Constructor.__init__(self)
[137]336        Resolver.__init__(self)
[136]337
338def canonical_scan(stream):
339    return scan(stream, Loader=CanonicalLoader)
340
341def canonical_parse(stream):
342    return parse(stream, Loader=CanonicalLoader)
343
344def canonical_compose(stream):
345    return compose(stream, Loader=CanonicalLoader)
346
347def canonical_compose_all(stream):
348    return compose_all(stream, Loader=CanonicalLoader)
349
350def canonical_load(stream):
351    return load(stream, Loader=CanonicalLoader)
352
353def canonical_load_all(stream):
354    return load_all(stream, Loader=CanonicalLoader)
355
Note: See TracBrowser for help on using the repository browser.