# Events should obey the following grammar:
# stream ::= STREAM-START document* STREAM-END
# document ::= DOCUMENT-START node DOCUMENT-END
# node ::= SCALAR | sequence | mapping
# sequence ::= SEQUENCE-START node* SEQUENCE-END
# mapping ::= MAPPING-START (node node)* MAPPING-END

__all__ = ['Emitter', 'EmitterError']

from error import YAMLError
from events import *


class EmitterError(YAMLError):
    """Raised when the emitter receives an event it does not expect."""
    pass


class Emitter:
    """Event-driven state machine that writes YAML text to a writer.

    Events are fed one at a time through emit().  self.state holds the
    handler for the next event and self.states is a stack of handlers to
    return to once the current node has been written.  The writer object
    must provide write() and flush().
    """

    def __init__(self, writer):
        """Set up an emitter that writes to *writer*."""
        self.writer = writer
        # Stack of handlers to resume after the current node is finished.
        self.states = []
        # Handler for the next event.
        self.state = self.expect_stream_start
        # Stack of saved indentation levels; None means "no indentation yet".
        self.indents = []
        self.indent = None
        # Nesting depth of flow ("[...]" / "{...}") collections.
        self.flow_level = 0
        # Not used by any method of this class.
        self.key_context = False
        # True if the last thing written leaves room for the next token
        # without an extra separating space.
        self.space = True
        # True if nothing has been written on the current line yet.
        self.line = True
        # True if a block collection may start on the current line
        # (e.g. right after "- " or "? ").
        self.allow_inline_collection = False
        # True if a block sequence may reuse the indentation of its parent.
        self.allow_indentless_sequence = False
        # True while a simple (single-line) mapping key is being written.
        self.simple_key = False
        # Events waiting to be processed (used for one-event lookahead).
        self.event_queue = []

    def emit(self, event):
        """Feed one event to the emitter.

        The event is appended to the queue and the queue is drained through
        the current state handler.  Draining stops early when a handler
        pushes its event back (see expect_node), i.e. when it needs to see
        the following event before it can proceed.
        """
        self.event_queue.append(event)
        while self.event_queue:
            pending = len(self.event_queue)
            self.state(self.event_queue.pop(0))
            # Handlers only touch the queue through push_back(); if the
            # queue did not shrink, the event was re-queued and we must
            # wait for the next emit() call to provide lookahead.
            if len(self.event_queue) >= pending:
                break

    def push_back(self, event):
        """Put *event* back at the front of the queue for later processing."""
        self.event_queue.insert(0, event)

    # Stream and document handlers.

    def expect_stream_start(self, event):
        """Accept the StreamStartEvent that opens the stream.

        Raises EmitterError for any other event.
        """
        if isinstance(event, StreamStartEvent):
            self.state = self.expect_document_start
        else:
            raise EmitterError("expected StreamStartEvent, but got %s"
                    % event.__class__.__name__)

    def expect_document_start(self, event):
        """Start a new document, or finish the stream on StreamEndEvent.

        Raises EmitterError for any other event.
        """
        if isinstance(event, DocumentStartEvent):
            self.write_document_start()
            self.states.append(self.expect_document_end)
            self.state = self.expect_root_node
            self.allow_inline_collection = False
            self.allow_indentless_sequence = False
        elif isinstance(event, StreamEndEvent):
            self.writer.flush()
            self.state = self.expect_nothing
        else:
            raise EmitterError("expected DocumentStartEvent, but got %s"
                    % event.__class__.__name__)

    def expect_document_end(self, event):
        """Finish the current document.

        Raises EmitterError if *event* is not a DocumentEndEvent.
        """
        if isinstance(event, DocumentEndEvent):
            self.write_document_end()
            self.state = self.expect_document_start
            self.allow_inline_collection = False
            self.allow_indentless_sequence = False
        else:
            raise EmitterError("expected DocumentEndEvent, but got %s"
                    % event.__class__.__name__)

    def expect_root_node(self, event):
        """Handle the root node of a document."""
        self.expect_node(event)

    # Node handlers.

    def expect_node(self, event):
        """Dispatch a node event (alias, scalar, sequence or mapping).

        For a collection start event the next event is needed to find out
        whether the collection is empty; if it is not queued yet, the event
        is pushed back and processing resumes on the next emit() call.

        Raises EmitterError if *event* is not a node event.
        """
        empty = None
        if isinstance(event, (SequenceEvent, MappingEvent)):
            if not self.event_queue:
                return self.push_back(event)
            empty = isinstance(self.event_queue[0], CollectionEndEvent)
        if isinstance(event, AliasEvent):
            self.expect_alias(event)
        elif isinstance(event, (ScalarEvent, SequenceEvent, MappingEvent)):
            # Once an anchor or a tag has been written, a block collection
            # can no longer start on the same line.
            if event.anchor:
                self.write_anchor("&", event.anchor)
                self.allow_inline_collection = False
            if event.tag not in [None, u'!']:
                self.write_tag(event.tag)
                self.allow_inline_collection = False
            if isinstance(event, ScalarEvent):
                self.expect_scalar(event)
            elif isinstance(event, SequenceEvent):
                self.expect_sequence(event, empty)
            elif isinstance(event, MappingEvent):
                self.expect_mapping(event, empty)
        else:
            raise EmitterError("Expected NodeEvent, but got %s"
                    % event.__class__.__name__)

    def expect_alias(self, event):
        """Write an alias ("*anchor") and return to the saved state."""
        self.write_anchor("*", event.anchor)
        self.state = self.states.pop()

    def expect_scalar(self, event):
        """Write a scalar node and return to the saved state."""
        # The scalar body is written one level deeper than its parent.
        self.indents.append(self.indent)
        if self.indent is None:
            self.indent = 2
        else:
            self.indent += 2
        self.write_scalar(event.value, event.implicit, event.style)
        self.indent = self.indents.pop()
        self.state = self.states.pop()
        self.allow_inline_collection = False
        self.allow_indentless_sequence = False

    def expect_sequence(self, event, empty):
        """Begin a sequence.

        Flow style is used inside another flow collection, when the event
        requests it (event.flow), or when the sequence is empty; otherwise
        block style is used.
        """
        if self.flow_level or event.flow or empty:
            self.write_indicator("[", need_space=True, provide_space=True)
            self.flow_level += 1
            self.indents.append(self.indent)
            if self.indent is None:
                self.indent = 2
            else:
                self.indent += 2
            self.state = self.expect_first_flow_sequence_item
        else:
            self.indents.append(self.indent)
            if self.indent is None:
                self.indent = 0
            else:
                self.indent += 2
            self.state = self.expect_first_block_sequence_item

    def expect_mapping(self, event, empty):
        """Begin a mapping.

        Flow style is used inside another flow collection, when the event
        requests it (event.flow), or when the mapping is empty; otherwise
        block style is used.
        """
        if self.flow_level or event.flow or empty:
            self.write_indicator("{", need_space=True, provide_space=True)
            self.flow_level += 1
            self.indents.append(self.indent)
            if self.indent is None:
                self.indent = 2
            else:
                self.indent += 2
            self.state = self.expect_first_flow_mapping_key
        else:
            self.indents.append(self.indent)
            if self.indent is None:
                self.indent = 0
            else:
                self.indent += 2
            self.state = self.expect_first_block_mapping_key

    # Flow sequence handlers.

    def expect_first_flow_sequence_item(self, event):
        """Handle the first item of a flow sequence, or its immediate end."""
        if isinstance(event, CollectionEndEvent):
            self.indent = self.indents.pop()
            self.write_indicator("]", provide_space=True)
            self.flow_level -= 1
            self.state = self.states.pop()
        else:
            self.write_indent()
            self.states.append(self.expect_flow_sequence_item)
            self.state = self.expect_node
            self.expect_node(event)

    def expect_flow_sequence_item(self, event):
        """Handle a subsequent item of a flow sequence, or its end."""
        if isinstance(event, CollectionEndEvent):
            # A trailing comma is written before the closing bracket.
            self.write_indicator(",")
            self.indent = self.indents.pop()
            self.write_indent()
            self.write_indicator("]")
            self.flow_level -= 1
            self.state = self.states.pop()
        else:
            self.write_indicator(",")
            self.write_indent()
            self.states.append(self.expect_flow_sequence_item)
            self.state = self.expect_node
            self.expect_node(event)

    # Block sequence handlers.

    def expect_first_block_sequence_item(self, event):
        """Handle the first item of a block sequence.

        A block sequence is never empty here: empty sequences are written
        in flow style by expect_sequence.
        """
        assert not isinstance(event, CollectionEndEvent)
        if not self.allow_inline_collection:
            if self.allow_indentless_sequence:
                # Reuse the parent's indentation, keeping the stack depth
                # unchanged so the matching pop at the end still works.
                self.indent = self.indents.pop()
                self.indents.append(self.indent)
            self.write_indent()
        self.write_indicator("-", need_space=True)
        self.allow_indentless_sequence = False
        self.allow_inline_collection = True
        self.states.append(self.expect_block_sequence_item)
        self.state = self.expect_node
        self.expect_node(event)

    def expect_block_sequence_item(self, event):
        """Handle a subsequent item of a block sequence, or its end."""
        if isinstance(event, CollectionEndEvent):
            self.indent = self.indents.pop()
            self.state = self.states.pop()
        else:
            self.write_indent()
            self.write_indicator("-", need_space=True)
            self.allow_indentless_sequence = False
            self.allow_inline_collection = True
            self.states.append(self.expect_block_sequence_item)
            self.state = self.expect_node
            self.expect_node(event)

    # Flow mapping handlers.

    def expect_first_flow_mapping_key(self, event):
        """Handle the first key of a flow mapping, or its immediate end."""
        if isinstance(event, CollectionEndEvent):
            self.indent = self.indents.pop()
            self.write_indicator("}")
            self.flow_level -= 1
            self.state = self.states.pop()
        else:
            self.write_indent()
            if self.is_simple(event):
                self.simple_key = True
            else:
                # Complex keys need the explicit "?" indicator.
                self.write_indicator("?", need_space=True)
            self.states.append(self.expect_flow_mapping_value)
            self.state = self.expect_node
            self.expect_node(event)

    def expect_flow_mapping_key(self, event):
        """Handle a subsequent key of a flow mapping, or its end."""
        if isinstance(event, CollectionEndEvent):
            self.indent = self.indents.pop()
            self.write_indent()
            self.write_indicator("}")
            self.flow_level -= 1
            self.state = self.states.pop()
        else:
            self.write_indicator(",")
            self.write_indent()
            if self.is_simple(event):
                self.simple_key = True
            else:
                self.write_indicator("?", need_space=True)
            self.states.append(self.expect_flow_mapping_value)
            self.state = self.expect_node
            self.expect_node(event)

    def expect_flow_mapping_value(self, event):
        """Write the ":" separator and the value of a flow mapping entry."""
        if self.simple_key:
            self.write_indicator(":", need_space=False)
            self.simple_key = False
        else:
            self.write_indent()
            self.write_indicator(":", need_space=True)
        self.states.append(self.expect_flow_mapping_key)
        self.state = self.expect_node
        self.expect_node(event)

    # Block mapping handlers.

    def expect_first_block_mapping_key(self, event):
        """Handle the first key of a block mapping.

        A block mapping is never empty here: empty mappings are written in
        flow style by expect_mapping.
        """
        assert not isinstance(event, CollectionEndEvent)
        simple = self.is_simple(event)
        # NOTE(review): is_simple() as defined in this class never returns
        # None; this guard only matters if an override reports "undecided".
        if simple is None:
            return self.push_back(event)
        if not self.allow_inline_collection:
            self.write_indent()
        if simple:
            self.allow_indentless_sequence = True
            self.allow_inline_collection = False
            self.simple_key = True
        else:
            self.write_indicator("?", need_space=True)
            self.allow_indentless_sequence = True
            self.allow_inline_collection = True
        self.states.append(self.expect_block_mapping_value)
        self.state = self.expect_node
        self.expect_node(event)

    def expect_block_mapping_key(self, event):
        """Handle a subsequent key of a block mapping, or its end."""
        if isinstance(event, CollectionEndEvent):
            self.indent = self.indents.pop()
            self.state = self.states.pop()
        else:
            self.write_indent()
            if self.is_simple(event):
                self.allow_indentless_sequence = True
                self.allow_inline_collection = False
                self.simple_key = True
            else:
                self.write_indicator("?", need_space=True)
                self.allow_indentless_sequence = True
                self.allow_inline_collection = True
            self.states.append(self.expect_block_mapping_value)
            self.state = self.expect_node
            self.expect_node(event)

    def expect_block_mapping_value(self, event):
        """Write the ":" separator and the value of a block mapping entry."""
        if self.simple_key:
            self.write_indicator(":", need_space=False)
            self.allow_indentless_sequence = True
            self.allow_inline_collection = False
            self.simple_key = False
        else:
            self.write_indent()
            self.write_indicator(":", need_space=True)
            self.allow_indentless_sequence = True
            self.allow_inline_collection = True
        self.states.append(self.expect_block_mapping_key)
        self.state = self.expect_node
        self.expect_node(event)

    def expect_nothing(self, event):
        """Reject any event received after the end of the stream."""
        raise EmitterError("expected nothing, but got %s"
                % event.__class__.__name__)

    # Low-level writers.

    def write_document_start(self):
        """Write the "%YAML 1.1" directive and the "---" document marker."""
        self.writer.write("%YAML 1.1")
        self.write_line_break()
        self.writer.write("---")
        self.space = False
        self.line = False

    def write_document_end(self):
        """Write the "..." document end marker on a line of its own."""
        if not self.line:
            self.write_line_break()
        self.writer.write("...")
        self.write_line_break()

    def write_line_break(self):
        """Write a newline and mark the output as being at a line start."""
        self.writer.write('\n')
        self.space = True
        self.line = True

    def write_anchor(self, indicator, name):
        """Write *indicator* ("&" or "*") immediately followed by *name*."""
        if not self.space:
            self.writer.write(" ")
        self.writer.write("%s%s" % (indicator, name))
        self.space = False
        self.line = False

    def write_tag(self, tag):
        """Write a tag, as "!!suffix" for yaml.org tags or "!<tag>" otherwise."""
        if not self.space:
            self.writer.write(" ")
        prefix = "tag:yaml.org,2002:"
        if tag.startswith(prefix):
            self.writer.write("!!%s" % tag[len(prefix):])
        else:
            self.writer.write("!<%s>" % tag)
        self.space = False
        self.line = False

    def is_simple(self, event):
        """Return True if *event* can be written as a simple mapping key.

        That is: a scalar without line breaks, at most 128 characters long
        and not requesting a block style ("|" or ">").
        """
        if not isinstance(event, ScalarEvent):
            return False
        if '\n' in event.value or len(event.value) > 128:
            return False
        if event.style and event.style in '|>':
            return False
        return True

    def write_scalar(self, value, implicit, style):
        """Write a scalar value, UTF-8 encoded.

        Implicit scalars are written as is.  Scalars with a block style
        ("|" or ">") are written as a "|-" block unless inside a flow
        collection or a simple key.  Everything else is wrapped in double
        quotes.
        """
        if implicit:
            if not self.space:
                self.writer.write(" ")
            self.writer.write(value.encode('utf-8'))
            self.space = False
            self.line = False
        elif style in ['>', '|'] and not self.flow_level and not self.simple_key:
            if not self.space:
                self.writer.write(" ")
            self.writer.write("|-")
            self.write_line_break()
            self.write_indent()
            self.writer.write(value.encode('utf-8'))
            self.write_line_break()
        else:
            if not self.space:
                self.writer.write(" ")
            self.writer.write("\"%s\"" % value.encode('utf-8'))
            self.space = False
            self.line = False

    def write_indicator(self, indicator, need_space=False, provide_space=False):
        """Write an indicator such as "-", "?", ":", "[" or "}".

        need_space: write a separating space first unless one is already
            available.
        provide_space: whether the next token may follow the indicator
            without an additional space.
        """
        if need_space and not self.space:
            self.writer.write(" ")
        self.writer.write(indicator)
        self.space = provide_space
        self.line = False

    def write_indent(self):
        """Move to a fresh line if needed and write the current indentation."""
        if not self.line:
            self.write_line_break()
        if self.indent:
            self.writer.write(" "*self.indent)
        self.line = False
        self.space = True