import yaml
def get_version_string():
    """Return the version string reported by ``yaml_get_version_string()``."""
    version = yaml_get_version_string()
    return version
def get_version():
    """Return the ``(major, minor, patch)`` tuple filled in by ``yaml_get_version()``."""
    cdef int major
    cdef int minor
    cdef int patch
    # The C function writes the three components through the pointers.
    yaml_get_version(&major, &minor, &patch)
    return (major, minor, patch)
cdef class Scanner:
    """Scanner that wraps a C ``yaml_parser_t`` and yields ``yaml`` token objects.

    The input may be a byte string, a unicode string (encoded to UTF-8) or
    any object with a ``read()`` method returning one of those.
    """

    cdef yaml_parser_t *parser
    # Non-zero once the stream-end token has been returned by get_token().
    cdef int eof
    # The byte string handed to the parser; presumably kept here so that the
    # buffer passed to yaml_parser_set_input_string() stays alive.
    cdef object stream

    def __init__(self, stream):
        """Create a parser for *stream*.

        Raises TypeError if the input is neither a string nor a readable
        stream, and MemoryError if the parser cannot be allocated.
        """
        if hasattr(stream, 'read'):
            stream = stream.read()
        if PyUnicode_CheckExact(stream) != 0:
            stream = stream.encode('utf-8')
        if PyString_CheckExact(stream) == 0:
            raise TypeError("a string or stream input is required")
        # Release a parser left over from an earlier __init__ call so that
        # re-initialising the object does not leak it.
        if self.parser != NULL:
            yaml_parser_delete(self.parser)
            self.parser = NULL
        self.parser = yaml_parser_new()
        if self.parser == NULL:
            raise MemoryError
        yaml_parser_set_input_string(self.parser, PyString_AS_STRING(stream), PyString_GET_SIZE(stream))
        self.eof = 0
        self.stream = stream

    def __dealloc__(self):
        """Delete the underlying parser, if one was created."""
        if self.parser != NULL:
            yaml_parser_delete(self.parser)
            self.parser = NULL

    cdef object _convert(self, yaml_token_t *token):
        # Translate a C token into the matching yaml.*Token object.
        #
        # A NULL token means no token was produced: the parser's error state
        # is turned into MemoryError, yaml.reader.ReaderError or
        # yaml.scanner.ScannerError (RuntimeError if no error is recorded).
        # An unrecognised token type also raises RuntimeError.
        if token == NULL:
            if self.parser.error == YAML_MEMORY_ERROR:
                raise MemoryError
            elif self.parser.error == YAML_READER_ERROR:
                raise yaml.reader.ReaderError("",
                        self.parser.problem_offset,
                        self.parser.problem_value,
                        '?', self.parser.problem)
            elif self.parser.error == YAML_SCANNER_ERROR:
                # The problem mark is reported whether or not a context exists.
                problem_mark = yaml.Mark("",
                        self.parser.problem_mark.index,
                        self.parser.problem_mark.line,
                        self.parser.problem_mark.column,
                        None, None)
                if self.parser.context != NULL:
                    context_mark = yaml.Mark("",
                            self.parser.context_mark.index,
                            self.parser.context_mark.line,
                            self.parser.context_mark.column,
                            None, None)
                    raise yaml.scanner.ScannerError(
                            self.parser.context, context_mark,
                            self.parser.problem, problem_mark)
                else:
                    raise yaml.scanner.ScannerError(None, None,
                            self.parser.problem, problem_mark)
            else:
                raise RuntimeError("neither error nor token produced")
        start_mark = yaml.Mark("",
                token.start_mark.index,
                token.start_mark.line,
                token.start_mark.column,
                None, None)
        end_mark = yaml.Mark("",
                token.end_mark.index,
                token.end_mark.line,
                token.end_mark.column,
                None, None)
        if token.type == YAML_STREAM_START_TOKEN:
            return yaml.StreamStartToken(start_mark, end_mark)
        elif token.type == YAML_STREAM_END_TOKEN:
            return yaml.StreamEndToken(start_mark, end_mark)
        elif token.type == YAML_VERSION_DIRECTIVE_TOKEN:
            return yaml.DirectiveToken('YAML',
                    (token.data.version_directive.major,
                        token.data.version_directive.minor),
                    start_mark, end_mark)
        elif token.type == YAML_TAG_DIRECTIVE_TOKEN:
            return yaml.DirectiveToken('TAG',
                    (token.data.tag_directive.handle,
                        token.data.tag_directive.prefix),
                    start_mark, end_mark)
        elif token.type == YAML_DOCUMENT_START_TOKEN:
            return yaml.DocumentStartToken(start_mark, end_mark)
        elif token.type == YAML_DOCUMENT_END_TOKEN:
            return yaml.DocumentEndToken(start_mark, end_mark)
        elif token.type == YAML_BLOCK_SEQUENCE_START_TOKEN:
            return yaml.BlockSequenceStartToken(start_mark, end_mark)
        elif token.type == YAML_BLOCK_MAPPING_START_TOKEN:
            return yaml.BlockMappingStartToken(start_mark, end_mark)
        elif token.type == YAML_BLOCK_END_TOKEN:
            return yaml.BlockEndToken(start_mark, end_mark)
        elif token.type == YAML_FLOW_SEQUENCE_START_TOKEN:
            return yaml.FlowSequenceStartToken(start_mark, end_mark)
        elif token.type == YAML_FLOW_SEQUENCE_END_TOKEN:
            return yaml.FlowSequenceEndToken(start_mark, end_mark)
        elif token.type == YAML_FLOW_MAPPING_START_TOKEN:
            return yaml.FlowMappingStartToken(start_mark, end_mark)
        elif token.type == YAML_FLOW_MAPPING_END_TOKEN:
            return yaml.FlowMappingEndToken(start_mark, end_mark)
        elif token.type == YAML_BLOCK_ENTRY_TOKEN:
            return yaml.BlockEntryToken(start_mark, end_mark)
        elif token.type == YAML_FLOW_ENTRY_TOKEN:
            return yaml.FlowEntryToken(start_mark, end_mark)
        elif token.type == YAML_KEY_TOKEN:
            return yaml.KeyToken(start_mark, end_mark)
        elif token.type == YAML_VALUE_TOKEN:
            return yaml.ValueToken(start_mark, end_mark)
        elif token.type == YAML_ALIAS_TOKEN:
            return yaml.AliasToken(token.data.anchor,
                    start_mark, end_mark)
        elif token.type == YAML_ANCHOR_TOKEN:
            return yaml.AnchorToken(token.data.anchor,
                    start_mark, end_mark)
        elif token.type == YAML_TAG_TOKEN:
            handle = token.data.tag.handle
            # An empty handle is reported to Python as None.
            if handle == '':
                handle = None
            return yaml.TagToken((handle, token.data.tag.suffix),
                    start_mark, end_mark)
        elif token.type == YAML_SCALAR_TOKEN:
            # Use the explicit length rather than relying on a terminator.
            value = PyString_FromStringAndSize(token.data.scalar.value, token.data.scalar.length)
            return yaml.ScalarToken(unicode(value, 'utf-8'),
                    bool(token.data.scalar.style == YAML_PLAIN_SCALAR_STYLE),
                    start_mark, end_mark)
        else:
            raise RuntimeError("unknown token type")

    def get_token(self):
        """Fetch the next token from the parser and return it as a yaml token.

        Returns None once the stream-end token has already been returned.
        Errors from the conversion step propagate to the caller.
        """
        cdef yaml_token_t *token
        if self.eof != 0:
            return None
        token = yaml_parser_get_token(self.parser)
        try:
            obj = self._convert(token)
            if token.type == YAML_STREAM_END_TOKEN:
                self.eof = 1
        finally:
            # Always release the fetched token, even when conversion raises;
            # a NULL token (the error case) has nothing to release.
            if token != NULL:
                yaml_token_delete(token)
        return obj

    def peek_token(self):
        """Return the token given by ``yaml_parser_peek_token`` as a yaml token.

        Returns None once the stream-end token has been returned by
        get_token(). The C token is not deleted here.
        """
        cdef yaml_token_t *token
        if self.eof != 0:
            return None
        token = yaml_parser_peek_token(self.parser)
        return self._convert(token)

    def check_token(self, *choices):
        """Tell whether the peeked token is an instance of any of *choices*.

        With no choices, returns True whenever a token is available.
        Returns False once the stream-end token has been returned by
        get_token().
        """
        cdef yaml_token_t *token
        if self.eof != 0:
            return False
        token = yaml_parser_peek_token(self.parser)
        # Conversion also raises if the parser produced an error.
        obj = self._convert(token)
        if not choices:
            return True
        return isinstance(obj, choices)
class Loader(Scanner,
        yaml.parser.Parser,
        yaml.composer.Composer,
        yaml.constructor.Constructor,
        yaml.resolver.Resolver):
    """Loader combining the extension Scanner with the yaml package's
    Parser, Composer, Constructor and Resolver classes."""

    def __init__(self, stream):
        """Initialise every base class; only the Scanner receives *stream*."""
        Scanner.__init__(self, stream)
        # The remaining bases take no arguments and are initialised in order.
        for base in (yaml.parser.Parser,
                yaml.composer.Composer,
                yaml.constructor.Constructor,
                yaml.resolver.Resolver):
            base.__init__(self)
# Publish this Loader as an attribute of the yaml package under the name ExtLoader.
yaml.ExtLoader = Loader