# Events should obey the following grammar: # stream ::= STREAM-START document* STREAM-END # document ::= DOCUMENT-START node DOCUMENT-END # node ::= SCALAR | sequence | mapping # sequence ::= SEQUENCE-START node* SEQUENCE-END # mapping ::= MAPPING-START (node node)* MAPPING-END __all__ = ['Emitter', 'EmitterError'] from error import YAMLError from events import * class EmitterError(YAMLError): pass class Emitter: def __init__(self, writer): self.writer = writer self.states = [] self.state = self.expect_stream_start self.levels = [] self.level = 0 self.soft_space = False def emit(self, event): self.state(event) def expect_stream_start(self, event): if isinstance(event, StreamStartEvent): self.state = self.expect_document_start else: raise EmitterError("expected StreamStartEvent, but got %s" % event.__class__.__name__) def expect_document_start(self, event): if isinstance(event, DocumentStartEvent): self.write_document_start() self.states.append(self.expect_document_end) self.state = self.expect_root_node elif isinstance(event, StreamEndEvent): self.writer.flush() self.state = self.expect_nothing else: raise EmitterError("expected DocumentStartEvent, but got %s" % event.__class__.__name__) def expect_document_end(self, event): if isinstance(event, DocumentEndEvent): self.write_document_end() self.state = self.expect_document_start else: raiseEmitterError("expected DocumentEndEvent, but got %s" % event.__class__.__name__) def expect_root_node(self, event): self.expect_node(event) def expect_node(self, event): if isinstance(event, AliasEvent): self.write_anchor("*", event.anchor) self.state = self.states.pop() elif isinstance(event, NodeEvent): if event.anchor: self.write_anchor("&", event.anchor) if event.tag: self.write_tag(event.tag) if isinstance(event, ScalarEvent): self.write_scalar(event.value) self.state = self.states.pop() elif isinstance(event, SequenceEvent): self.write_collection_start("[") self.level += 1 self.state = self.expect_first_sequence_item elif isinstance(event, MappingEvent): self.write_collection_start("{") self.level += 1 self.state = self.expect_first_mapping_key else: raise EmitterError("Expected NodeEvent, but got %s" % event.__class__.__name__) def expect_first_sequence_item(self, event): if isinstance(event, CollectionEndEvent): self.write_collection_end("]") self.state = self.states.pop() else: self.write_indent() self.states.append(self.expect_sequence_item) self.expect_node(event) def expect_sequence_item(self, event): if isinstance(event, CollectionEndEvent): self.level -= 1 self.write_indent() self.write_collection_end("]") self.state = self.states.pop() else: self.write_indicator(",") self.write_indent() self.states.append(self.expect_sequence_item) self.expect_node(event) def expect_first_mapping_key(self, event): if isinstance(event, CollectionEndEvent): self.write_collection_end("}") self.state = self.states.pop() else: self.write_indent() self.write_indicator("?") self.states.append(self.expect_mapping_value) self.expect_node(event) def expect_mapping_key(self, event): if isinstance(event, CollectionEndEvent): self.level -= 1 self.write_indent() self.write_collection_end("}") self.state = self.states.pop() else: self.write_indicator(",") self.write_indent() self.write_indicator("?") self.states.append(self.expect_mapping_value) self.expect_node(event) def expect_mapping_value(self, event): self.write_indent() self.write_indicator(":") self.states.append(self.expect_mapping_key) self.expect_node(event) def expect_nothing(self, event): raise EmitterError("expected nothing, but got %s" % event.__class__.__name__) def write_document_start(self): self.writer.write("%YAML 1.1\n") self.writer.write("---") self.soft_space = True def write_document_end(self): self.writer.write("\n...\n") self.soft_space = False def write_collection_start(self, indicator): if self.soft_space: self.writer.write(" ") self.writer.write(indicator) self.soft_space = False def write_collection_end(self, indicator): self.writer.write(indicator) self.soft_space = True def write_anchor(self, indicator, name): if self.soft_space: self.writer.write(" ") self.writer.write("%s%s" % (indicator, name)) self.soft_space = True def write_tag(self, tag): if self.soft_space: self.writer.write(" ") if tag.startswith("tag:yaml.org,2002:"): self.writer.write("!!%s" % tag[len("tag.yaml.org,2002:"):]) else: self.writer.write("!<%s>" % tag) self.soft_space = True def write_scalar(self, value): if self.soft_space: self.writer.write(" ") self.writer.write("\"%s\"" % value.encode('utf-8')) self.soft_space = True def write_indicator(self, indicator): self.writer.write(indicator) self.soft_space = True def write_indent(self): self.writer.write("\n"+" "*(self.level*4)) self.soft_space = False