source: pyyaml/branches/working-on-emitter/tests/test_appliance.py @ 121

Revision 121, 10.9 KB checked in by xi, 9 years ago (diff)

Add a branch for working on Emitter.

Line 
1
2import unittest, os
3
4from yaml.tokens import *
5from yaml.events import *
6
7class TestAppliance(unittest.TestCase):
8
9    DATA = 'tests/data'
10
11    all_tests = {}
12    for filename in os.listdir(DATA):
13        if os.path.isfile(os.path.join(DATA, filename)):
14            root, ext = os.path.splitext(filename)
15            all_tests.setdefault(root, []).append(ext)
16
17    def add_tests(cls, method_name, *extensions):
18        for test in cls.all_tests:
19            available_extensions = cls.all_tests[test]
20            for ext in extensions:
21                if ext not in available_extensions:
22                    break
23            else:
24                filenames = [os.path.join(cls.DATA, test+ext) for ext in extensions]
25                def test_method(self, test=test, filenames=filenames):
26                    getattr(self, '_'+method_name)(test, *filenames)
27                test = test.replace('-', '_')
28                try:
29                    test_method.__name__ = '%s_%s' % (method_name, test)
30                except TypeError:
31                    import new
32                    test_method = new.function(test_method.func_code, test_method.func_globals,
33                            '%s_%s' % (method_name, test), test_method.func_defaults,
34                            test_method.func_closure)
35                setattr(cls, test_method.__name__, test_method)
36    add_tests = classmethod(add_tests)
37
38class Error(Exception):
39    pass
40
41class CanonicalScanner:
42
43    def __init__(self, data):
44        self.data = unicode(data, 'utf-8')+u'\0'
45        self.index = 0
46
47    def scan(self):
48        #print self.data[self.index:]
49        tokens = []
50        tokens.append(StreamStartToken(None, None))
51        while True:
52            self.find_token()
53            ch = self.data[self.index]
54            if ch == u'\0':
55                tokens.append(StreamEndToken(None, None))
56                break
57            elif ch == u'%':
58                tokens.append(self.scan_directive())
59            elif ch == u'-' and self.data[self.index:self.index+3] == u'---':
60                self.index += 3
61                tokens.append(DocumentStartToken(None, None))
62            elif ch == u'[':
63                self.index += 1
64                tokens.append(FlowSequenceStartToken(None, None))
65            elif ch == u'{':
66                self.index += 1
67                tokens.append(FlowMappingStartToken(None, None))
68            elif ch == u']':
69                self.index += 1
70                tokens.append(FlowSequenceEndToken(None, None))
71            elif ch == u'}':
72                self.index += 1
73                tokens.append(FlowMappingEndToken(None, None))
74            elif ch == u'?':
75                self.index += 1
76                tokens.append(KeyToken(None, None))
77            elif ch == u':':
78                self.index += 1
79                tokens.append(ValueToken(None, None))
80            elif ch == u',':
81                self.index += 1
82                tokens.append(FlowEntryToken(None, None))
83            elif ch == u'*' or ch == u'&':
84                tokens.append(self.scan_alias())
85            elif ch == u'!':
86                tokens.append(self.scan_tag())
87            elif ch == u'"':
88                tokens.append(self.scan_scalar())
89            else:
90                raise Error("invalid token")
91        return tokens
92
93    DIRECTIVE = u'%YAML 1.1'
94
95    def scan_directive(self):
96        if self.data[self.index:self.index+len(self.DIRECTIVE)] == self.DIRECTIVE and \
97                self.data[self.index+len(self.DIRECTIVE)] in u' \n\0':
98            self.index += len(self.DIRECTIVE)
99            return DirectiveToken('YAML', (1, 1), None, None)
100
101    def scan_alias(self):
102        if self.data[self.index] == u'*':
103            TokenClass = AliasToken
104        else:
105            TokenClass = AnchorToken
106        self.index += 1
107        start = self.index
108        while self.data[self.index] not in u', \n\0':
109            self.index += 1
110        value = self.data[start:self.index]
111        return TokenClass(value, None, None)
112
113    def scan_tag(self):
114        self.index += 1
115        start = self.index
116        while self.data[self.index] not in u' \n\0':
117            self.index += 1
118        value = self.data[start:self.index]
119        if value[0] == u'!':
120            value = 'tag:yaml.org,2002:'+value[1:]
121        elif value[0] == u'<' and value[-1] == u'>':
122            value = value[1:-1]
123        else:
124            value = u'!'+value
125        return TagToken(value, None, None)
126
127    QUOTE_CODES = {
128        'x': 2,
129        'u': 4,
130        'U': 8,
131    }
132
133    QUOTE_REPLACES = {
134        u'\\': u'\\',
135        u'\"': u'\"',
136        u' ': u' ',
137        u'a': u'\x07',
138        u'b': u'\x08',
139        u'e': u'\x1B',
140        u'f': u'\x0C',
141        u'n': u'\x0A',
142        u'r': u'\x0D',
143        u't': u'\x09',
144        u'v': u'\x0B',
145        u'N': u'\u0085',
146        u'L': u'\u2028',
147        u'P': u'\u2029',
148        u'_': u'_',
149        u'0': u'\x00',
150
151    }
152
153    def scan_scalar(self):
154        self.index += 1
155        chunks = []
156        start = self.index
157        ignore_spaces = False
158        while self.data[self.index] != u'"':
159            if self.data[self.index] == u'\\':
160                ignore_spaces = False
161                chunks.append(self.data[start:self.index])
162                self.index += 1
163                ch = self.data[self.index]
164                self.index += 1
165                if ch == u'\n':
166                    ignore_spaces = True
167                elif ch in self.QUOTE_CODES:
168                    length = self.QUOTE_CODES[ch]
169                    code = int(self.data[self.index:self.index+length], 16)
170                    chunks.append(unichr(code))
171                    self.index += length
172                else:
173                    chunks.append(self.QUOTE_REPLACES[ch])
174                start = self.index
175            elif self.data[self.index] == u'\n':
176                chunks.append(self.data[start:self.index])
177                chunks.append(u' ')
178                self.index += 1
179                start = self.index
180                ignore_spaces = True
181            elif ignore_spaces and self.data[self.index] == u' ':
182                self.index += 1
183                start = self.index
184            else:
185                ignore_spaces = False
186                self.index += 1
187        chunks.append(self.data[start:self.index])
188        self.index += 1
189        return ScalarToken(u''.join(chunks), False, None, None)
190
191    def find_token(self):
192        found = False
193        while not found:
194            while self.data[self.index] in u' \t':
195                self.index += 1
196            if self.data[self.index] == u'#':
197                while self.data[self.index] != u'\n':
198                    self.index += 1
199            if self.data[self.index] == u'\n':
200                self.index += 1
201            else:
202                found = True
203
204class CanonicalParser:
205
206    def __init__(self, data):
207        self.scanner = CanonicalScanner(data)
208        self.events = []
209
210    # stream: STREAM-START document* STREAM-END
211    def parse_stream(self):
212        self.consume_token(StreamStartToken)
213        self.events.append(StreamStartEvent(None, None))
214        while not self.test_token(StreamEndToken):
215            if self.test_token(DirectiveToken, DocumentStartToken):
216                self.parse_document()
217            else:
218                raise Error("document is expected, got "+repr(self.tokens[self.index]))
219        self.consume_token(StreamEndToken)
220        self.events.append(StreamEndEvent(None, None))
221
222    # document: DIRECTIVE? DOCUMENT-START node
223    def parse_document(self):
224        node = None
225        if self.test_token(DirectiveToken):
226            self.consume_token(DirectiveToken)
227        self.consume_token(DocumentStartToken)
228        self.events.append(DocumentStartEvent(None, None))
229        self.parse_node()
230        self.events.append(DocumentEndEvent(None, None))
231
232    # node: ALIAS | ANCHOR? TAG? (SCALAR|sequence|mapping)
233    def parse_node(self):
234        if self.test_token(AliasToken):
235            self.events.append(AliasEvent(self.get_value(), None, None))
236        else:
237            anchor = None
238            if self.test_token(AnchorToken):
239                anchor = self.get_value()
240            tag = u'!'
241            if self.test_token(TagToken):
242                tag = self.get_value()
243            if self.test_token(ScalarToken):
244                self.events.append(ScalarEvent(anchor, tag, self.get_value(), None, None))
245            elif self.test_token(FlowSequenceStartToken):
246                self.events.append(SequenceEvent(anchor, tag, None, None))
247                self.parse_sequence()
248            elif self.test_token(FlowMappingStartToken):
249                self.events.append(MappingEvent(anchor, tag, None, None))
250                self.parse_mapping()
251            else:
252                raise Error("SCALAR, '[', or '{' is expected, got "+repr(self.tokens[self.index]))
253
254    # sequence: SEQUENCE-START (node (ENTRY node)*)? ENTRY? SEQUENCE-END
255    def parse_sequence(self):
256        self.consume_token(FlowSequenceStartToken)
257        if not self.test_token(FlowSequenceEndToken):
258            self.parse_node()
259            while not self.test_token(FlowSequenceEndToken):
260                self.consume_token(FlowEntryToken)
261                if not self.test_token(FlowSequenceEndToken):
262                    self.parse_node()
263        self.consume_token(FlowSequenceEndToken)
264        self.events.append(CollectionEndEvent(None, None))
265
266    # mapping: MAPPING-START (map_entry (ENTRY map_entry)*)? ENTRY? MAPPING-END
267    def parse_mapping(self):
268        self.consume_token(FlowMappingStartToken)
269        if not self.test_token(FlowMappingEndToken):
270            self.parse_map_entry()
271            while not self.test_token(FlowMappingEndToken):
272                self.consume_token(FlowEntryToken)
273                if not self.test_token(FlowMappingEndToken):
274                    self.parse_map_entry()
275        self.consume_token(FlowMappingEndToken)
276        self.events.append(CollectionEndEvent(None, None))
277
278    # map_entry: KEY node VALUE node
279    def parse_map_entry(self):
280        self.consume_token(KeyToken)
281        self.parse_node()
282        self.consume_token(ValueToken)
283        self.parse_node()
284
285    def test_token(self, *choices):
286        for choice in choices:
287            if isinstance(self.tokens[self.index], choice):
288                return True
289        return False
290
291    def consume_token(self, cls):
292        if not isinstance(self.tokens[self.index], cls):
293            raise Error("unexpected token "+repr(self.tokens[self.index]))
294        self.index += 1
295
296    def get_value(self):
297        value = self.tokens[self.index].value
298        self.index += 1
299        return value
300
301    def parse(self):
302        self.tokens = self.scanner.scan()
303        self.index = 0
304        self.parse_stream()
305        return self.events
306
307    def get(self):
308        return self.events.pop(0)
309
310    def check(self, *choices):
311        for choice in choices:
312            if isinstance(self.events[0], choice):
313                return True
314        return False
315
316    def peek(self):
317        return self.events[0]
318
Note: See TracBrowser for help on using the repository browser.