source: trunk/sandbox/my-parser/canonical.py @ 37

Revision 37, 11.7 KB checked in by xi, 9 years ago (diff)

Add the skeleton of a pure Python parser. It's surprisingly simple.

Line 
1
2import re, unittest
3
4class Marker(object):
5
6    def __init__(self, source, data, index, length=0):
7        self.source = source
8        self.data = data
9        self.index = index
10        self.length = length
11        self._line = None
12        self._position = None
13
14    def line(self):
15        if not self._line:
16            self._make_line_position()
17        return self._line
18
19    def position(self):
20        if not self._position:
21            self._make_line_position()
22        return self._position
23
24    def _make_line_position(self):
25        line_start = self.data.rfind('\n', 0, self.index)+1
26        line_end = self.data.find('\n', self.index)+1
27        if line_end == 0:
28            line_end = len(self.data)
29        self._line = (line_start, line_end)
30        row = self.data.count('\n', 0, line_start)
31        col = self.index-line_start
32        self._position = (row, col)
33
34class Error(Exception):
35
36    def __init__(self, message=None, marker=None):
37        Exception.__init__(self)
38        self.message = message
39        if isinstance(marker, list):
40            if marker:
41                marker = marker[0].marker
42            else:
43                marker = None
44        self.marker = marker
45
46    def __str__(self):
47        if self.marker is not None:
48            row, col = self.marker.position()
49            start, end = self.marker.line()
50            error_position = "source \"%s\", line %s, column %s:\n%s\n"  \
51                    % (self.marker.source, row+1, col+1, self.marker.data[start:end].rstrip().encode('utf-8'))
52            error_pointer = " " * col + "^\n"
53        else:
54            error_position = ""
55            error_pointer = ""
56        if self.message is not None:
57            error_message = self.message
58        else:
59            error_message = "YAML error"
60        return error_position+error_pointer+error_message
61
62def scanner_rule(pattern):
63    def make(function):
64        function.pattern = pattern
65        return function
66    return make
67
68class Token:
69
70    def __init__(self, name, value, marker=None):
71        self.name = name
72        self.value = value
73        self.marker = marker
74
75class YAMLScanner:
76
77    @scanner_rule(r"\s+")
78    def WHITESPACE(self, tokens, token):
79        pass
80           
81    @scanner_rule(r"%YAML")
82    def DIRECTIVE_NAME(self, tokens, token):
83        tokens.append(token)
84
85    @scanner_rule(r"\d+\.\d+")
86    def DIRECTIVE_VALUE(self, tokens, token):
87        token.value = float(token.value)
88        tokens.append(token)
89
90    @scanner_rule(r"---")
91    def DOCUMENT_SEPARATOR(self, tokens, token):
92        tokens.append(token)
93
94    @scanner_rule(r"\[")
95    def SEQ_START(self, tokens, token):
96        tokens.append(token)
97
98    @scanner_rule(r"\]")
99    def SEQ_END(self, tokens, token):
100        tokens.append(token)
101
102    @scanner_rule(r"\{")
103    def MAP_START(self, tokens, token):
104        tokens.append(token)
105
106    @scanner_rule(r"\}")
107    def MAP_END(self, tokens, token):
108        tokens.append(token)
109
110    @scanner_rule(r"\?")
111    def MAP_KEY(self, tokens, token):
112        tokens.append(token)
113
114    @scanner_rule(r":")
115    def MAP_VALUE(self, tokens, token):
116        tokens.append(token)
117
118    @scanner_rule(r",")
119    def COLL_ENTRY(self, tokens, token):
120        tokens.append(token)
121
122    @scanner_rule(r"!\S*")
123    def TAG(self, tokens, token):
124        if token.value == "!":
125            token.value = ""
126        elif token.value.startswith(r"!<") and token.value.endswith(r">"):
127            token.value = token.value[2:-1]
128        elif token.value.startswith(r"!!"):
129            token.value = "tag:yaml.org,2002:" + token.value[2:]
130        tokens.append(token)
131
132    escapes_re = re.compile(r"\\(?P<value>[\\\"abefnrtvNLP_0 ]|x[0-9A-Fa-f]{2}|u[0-9A-Fa-f]{4}|U[0-9A-Fa-f]{8})", re.U)
133    escapes = {
134        "\\": u"\\",
135        "\"": u"\"",
136        " ": u" ",
137        "a": u"\x07",
138        "b": u"\x08",
139        "e": u"\x1B",
140        "f": u"\x0C",
141        "n": u"\x0A",
142        "r": u"\x0D",
143        "t": u"\x09",
144        "v": u"\x0B",
145        "N": u"\u0085",
146        "L": u"\u2028",
147        "P": u"\u2029",
148        "_": u"_",
149        "0": u"\x00",
150    }
151
152    def escapes_replace(self, match):
153        value = match.group('value')
154        if len(value) == 1:
155            return self.escapes[value]
156        else:
157            return unichr(int(value[1:], 16))
158
159    @scanner_rule(r"\"(?:[^\"\\]|\\[\\\"abefnrtvNLP_0 ]|\\x[0-9A-Fa-f]{2}|\\u[0-9A-Fa-f]{4}|\\U[0-9A-Fa-f]{8})*\"")
160    def SCALAR(self, tokens, token):
161        token.value = self.escapes_re.sub(self.escapes_replace, token.value[1:-1])
162        tokens.append(token)
163
164    @scanner_rule(r"&\S+")
165    def ANCHOR(self, tokens, token):
166        token.value = token.value[1:]
167        tokens.append(token)
168
169    @scanner_rule(r"\*\S+")
170    def ALIAS(self, tokens, token):
171        token.value = token.value[1:]
172        tokens.append(token)
173
174    def __init__(self):
175        rules = []
176        for name, function in vars(self.__class__).items():
177            if hasattr(function, 'pattern'):
178                rules.append((name, function.pattern))
179        patterns = [r'(?P<%s>%s)' % (name, pattern) for name, pattern in rules]
180        self.scanner_re = re.compile('|'.join(patterns), re.U)
181
182    def scan(self, source, data):
183        data = unicode(data, 'utf-8')
184        tokens = []
185        index = 0
186        while index < len(data):
187            match = self.scanner_re.match(data, index)
188            if not match:
189                raise Error("invalid token", Marker(source, data, index))
190            name = match.lastgroup
191            value = match.group()
192            marker = Marker(source, data, index, len(value))
193            token = Token(name, value, marker)
194            processor = getattr(self, name)
195            processor(tokens, token)
196            index += len(value)
197        return tokens
198
199class Value:
200    def __init__(self, tag, anchor, value):
201        self.tag = tag
202        self.anchor = anchor
203        self.value = value
204    def __eq__(self, other):
205        return (self.__class__, self.__dict__) == (other.__class__, other.__dict__)
206
207class Scalar(Value):
208    pass
209
210class Sequence(Value):
211    pass
212
213class Mapping(Value):
214    pass
215
216class Alias:
217    def __init__(self, link):
218        self.link = link
219    def __eq__(self, other):
220        return (self.__class__, self.__dict__) == (other.__class__, other.__dict__)
221
222class YAMLParser:
223
224    # stream: document*
225    def parse_stream(self, tokens):
226        documents = []
227        while tokens:
228            if self.check_token(tokens, ['DIRECTIVE_NAME', 'DOCUMENT_SEPARATOR']):
229                documents.append(self.parse_document(tokens))
230            else:
231                raise Error("document is expected", tokens)
232        return documents
233
234    # document: (DIRECTIVE_NAME DIRECTIVE_VALUE)? DOCUMENT_SEPARATOR node?
235    def parse_document(self, tokens):
236        node = None
237        if self.check_token(tokens, ['DIRECTIVE_NAME']):
238            self.eat_token(tokens, 'DIRECTIVE_NAME')
239            self.eat_token(tokens, 'DIRECTIVE_VALUE')
240        self.eat_token(tokens, 'DOCUMENT_SEPARATOR')
241        if self.check_token(tokens, ['TAG', 'ANCHOR', 'ALIAS', 'SCALAR', 'SEQ_START', 'MAP_START']):
242            node = self.parse_node(tokens)
243        return node
244
245    # node: TAG? ANCHOR? (SCALAR|sequence|mapping) | ALIAS")
246    def parse_node(self, tokens):
247        if self.check_token(tokens, ['ALIAS']):
248            return Alias(self.eat_token(tokens, 'ALIAS'))
249        else:
250            tag = None
251            anchor = None
252            if self.check_token(tokens, ['TAG']):
253                tag = self.eat_token(tokens, 'TAG')
254            if self.check_token(tokens, ['ANCHOR']):
255                anchor = self.eat_token(tokens, 'ANCHOR')
256            if self.check_token(tokens, ['SCALAR']):
257                return Scalar(tag, anchor, self.eat_token(tokens, 'SCALAR'))
258            elif self.check_token(tokens, ['SEQ_START']):
259                return Sequence(tag, anchor, self.parse_sequence(tokens))
260            elif self.check_token(tokens, ['MAP_START']):
261                return Mapping(tag, anchor, self.parse_mapping(tokens))
262            else:
263                raise Error("SCALAR, sequence or mapping is expected", tokens)
264
265    # sequence: SEQ_START (node (COLL_ENTRY node)*)? SEQ_END
266    def parse_sequence(self, tokens):
267        values = []
268        self.eat_token(tokens, 'SEQ_START')
269        if not self.check_token(tokens, ['SEQ_END']):
270            values.append(self.parse_node(tokens))
271            while not self.check_token(tokens, ['SEQ_END']):
272                self.eat_token(tokens, 'COLL_ENTRY')
273                values.append(self.parse_node(tokens))
274        self.eat_token(tokens, 'SEQ_END')
275        return values
276
277    # mapping: MAP_START (map_entry (COLL_ENTRY map_entry)*)? MAP_END
278    def parse_mapping(self, tokens):
279        values = []
280        self.eat_token(tokens, 'MAP_START')
281        if not self.check_token(tokens, ['MAP_END']):
282            values.append(self.parse_map_entry(tokens))
283            while not self.check_token(tokens, ['MAP_END']):
284                self.eat_token(tokens, 'COLL_ENTRY')
285                values.append(self.parse_map_entry(tokens))
286        self.eat_token(tokens, 'MAP_END')
287        return values
288
289    # map_entry: MAP_KEY node MAP_VALUE node
290    def parse_map_entry(self, tokens):
291        self.eat_token(tokens, 'MAP_KEY')
292        key = self.parse_node(tokens)
293        self.eat_token(tokens, 'MAP_VALUE')
294        value = self.parse_node(tokens)
295        return (key, value)
296
297    def check_token(self, tokens, names):
298        return tokens and tokens[0].name in names
299
300    def eat_token(self, tokens, name):
301        if not tokens:
302            raise Error("%s is expected, EOF is found" % name, tokens)
303        if tokens and tokens[0].name != name:
304            raise Error("%s is expected, %s is found" % (name, tokens[0].name), tokens)
305        return tokens.pop(0).value
306
307    def __init__(self):
308        self.scanner = YAMLScanner()
309
310    def parse(self, source, data):
311        tokens = self.scanner.scan(source, data)
312        return self.parse_stream(tokens)
313
314class Test(unittest.TestCase):
315
316    def testScalar(self):
317        parser = YAMLParser()
318        documents = parser.parse('testScalar', """--- !!str "foo"\n""")
319        self.failUnlessEqual(documents, [Scalar('tag:yaml.org,2002:str', None, "foo")])
320
321    def testSequence(self):
322        parser = YAMLParser()
323        documents = parser.parse('testSequence', """%YAML 1.1\n--- !!seq\n["foo", "bar", "baz"]\n""")
324        self.failUnlessEqual(documents, [
325            Sequence('tag:yaml.org,2002:seq', None, [
326                Scalar(None, None, "foo"),
327                Scalar(None, None, "bar"),
328                Scalar(None, None, "baz"),
329            ])
330        ])
331
332    def testMapping(self):
333        parser = YAMLParser()
334        documents = parser.parse('testMapping', """%YAML 1.1\n--- !!map\n{ ? "foo" : "bar", ? "baz" : "bat" }\n""")
335        self.failUnlessEqual(documents, [
336            Mapping('tag:yaml.org,2002:map', None, [
337                (Scalar(None, None, "foo"), Scalar(None, None, "bar")),
338                (Scalar(None, None, "baz"), Scalar(None, None, "bat")),
339            ])
340        ])
341
342    def testAlias(self):
343        parser = YAMLParser()
344        documents = parser.parse('testSequence', """%YAML 1.1\n--- !!seq\n[ &id "foo", *id ]\n""")
345        self.failUnlessEqual(documents, [
346            Sequence('tag:yaml.org,2002:seq', None, [
347                Scalar(None, 'id', "foo"),
348                Alias('id'),
349            ])
350        ])
351
352    def testMultiplyDocuments(self):
353        parser = YAMLParser()
354        documents = parser.parse('testMultiplyDocuments', """%YAML 1.1\n--- "foo"\n--- "bar"\n--- "baz"\n""")
355        self.failUnlessEqual(documents, [
356            Scalar(None, None, "foo"),
357            Scalar(None, None, "bar"),
358            Scalar(None, None, "baz"),
359        ])
360
361if __name__ == '__main__':
362    unittest.main()
363
Note: See TracBrowser for help on using the repository browser.