source: pyyaml/trunk/tests/test_appliance.py @ 146

Revision 146, 12.0 KB checked in by xi, 8 years ago (diff)

Fix !!python/name for Python 2.3. Clear the yaml module namespace.

Line 
1
2import unittest, os
3
4from yaml import *
5from yaml.composer import *
6from yaml.constructor import *
7from yaml.resolver import *
8
9class TestAppliance(unittest.TestCase):
10
11    DATA = 'tests/data'
12
13    all_tests = {}
14    for filename in os.listdir(DATA):
15        if os.path.isfile(os.path.join(DATA, filename)):
16            root, ext = os.path.splitext(filename)
17            all_tests.setdefault(root, []).append(ext)
18
19    def add_tests(cls, method_name, *extensions):
20        for test in cls.all_tests:
21            available_extensions = cls.all_tests[test]
22            for ext in extensions:
23                if ext not in available_extensions:
24                    break
25            else:
26                filenames = [os.path.join(cls.DATA, test+ext) for ext in extensions]
27                def test_method(self, test=test, filenames=filenames):
28                    getattr(self, '_'+method_name)(test, *filenames)
29                test = test.replace('-', '_')
30                try:
31                    test_method.__name__ = '%s_%s' % (method_name, test)
32                except TypeError:
33                    import new
34                    test_method = new.function(test_method.func_code, test_method.func_globals,
35                            '%s_%s' % (method_name, test), test_method.func_defaults,
36                            test_method.func_closure)
37                setattr(cls, test_method.__name__, test_method)
38    add_tests = classmethod(add_tests)
39
40class Error(Exception):
41    pass
42
43class CanonicalScanner:
44
45    def __init__(self, data):
46        self.data = unicode(data, 'utf-8')+u'\0'
47        self.index = 0
48        self.scan()
49
50    def check_token(self, *choices):
51        if self.tokens:
52            if not choices:
53                return True
54            for choice in choices:
55                if isinstance(self.tokens[0], choice):
56                    return True
57        return False
58
59    def peek_token(self):
60        if self.tokens:
61            return self.tokens[0]
62
63    def get_token(self, choice=None):
64        token = self.tokens.pop(0)
65        if choice and not isinstance(token, choice):
66            raise Error("unexpected token "+repr(token))
67        return token
68
69    def get_token_value(self):
70        token = self.get_token()
71        return token.value
72
73    def scan(self):
74        self.tokens = []
75        self.tokens.append(StreamStartToken(None, None))
76        while True:
77            self.find_token()
78            ch = self.data[self.index]
79            if ch == u'\0':
80                self.tokens.append(StreamEndToken(None, None))
81                break
82            elif ch == u'%':
83                self.tokens.append(self.scan_directive())
84            elif ch == u'-' and self.data[self.index:self.index+3] == u'---':
85                self.index += 3
86                self.tokens.append(DocumentStartToken(None, None))
87            elif ch == u'[':
88                self.index += 1
89                self.tokens.append(FlowSequenceStartToken(None, None))
90            elif ch == u'{':
91                self.index += 1
92                self.tokens.append(FlowMappingStartToken(None, None))
93            elif ch == u']':
94                self.index += 1
95                self.tokens.append(FlowSequenceEndToken(None, None))
96            elif ch == u'}':
97                self.index += 1
98                self.tokens.append(FlowMappingEndToken(None, None))
99            elif ch == u'?':
100                self.index += 1
101                self.tokens.append(KeyToken(None, None))
102            elif ch == u':':
103                self.index += 1
104                self.tokens.append(ValueToken(None, None))
105            elif ch == u',':
106                self.index += 1
107                self.tokens.append(FlowEntryToken(None, None))
108            elif ch == u'*' or ch == u'&':
109                self.tokens.append(self.scan_alias())
110            elif ch == u'!':
111                self.tokens.append(self.scan_tag())
112            elif ch == u'"':
113                self.tokens.append(self.scan_scalar())
114            else:
115                raise Error("invalid token")
116
117    DIRECTIVE = u'%YAML 1.1'
118
119    def scan_directive(self):
120        if self.data[self.index:self.index+len(self.DIRECTIVE)] == self.DIRECTIVE and \
121                self.data[self.index+len(self.DIRECTIVE)] in u' \n\0':
122            self.index += len(self.DIRECTIVE)
123            return DirectiveToken('YAML', (1, 1), None, None)
124
125    def scan_alias(self):
126        if self.data[self.index] == u'*':
127            TokenClass = AliasToken
128        else:
129            TokenClass = AnchorToken
130        self.index += 1
131        start = self.index
132        while self.data[self.index] not in u', \n\0':
133            self.index += 1
134        value = self.data[start:self.index]
135        return TokenClass(value, None, None)
136
137    def scan_tag(self):
138        self.index += 1
139        start = self.index
140        while self.data[self.index] not in u' \n\0':
141            self.index += 1
142        value = self.data[start:self.index]
143        if value[0] == u'!':
144            value = 'tag:yaml.org,2002:'+value[1:]
145        elif value[0] == u'<' and value[-1] == u'>':
146            value = value[1:-1]
147        else:
148            value = u'!'+value
149        return TagToken(value, None, None)
150
151    QUOTE_CODES = {
152        'x': 2,
153        'u': 4,
154        'U': 8,
155    }
156
157    QUOTE_REPLACES = {
158        u'\\': u'\\',
159        u'\"': u'\"',
160        u' ': u' ',
161        u'a': u'\x07',
162        u'b': u'\x08',
163        u'e': u'\x1B',
164        u'f': u'\x0C',
165        u'n': u'\x0A',
166        u'r': u'\x0D',
167        u't': u'\x09',
168        u'v': u'\x0B',
169        u'N': u'\u0085',
170        u'L': u'\u2028',
171        u'P': u'\u2029',
172        u'_': u'_',
173        u'0': u'\x00',
174
175    }
176
177    def scan_scalar(self):
178        self.index += 1
179        chunks = []
180        start = self.index
181        ignore_spaces = False
182        while self.data[self.index] != u'"':
183            if self.data[self.index] == u'\\':
184                ignore_spaces = False
185                chunks.append(self.data[start:self.index])
186                self.index += 1
187                ch = self.data[self.index]
188                self.index += 1
189                if ch == u'\n':
190                    ignore_spaces = True
191                elif ch in self.QUOTE_CODES:
192                    length = self.QUOTE_CODES[ch]
193                    code = int(self.data[self.index:self.index+length], 16)
194                    chunks.append(unichr(code))
195                    self.index += length
196                else:
197                    chunks.append(self.QUOTE_REPLACES[ch])
198                start = self.index
199            elif self.data[self.index] == u'\n':
200                chunks.append(self.data[start:self.index])
201                chunks.append(u' ')
202                self.index += 1
203                start = self.index
204                ignore_spaces = True
205            elif ignore_spaces and self.data[self.index] == u' ':
206                self.index += 1
207                start = self.index
208            else:
209                ignore_spaces = False
210                self.index += 1
211        chunks.append(self.data[start:self.index])
212        self.index += 1
213        return ScalarToken(u''.join(chunks), False, None, None)
214
215    def find_token(self):
216        found = False
217        while not found:
218            while self.data[self.index] in u' \t':
219                self.index += 1
220            if self.data[self.index] == u'#':
221                while self.data[self.index] != u'\n':
222                    self.index += 1
223            if self.data[self.index] == u'\n':
224                self.index += 1
225            else:
226                found = True
227
228class CanonicalParser:
229
230    def __init__(self):
231        self.events = []
232        self.parse()
233
234    # stream: STREAM-START document* STREAM-END
235    def parse_stream(self):
236        self.get_token(StreamStartToken)
237        self.events.append(StreamStartEvent(None, None))
238        while not self.check_token(StreamEndToken):
239            if self.check_token(DirectiveToken, DocumentStartToken):
240                self.parse_document()
241            else:
242                raise Error("document is expected, got "+repr(self.tokens[self.index]))
243        self.get_token(StreamEndToken)
244        self.events.append(StreamEndEvent(None, None))
245
246    # document: DIRECTIVE? DOCUMENT-START node
247    def parse_document(self):
248        node = None
249        if self.check_token(DirectiveToken):
250            self.get_token(DirectiveToken)
251        self.get_token(DocumentStartToken)
252        self.events.append(DocumentStartEvent(None, None))
253        self.parse_node()
254        self.events.append(DocumentEndEvent(None, None))
255
256    # node: ALIAS | ANCHOR? TAG? (SCALAR|sequence|mapping)
257    def parse_node(self):
258        if self.check_token(AliasToken):
259            self.events.append(AliasEvent(self.get_token_value(), None, None))
260        else:
261            anchor = None
262            if self.check_token(AnchorToken):
263                anchor = self.get_token_value()
264            tag = None
265            if self.check_token(TagToken):
266                tag = self.get_token_value()
267            if self.check_token(ScalarToken):
268                self.events.append(ScalarEvent(anchor, tag, (False, False), self.get_token_value(), None, None))
269            elif self.check_token(FlowSequenceStartToken):
270                self.events.append(SequenceStartEvent(anchor, tag, None, None))
271                self.parse_sequence()
272            elif self.check_token(FlowMappingStartToken):
273                self.events.append(MappingStartEvent(anchor, tag, None, None))
274                self.parse_mapping()
275            else:
276                raise Error("SCALAR, '[', or '{' is expected, got "+repr(self.tokens[self.index]))
277
278    # sequence: SEQUENCE-START (node (ENTRY node)*)? ENTRY? SEQUENCE-END
279    def parse_sequence(self):
280        self.get_token(FlowSequenceStartToken)
281        if not self.check_token(FlowSequenceEndToken):
282            self.parse_node()
283            while not self.check_token(FlowSequenceEndToken):
284                self.get_token(FlowEntryToken)
285                if not self.check_token(FlowSequenceEndToken):
286                    self.parse_node()
287        self.get_token(FlowSequenceEndToken)
288        self.events.append(SequenceEndEvent(None, None))
289
290    # mapping: MAPPING-START (map_entry (ENTRY map_entry)*)? ENTRY? MAPPING-END
291    def parse_mapping(self):
292        self.get_token(FlowMappingStartToken)
293        if not self.check_token(FlowMappingEndToken):
294            self.parse_map_entry()
295            while not self.check_token(FlowMappingEndToken):
296                self.get_token(FlowEntryToken)
297                if not self.check_token(FlowMappingEndToken):
298                    self.parse_map_entry()
299        self.get_token(FlowMappingEndToken)
300        self.events.append(MappingEndEvent(None, None))
301
302    # map_entry: KEY node VALUE node
303    def parse_map_entry(self):
304        self.get_token(KeyToken)
305        self.parse_node()
306        self.get_token(ValueToken)
307        self.parse_node()
308
309    def parse(self):
310        self.parse_stream()
311
312    def get_event(self):
313        return self.events.pop(0)
314
315    def check_event(self, *choices):
316        if self.events:
317            if not choices:
318                return True
319            for choice in choices:
320                if isinstance(self.events[0], choice):
321                    return True
322        return False
323
324    def peek_event(self):
325        return self.events[0]
326
327class CanonicalLoader(CanonicalScanner, CanonicalParser, Composer, Constructor, Resolver):
328
329    def __init__(self, stream):
330        if hasattr(stream, 'read'):
331            stream = stream.read()
332        CanonicalScanner.__init__(self, stream)
333        CanonicalParser.__init__(self)
334        Composer.__init__(self)
335        Constructor.__init__(self)
336        Resolver.__init__(self)
337
338def canonical_scan(stream):
339    return scan(stream, Loader=CanonicalLoader)
340
341def canonical_parse(stream):
342    return parse(stream, Loader=CanonicalLoader)
343
344def canonical_compose(stream):
345    return compose(stream, Loader=CanonicalLoader)
346
347def canonical_compose_all(stream):
348    return compose_all(stream, Loader=CanonicalLoader)
349
350def canonical_load(stream):
351    return load(stream, Loader=CanonicalLoader)
352
353def canonical_load_all(stream):
354    return load_all(stream, Loader=CanonicalLoader)
355
Note: See TracBrowser for help on using the repository browser.