source: pyyaml/trunk/lib/yaml/composer.py @ 233

Revision 233, 4.1 KB checked in by xi, 8 years ago (diff)

Fix loading an empty YAML stream.

RevLine 
[53]1
[137]2__all__ = ['Composer', 'ComposerError']
[57]3
[53]4from error import MarkedYAMLError
5from events import *
6from nodes import *
7
8class ComposerError(MarkedYAMLError):
9    pass
10
[222]11class Composer(object):
[53]12
[136]13    def __init__(self):
[142]14        self.anchors = {}
[53]15
[136]16    def check_node(self):
[233]17        # Drop the STREAM-START event.
18        if self.check_event(StreamStartEvent):
19            self.get_event()
20
[53]21        # If there are more documents available?
[136]22        return not self.check_event(StreamEndEvent)
[53]23
[136]24    def get_node(self):
[53]25        # Get the root node of the next document.
[136]26        if not self.check_event(StreamEndEvent):
[53]27            return self.compose_document()
28
29    def compose_document(self):
[118]30        # Drop the DOCUMENT-START event.
[136]31        self.get_event()
[118]32
33        # Compose the root node.
[137]34        node = self.compose_node(None, None)
[118]35
36        # Drop the DOCUMENT-END event.
[136]37        self.get_event()
[118]38
[223]39        self.anchors = {}
[53]40        return node
41
[137]42    def compose_node(self, parent, index):
[136]43        if self.check_event(AliasEvent):
44            event = self.get_event()
[53]45            anchor = event.anchor
[142]46            if anchor not in self.anchors:
[53]47                raise ComposerError(None, None, "found undefined alias %r"
[116]48                        % anchor.encode('utf-8'), event.start_mark)
[142]49            return self.anchors[anchor]
[136]50        event = self.peek_event()
[53]51        anchor = event.anchor
52        if anchor is not None:
[142]53            if anchor in self.anchors:
[53]54                raise ComposerError("found duplicate anchor %r; first occurence"
[142]55                        % anchor.encode('utf-8'), self.anchors[anchor].start_mark,
[116]56                        "second occurence", event.start_mark)
[137]57        self.descend_resolver(parent, index)
[136]58        if self.check_event(ScalarEvent):
[142]59            node = self.compose_scalar_node(anchor)
[136]60        elif self.check_event(SequenceStartEvent):
[142]61            node = self.compose_sequence_node(anchor)
[136]62        elif self.check_event(MappingStartEvent):
[142]63            node = self.compose_mapping_node(anchor)
[137]64        self.ascend_resolver()
[53]65        return node
66
[142]67    def compose_scalar_node(self, anchor):
[136]68        event = self.get_event()
[137]69        tag = event.tag
70        if tag is None or tag == u'!':
71            tag = self.resolve(ScalarNode, event.value, event.implicit)
[142]72        node = ScalarNode(tag, event.value,
[133]73                event.start_mark, event.end_mark, style=event.style)
[142]74        if anchor is not None:
75            self.anchors[anchor] = node
76        return node
[53]77
[142]78    def compose_sequence_node(self, anchor):
[136]79        start_event = self.get_event()
[137]80        tag = start_event.tag
81        if tag is None or tag == u'!':
82            tag = self.resolve(SequenceNode, None, start_event.implicit)
[136]83        node = SequenceNode(tag, [],
84                start_event.start_mark, None,
[133]85                flow_style=start_event.flow_style)
[142]86        if anchor is not None:
87            self.anchors[anchor] = node
[136]88        index = 0
89        while not self.check_event(SequenceEndEvent):
[137]90            node.value.append(self.compose_node(node, index))
[136]91            index += 1
92        end_event = self.get_event()
93        node.end_mark = end_event.end_mark
94        return node
[53]95
[142]96    def compose_mapping_node(self, anchor):
[136]97        start_event = self.get_event()
[137]98        tag = start_event.tag
99        if tag is None or tag == u'!':
100            tag = self.resolve(MappingNode, None, start_event.implicit)
[222]101        node = MappingNode(tag, [],
[136]102                start_event.start_mark, None,
103                flow_style=start_event.flow_style)
[142]104        if anchor is not None:
105            self.anchors[anchor] = node
[136]106        while not self.check_event(MappingEndEvent):
[222]107            #key_event = self.peek_event()
[137]108            item_key = self.compose_node(node, None)
[222]109            #if item_key in node.value:
110            #    raise ComposerError("while composing a mapping", start_event.start_mark,
111            #            "found duplicate key", key_event.start_mark)
[137]112            item_value = self.compose_node(node, item_key)
[222]113            #node.value[item_key] = item_value
114            node.value.append((item_key, item_value))
[136]115        end_event = self.get_event()
116        node.end_mark = end_event.end_mark
117        return node
[53]118
Note: See TracBrowser for help on using the repository browser.