source: pyyaml/trunk/tests/canonical.py @ 322

Revision 322, 12.0 KB checked in by xi, 5 years ago (diff)

Refactored the test suite; updated include and library paths in setup.cfg.

Line 
1
2import yaml, yaml.composer, yaml.constructor, yaml.resolver
3
4class CanonicalError(yaml.YAMLError):
5    pass
6
7class CanonicalScanner:
8
9    def __init__(self, data):
10        try:
11            self.data = unicode(data, 'utf-8')+u'\0'
12        except UnicodeDecodeError:
13            raise CanonicalError("utf-8 stream is expected")
14        self.index = 0
15        self.tokens = []
16        self.scanned = False
17
18    def check_token(self, *choices):
19        if not self.scanned:
20            self.scan()
21        if self.tokens:
22            if not choices:
23                return True
24            for choice in choices:
25                if isinstance(self.tokens[0], choice):
26                    return True
27        return False
28
29    def peek_token(self):
30        if not self.scanned:
31            self.scan()
32        if self.tokens:
33            return self.tokens[0]
34
35    def get_token(self, choice=None):
36        if not self.scanned:
37            self.scan()
38        token = self.tokens.pop(0)
39        if choice and not isinstance(token, choice):
40            raise CanonicalError("unexpected token "+repr(token))
41        return token
42
43    def get_token_value(self):
44        token = self.get_token()
45        return token.value
46
47    def scan(self):
48        self.tokens.append(yaml.StreamStartToken(None, None))
49        while True:
50            self.find_token()
51            ch = self.data[self.index]
52            if ch == u'\0':
53                self.tokens.append(yaml.StreamEndToken(None, None))
54                break
55            elif ch == u'%':
56                self.tokens.append(self.scan_directive())
57            elif ch == u'-' and self.data[self.index:self.index+3] == u'---':
58                self.index += 3
59                self.tokens.append(yaml.DocumentStartToken(None, None))
60            elif ch == u'[':
61                self.index += 1
62                self.tokens.append(yaml.FlowSequenceStartToken(None, None))
63            elif ch == u'{':
64                self.index += 1
65                self.tokens.append(yaml.FlowMappingStartToken(None, None))
66            elif ch == u']':
67                self.index += 1
68                self.tokens.append(yaml.FlowSequenceEndToken(None, None))
69            elif ch == u'}':
70                self.index += 1
71                self.tokens.append(yaml.FlowMappingEndToken(None, None))
72            elif ch == u'?':
73                self.index += 1
74                self.tokens.append(yaml.KeyToken(None, None))
75            elif ch == u':':
76                self.index += 1
77                self.tokens.append(yaml.ValueToken(None, None))
78            elif ch == u',':
79                self.index += 1
80                self.tokens.append(yaml.FlowEntryToken(None, None))
81            elif ch == u'*' or ch == u'&':
82                self.tokens.append(self.scan_alias())
83            elif ch == u'!':
84                self.tokens.append(self.scan_tag())
85            elif ch == u'"':
86                self.tokens.append(self.scan_scalar())
87            else:
88                raise CanonicalError("invalid token")
89        self.scanned = True
90
91    DIRECTIVE = u'%YAML 1.1'
92
93    def scan_directive(self):
94        if self.data[self.index:self.index+len(self.DIRECTIVE)] == self.DIRECTIVE and \
95                self.data[self.index+len(self.DIRECTIVE)] in u' \n\0':
96            self.index += len(self.DIRECTIVE)
97            return yaml.DirectiveToken('YAML', (1, 1), None, None)
98        else:
99            raise CanonicalError("invalid directive")
100
101    def scan_alias(self):
102        if self.data[self.index] == u'*':
103            TokenClass = yaml.AliasToken
104        else:
105            TokenClass = yaml.AnchorToken
106        self.index += 1
107        start = self.index
108        while self.data[self.index] not in u', \n\0':
109            self.index += 1
110        value = self.data[start:self.index]
111        return TokenClass(value, None, None)
112
113    def scan_tag(self):
114        self.index += 1
115        start = self.index
116        while self.data[self.index] not in u' \n\0':
117            self.index += 1
118        value = self.data[start:self.index]
119        if not value:
120            value = u'!'
121        elif value[0] == u'!':
122            value = 'tag:yaml.org,2002:'+value[1:]
123        elif value[0] == u'<' and value[-1] == u'>':
124            value = value[1:-1]
125        else:
126            value = u'!'+value
127        return yaml.TagToken(value, None, None)
128
129    QUOTE_CODES = {
130        'x': 2,
131        'u': 4,
132        'U': 8,
133    }
134
135    QUOTE_REPLACES = {
136        u'\\': u'\\',
137        u'\"': u'\"',
138        u' ': u' ',
139        u'a': u'\x07',
140        u'b': u'\x08',
141        u'e': u'\x1B',
142        u'f': u'\x0C',
143        u'n': u'\x0A',
144        u'r': u'\x0D',
145        u't': u'\x09',
146        u'v': u'\x0B',
147        u'N': u'\u0085',
148        u'L': u'\u2028',
149        u'P': u'\u2029',
150        u'_': u'_',
151        u'0': u'\x00',
152
153    }
154
155    def scan_scalar(self):
156        self.index += 1
157        chunks = []
158        start = self.index
159        ignore_spaces = False
160        while self.data[self.index] != u'"':
161            if self.data[self.index] == u'\\':
162                ignore_spaces = False
163                chunks.append(self.data[start:self.index])
164                self.index += 1
165                ch = self.data[self.index]
166                self.index += 1
167                if ch == u'\n':
168                    ignore_spaces = True
169                elif ch in self.QUOTE_CODES:
170                    length = self.QUOTE_CODES[ch]
171                    code = int(self.data[self.index:self.index+length], 16)
172                    chunks.append(unichr(code))
173                    self.index += length
174                else:
175                    if ch not in self.QUOTE_REPLACES:
176                        raise CanonicalError("invalid escape code")
177                    chunks.append(self.QUOTE_REPLACES[ch])
178                start = self.index
179            elif self.data[self.index] == u'\n':
180                chunks.append(self.data[start:self.index])
181                chunks.append(u' ')
182                self.index += 1
183                start = self.index
184                ignore_spaces = True
185            elif ignore_spaces and self.data[self.index] == u' ':
186                self.index += 1
187                start = self.index
188            else:
189                ignore_spaces = False
190                self.index += 1
191        chunks.append(self.data[start:self.index])
192        self.index += 1
193        return yaml.ScalarToken(u''.join(chunks), False, None, None)
194
195    def find_token(self):
196        found = False
197        while not found:
198            while self.data[self.index] in u' \t':
199                self.index += 1
200            if self.data[self.index] == u'#':
201                while self.data[self.index] != u'\n':
202                    self.index += 1
203            if self.data[self.index] == u'\n':
204                self.index += 1
205            else:
206                found = True
207
208class CanonicalParser:
209
210    def __init__(self):
211        self.events = []
212        self.parsed = False
213
214    # stream: STREAM-START document* STREAM-END
215    def parse_stream(self):
216        self.get_token(yaml.StreamStartToken)
217        self.events.append(yaml.StreamStartEvent(None, None))
218        while not self.check_token(yaml.StreamEndToken):
219            if self.check_token(yaml.DirectiveToken, yaml.DocumentStartToken):
220                self.parse_document()
221            else:
222                raise CanonicalError("document is expected, got "+repr(self.tokens[0]))
223        self.get_token(yaml.StreamEndToken)
224        self.events.append(yaml.StreamEndEvent(None, None))
225
226    # document: DIRECTIVE? DOCUMENT-START node
227    def parse_document(self):
228        node = None
229        if self.check_token(yaml.DirectiveToken):
230            self.get_token(yaml.DirectiveToken)
231        self.get_token(yaml.DocumentStartToken)
232        self.events.append(yaml.DocumentStartEvent(None, None))
233        self.parse_node()
234        self.events.append(yaml.DocumentEndEvent(None, None))
235
236    # node: ALIAS | ANCHOR? TAG? (SCALAR|sequence|mapping)
237    def parse_node(self):
238        if self.check_token(yaml.AliasToken):
239            self.events.append(yaml.AliasEvent(self.get_token_value(), None, None))
240        else:
241            anchor = None
242            if self.check_token(yaml.AnchorToken):
243                anchor = self.get_token_value()
244            tag = None
245            if self.check_token(yaml.TagToken):
246                tag = self.get_token_value()
247            if self.check_token(yaml.ScalarToken):
248                self.events.append(yaml.ScalarEvent(anchor, tag, (False, False), self.get_token_value(), None, None))
249            elif self.check_token(yaml.FlowSequenceStartToken):
250                self.events.append(yaml.SequenceStartEvent(anchor, tag, None, None))
251                self.parse_sequence()
252            elif self.check_token(yaml.FlowMappingStartToken):
253                self.events.append(yaml.MappingStartEvent(anchor, tag, None, None))
254                self.parse_mapping()
255            else:
256                raise CanonicalError("SCALAR, '[', or '{' is expected, got "+repr(self.tokens[0]))
257
258    # sequence: SEQUENCE-START (node (ENTRY node)*)? ENTRY? SEQUENCE-END
259    def parse_sequence(self):
260        self.get_token(yaml.FlowSequenceStartToken)
261        if not self.check_token(yaml.FlowSequenceEndToken):
262            self.parse_node()
263            while not self.check_token(yaml.FlowSequenceEndToken):
264                self.get_token(yaml.FlowEntryToken)
265                if not self.check_token(yaml.FlowSequenceEndToken):
266                    self.parse_node()
267        self.get_token(yaml.FlowSequenceEndToken)
268        self.events.append(yaml.SequenceEndEvent(None, None))
269
270    # mapping: MAPPING-START (map_entry (ENTRY map_entry)*)? ENTRY? MAPPING-END
271    def parse_mapping(self):
272        self.get_token(yaml.FlowMappingStartToken)
273        if not self.check_token(yaml.FlowMappingEndToken):
274            self.parse_map_entry()
275            while not self.check_token(yaml.FlowMappingEndToken):
276                self.get_token(yaml.FlowEntryToken)
277                if not self.check_token(yaml.FlowMappingEndToken):
278                    self.parse_map_entry()
279        self.get_token(yaml.FlowMappingEndToken)
280        self.events.append(yaml.MappingEndEvent(None, None))
281
282    # map_entry: KEY node VALUE node
283    def parse_map_entry(self):
284        self.get_token(yaml.KeyToken)
285        self.parse_node()
286        self.get_token(yaml.ValueToken)
287        self.parse_node()
288
289    def parse(self):
290        self.parse_stream()
291        self.parsed = True
292
293    def get_event(self):
294        if not self.parsed:
295            self.parse()
296        return self.events.pop(0)
297
298    def check_event(self, *choices):
299        if not self.parsed:
300            self.parse()
301        if self.events:
302            if not choices:
303                return True
304            for choice in choices:
305                if isinstance(self.events[0], choice):
306                    return True
307        return False
308
309    def peek_event(self):
310        if not self.parsed:
311            self.parse()
312        return self.events[0]
313
314class CanonicalLoader(CanonicalScanner, CanonicalParser,
315        yaml.composer.Composer, yaml.constructor.Constructor, yaml.resolver.Resolver):
316
317    def __init__(self, stream):
318        if hasattr(stream, 'read'):
319            stream = stream.read()
320        CanonicalScanner.__init__(self, stream)
321        CanonicalParser.__init__(self)
322        yaml.composer.Composer.__init__(self)
323        yaml.constructor.Constructor.__init__(self)
324        yaml.resolver.Resolver.__init__(self)
325
326yaml.CanonicalLoader = CanonicalLoader
327
328def canonical_scan(stream):
329    return yaml.scan(stream, Loader=CanonicalLoader)
330
331yaml.canonical_scan = canonical_scan
332
333def canonical_parse(stream):
334    return yaml.parse(stream, Loader=CanonicalLoader)
335
336yaml.canonical_parse = canonical_parse
337
338def canonical_compose(stream):
339    return yaml.compose(stream, Loader=CanonicalLoader)
340
341yaml.canonical_compose = canonical_compose
342
343def canonical_compose_all(stream):
344    return yaml.compose_all(stream, Loader=CanonicalLoader)
345
346yaml.canonical_compose_all = canonical_compose_all
347
348def canonical_load(stream):
349    return yaml.load(stream, Loader=CanonicalLoader)
350
351yaml.canonical_load = canonical_load
352
353def canonical_load_all(stream):
354    return yaml.load_all(stream, Loader=CanonicalLoader)
355
356yaml.canonical_load_all = canonical_load_all
357
Note: See TracBrowser for help on using the repository browser.