import yaml
def get_version_string():
return yaml_get_version_string()
def get_version():
cdef int major, minor, patch
yaml_get_version(&major, &minor, &patch)
return (major, minor, patch)
def test_scanner(data):
cdef yaml_parser_t *parser
cdef yaml_token_t *token
cdef int done
if PyString_CheckExact(data) == 0:
raise TypeError("string input required")
parser = yaml_parser_new()
if parser == NULL:
raise MemoryError
yaml_parser_set_input_string(parser, PyString_AS_STRING(data), PyString_GET_SIZE(data))
done = 0
while done == 0:
token = yaml_parser_get_token(parser)
if token == NULL:
raise MemoryError
if token.type == YAML_STREAM_END_TOKEN:
done = 1
yaml_token_delete(token)
yaml_parser_delete(parser)
def test_parser(data):
cdef yaml_parser_t *parser
cdef yaml_event_t *event
cdef int done
if PyString_CheckExact(data) == 0:
raise TypeError("string input required")
parser = yaml_parser_new()
if parser == NULL:
raise MemoryError
yaml_parser_set_input_string(parser, PyString_AS_STRING(data), PyString_GET_SIZE(data))
done = 0
while done == 0:
event = yaml_parser_get_event(parser)
if event == NULL:
raise MemoryError
if event.type == YAML_STREAM_END_EVENT:
done = 1
yaml_event_delete(event)
yaml_parser_delete(parser)
cdef class ScannerAndParser:
cdef yaml_parser_t *parser
cdef int eof
cdef object stream
cdef yaml_token_t *cached_token
cdef yaml_event_t *cached_event
cdef object cached_obj
def __init__(self, stream):
if hasattr(stream, 'read'):
stream = stream.read()
if PyUnicode_CheckExact(stream) != 0:
stream = stream.encode('utf-8')
if PyString_CheckExact(stream) == 0:
raise TypeError("a string or stream input is required")
self.parser = yaml_parser_new()
if self.parser == NULL:
raise MemoryError
yaml_parser_set_input_string(self.parser, PyString_AS_STRING(stream), PyString_GET_SIZE(stream))
self.eof = 0
self.stream = stream
self.cached_token = NULL
self.cached_obj = None
def __dealloc__(self):
if self.parser != NULL:
yaml_parser_delete(self.parser)
self.parser = NULL
cdef object _convert_token(self, yaml_token_t *token):
if token == NULL:
if self.parser.error == YAML_MEMORY_ERROR:
raise MemoryError
elif self.parser.error == YAML_READER_ERROR:
raise yaml.reader.ReaderError("",
self.parser.problem_offset,
self.parser.problem_value,
'?', self.parser.problem)
elif self.parser.error == YAML_SCANNER_ERROR:
if self.parser.context != NULL:
raise yaml.scanner.ScannerError(
self.parser.context,
yaml.Mark("",
self.parser.context_mark.index,
self.parser.context_mark.line,
self.parser.context_mark.column,
None, None),
self.parser.problem,
yaml.Mark("",
self.parser.problem_mark.index,
self.parser.problem_mark.line,
self.parser.problem_mark.column,
None, None))
else:
raise yaml.scanner.ScannerError(None, None,
self.parser.problem,
yaml.Mark("",
self.parser.problem_mark.index,
self.parser.problem_mark.line,
self.parser.problem_mark.column,
None, None))
else:
raise RuntimeError("neither error nor token produced")
start_mark = yaml.Mark("",
token.start_mark.index,
token.start_mark.line,
token.start_mark.column,
None, None)
end_mark = yaml.Mark("",
token.end_mark.index,
token.end_mark.line,
token.end_mark.column,
None, None)
if token.type == YAML_STREAM_START_TOKEN:
return yaml.StreamStartToken(start_mark, end_mark)
elif token.type == YAML_STREAM_END_TOKEN:
return yaml.StreamEndToken(start_mark, end_mark)
elif token.type == YAML_VERSION_DIRECTIVE_TOKEN:
return yaml.DirectiveToken('YAML',
(token.data.version_directive.major,
token.data.version_directive.minor),
start_mark, end_mark)
elif token.type == YAML_TAG_DIRECTIVE_TOKEN:
return yaml.DirectiveToken('TAG',
(token.data.tag_directive.handle,
token.data.tag_directive.prefix),
start_mark, end_mark)
elif token.type == YAML_DOCUMENT_START_TOKEN:
return yaml.DocumentStartToken(start_mark, end_mark)
elif token.type == YAML_DOCUMENT_END_TOKEN:
return yaml.DocumentEndToken(start_mark, end_mark)
elif token.type == YAML_BLOCK_SEQUENCE_START_TOKEN:
return yaml.BlockSequenceStartToken(start_mark, end_mark)
elif token.type == YAML_BLOCK_MAPPING_START_TOKEN:
return yaml.BlockMappingStartToken(start_mark, end_mark)
elif token.type == YAML_BLOCK_END_TOKEN:
return yaml.BlockEndToken(start_mark, end_mark)
elif token.type == YAML_FLOW_SEQUENCE_START_TOKEN:
return yaml.FlowSequenceStartToken(start_mark, end_mark)
elif token.type == YAML_FLOW_SEQUENCE_END_TOKEN:
return yaml.FlowSequenceEndToken(start_mark, end_mark)
elif token.type == YAML_FLOW_MAPPING_START_TOKEN:
return yaml.FlowMappingStartToken(start_mark, end_mark)
elif token.type == YAML_FLOW_MAPPING_END_TOKEN:
return yaml.FlowMappingEndToken(start_mark, end_mark)
elif token.type == YAML_BLOCK_ENTRY_TOKEN:
return yaml.BlockEntryToken(start_mark, end_mark)
elif token.type == YAML_FLOW_ENTRY_TOKEN:
return yaml.FlowEntryToken(start_mark, end_mark)
elif token.type == YAML_KEY_TOKEN:
return yaml.KeyToken(start_mark, end_mark)
elif token.type == YAML_VALUE_TOKEN:
return yaml.ValueToken(start_mark, end_mark)
elif token.type == YAML_ALIAS_TOKEN:
return yaml.AliasToken(token.data.alias.value,
start_mark, end_mark)
elif token.type == YAML_ANCHOR_TOKEN:
return yaml.AnchorToken(token.data.anchor.value,
start_mark, end_mark)
elif token.type == YAML_TAG_TOKEN:
handle = token.data.tag.handle
if handle == '':
handle = None
return yaml.TagToken((handle, token.data.tag.suffix),
start_mark, end_mark)
elif token.type == YAML_SCALAR_TOKEN:
value = PyString_FromStringAndSize(token.data.scalar.value, token.data.scalar.length)
return yaml.ScalarToken(unicode(value, 'utf-8'),
bool(token.data.scalar.style == YAML_PLAIN_SCALAR_STYLE),
start_mark, end_mark)
else:
raise RuntimeError("unknown token type")
cdef object _convert_event(self, yaml_event_t *event):
if event == NULL:
if self.parser.error == YAML_MEMORY_ERROR:
raise MemoryError
elif self.parser.error == YAML_READER_ERROR:
raise yaml.reader.ReaderError("",
self.parser.problem_offset,
self.parser.problem_value,
'?', self.parser.problem)
elif self.parser.error == YAML_SCANNER_ERROR:
if self.parser.context != NULL:
raise yaml.scanner.ScannerError(
self.parser.context,
yaml.Mark("",
self.parser.context_mark.index,
self.parser.context_mark.line,
self.parser.context_mark.column,
None, None),
self.parser.problem,
yaml.Mark("",
self.parser.problem_mark.index,
self.parser.problem_mark.line,
self.parser.problem_mark.column,
None, None))
else:
raise yaml.scanner.ScannerError(None, None,
self.parser.problem,
yaml.Mark("",
self.parser.problem_mark.index,
self.parser.problem_mark.line,
self.parser.problem_mark.column,
None, None))
elif self.parser.error == YAML_PARSER_ERROR:
if self.parser.context != NULL:
raise yaml.parser.ParserError(
self.parser.context,
yaml.Mark("",
self.parser.context_mark.index,
self.parser.context_mark.line,
self.parser.context_mark.column,
None, None),
self.parser.problem,
yaml.Mark("",
self.parser.problem_mark.index,
self.parser.problem_mark.line,
self.parser.problem_mark.column,
None, None))
else:
raise yaml.parser.ParserError(None, None,
self.parser.problem,
yaml.Mark("",
self.parser.problem_mark.index,
self.parser.problem_mark.line,
self.parser.problem_mark.column,
None, None))
else:
raise RuntimeError("neither error nor event produced")
start_mark = yaml.Mark("",
event.start_mark.index,
event.start_mark.line,
event.start_mark.column,
None, None)
end_mark = yaml.Mark("",
event.end_mark.index,
event.end_mark.line,
event.end_mark.column,
None, None)
if event.type == YAML_STREAM_START_EVENT:
return yaml.StreamStartEvent(start_mark, end_mark)
elif event.type == YAML_STREAM_END_EVENT:
return yaml.StreamEndEvent(start_mark, end_mark)
elif event.type == YAML_DOCUMENT_START_EVENT:
return yaml.DocumentStartEvent(start_mark, end_mark,
(event.data.document_start.implicit == 0))
elif event.type == YAML_DOCUMENT_END_EVENT:
return yaml.DocumentEndEvent(start_mark, end_mark,
(event.data.document_end.implicit == 0))
elif event.type == YAML_SCALAR_EVENT:
if event.data.scalar.anchor == NULL:
anchor = None
else:
anchor = event.data.scalar.anchor
if event.data.scalar.tag == NULL:
tag = None
else:
tag = event.data.scalar.tag
implicit = (event.data.scalar.plain_implicit == 1, event.data.scalar.quoted_implicit == 1)
flow_style = (event.data.sequence_start.style == YAML_FLOW_SEQUENCE_STYLE)
value = PyString_FromStringAndSize(event.data.scalar.value, event.data.scalar.length)
return yaml.ScalarEvent(anchor, tag, implicit, unicode(value, 'utf-8'),
start_mark, end_mark)
elif event.type == YAML_ALIAS_EVENT:
if event.data.alias.anchor == NULL:
anchor = None
else:
anchor = event.data.alias.anchor
return yaml.AliasEvent(anchor, start_mark, end_mark)
elif event.type == YAML_SEQUENCE_START_EVENT:
if event.data.sequence_start.anchor == NULL:
anchor = None
else:
anchor = event.data.sequence_start.anchor
if event.data.sequence_start.tag == NULL:
tag = None
else:
tag = event.data.sequence_start.tag
implicit = (event.data.sequence_start.implicit == 1)
flow_style = (event.data.sequence_start.style == YAML_FLOW_SEQUENCE_STYLE)
return yaml.SequenceStartEvent(anchor, tag, implicit,
start_mark, end_mark, flow_style)
elif event.type == YAML_MAPPING_START_EVENT:
if event.data.mapping_start.anchor == NULL:
anchor = None
else:
anchor = event.data.mapping_start.anchor
if event.data.mapping_start.tag == NULL:
tag = None
else:
tag = event.data.mapping_start.tag
implicit = (event.data.mapping_start.implicit == 1)
flow_style = (event.data.mapping_start.style == YAML_FLOW_MAPPING_STYLE)
return yaml.MappingStartEvent(anchor, tag, implicit,
start_mark, end_mark, flow_style)
elif event.type == YAML_SEQUENCE_END_EVENT:
return yaml.SequenceEndEvent(start_mark, end_mark)
elif event.type == YAML_MAPPING_END_EVENT:
return yaml.MappingEndEvent(start_mark, end_mark)
else:
raise RuntimeError("unknown event type")
def get_token(self):
cdef yaml_token_t *token
if self.cached_token != NULL:
yaml_token_delete(yaml_parser_get_token(self.parser))
obj = self.cached_obj
self.cached_token = NULL
self.cached_obj = None
return obj
if self.eof != 0:
return None
token = yaml_parser_get_token(self.parser)
obj = self._convert_token(token)
if token.type == YAML_STREAM_END_TOKEN:
self.eof = 1
yaml_token_delete(token)
return obj
def peek_token(self):
cdef yaml_token_t *token
if self.cached_token != NULL:
return self.cached_obj
if self.eof != 0:
return None
token = yaml_parser_peek_token(self.parser)
obj = self._convert_token(token)
if token.type == YAML_STREAM_END_TOKEN:
self.eof = 1
self.cached_token = token
self.cached_obj = obj
return obj
def check_token(self, *choices):
cdef yaml_token_t *token
if self.cached_token != NULL:
obj = self.cached_obj
elif self.eof != 0:
return False
else:
token = yaml_parser_peek_token(self.parser)
obj = self._convert_token(token)
if token.type == YAML_STREAM_END_TOKEN:
self.eof = 1
self.cached_token = token
self.cached_obj = obj
if not choices:
return True
for choice in choices:
if isinstance(obj, choice):
return True
return False
def get_event(self):
cdef yaml_event_t *event
if self.cached_event != NULL:
yaml_event_delete(yaml_parser_get_event(self.parser))
obj = self.cached_obj
self.cached_event = NULL
self.cached_obj = None
return obj
if self.eof != 0:
return None
event = yaml_parser_get_event(self.parser)
obj = self._convert_event(event)
if event.type == YAML_STREAM_END_EVENT:
self.eof = 1
yaml_event_delete(event)
return obj
def peek_event(self):
cdef yaml_event_t *event
if self.cached_event != NULL:
return self.cached_obj
if self.eof != 0:
return None
event = yaml_parser_peek_event(self.parser)
obj = self._convert_event(event)
if event.type == YAML_STREAM_END_EVENT:
self.eof = 1
self.cached_event = event
self.cached_obj = obj
return obj
def check_event(self, *choices):
cdef yaml_event_t *event
if self.cached_event != NULL:
obj = self.cached_obj
elif self.eof != 0:
return False
else:
event = yaml_parser_peek_event(self.parser)
obj = self._convert_event(event)
if event.type == YAML_STREAM_END_EVENT:
self.eof = 1
self.cached_event = event
self.cached_obj = obj
if not choices:
return True
for choice in choices:
if isinstance(obj, choice):
return True
return False
class Loader(ScannerAndParser,
yaml.composer.Composer,
yaml.constructor.Constructor,
yaml.resolver.Resolver):
def __init__(self, stream):
ScannerAndParser.__init__(self, stream)
yaml.composer.Composer.__init__(self)
yaml.constructor.Constructor.__init__(self)
yaml.resolver.Resolver.__init__(self)
yaml.ExtLoader = Loader