Changeset 136 for pyyaml/trunk/tests/test_appliance.py
- Timestamp:
- 04/15/06 19:54:52 (7 years ago)
- File:
-
- 1 edited
-
pyyaml/trunk/tests/test_appliance.py (modified) (6 diffs)
Legend:
- Unmodified
- Added
- Removed
-
pyyaml/trunk/tests/test_appliance.py
r132 r136 2 2 import unittest, os 3 3 4 from yaml.tokens import * 5 from yaml.events import * 4 from yaml import * 6 5 7 6 class TestAppliance(unittest.TestCase): … … 44 43 self.data = unicode(data, 'utf-8')+u'\0' 45 44 self.index = 0 45 self.scan() 46 47 def check_token(self, *choices): 48 if self.tokens: 49 if not choices: 50 return True 51 for choice in choices: 52 if isinstance(self.tokens[0], choice): 53 return True 54 return False 55 56 def peek_token(self): 57 if self.tokens: 58 return self.tokens[0] 59 60 def get_token(self, choice=None): 61 token = self.tokens.pop(0) 62 if choice and not isinstance(token, choice): 63 raise Error("unexpected token "+repr(token)) 64 return token 65 66 def get_token_value(self): 67 token = self.get_token() 68 return token.value 46 69 47 70 def scan(self): 48 #print self.data[self.index:] 49 tokens = [] 50 tokens.append(StreamStartToken(None, None)) 71 self.tokens = [] 72 self.tokens.append(StreamStartToken(None, None)) 51 73 while True: 52 74 self.find_token() 53 75 ch = self.data[self.index] 54 76 if ch == u'\0': 55 tokens.append(StreamEndToken(None, None))77 self.tokens.append(StreamEndToken(None, None)) 56 78 break 57 79 elif ch == u'%': 58 tokens.append(self.scan_directive())80 self.tokens.append(self.scan_directive()) 59 81 elif ch == u'-' and self.data[self.index:self.index+3] == u'---': 60 82 self.index += 3 61 tokens.append(DocumentStartToken(None, None))83 self.tokens.append(DocumentStartToken(None, None)) 62 84 elif ch == u'[': 63 85 self.index += 1 64 tokens.append(FlowSequenceStartToken(None, None))86 self.tokens.append(FlowSequenceStartToken(None, None)) 65 87 elif ch == u'{': 66 88 self.index += 1 67 tokens.append(FlowMappingStartToken(None, None))89 self.tokens.append(FlowMappingStartToken(None, None)) 68 90 elif ch == u']': 69 91 self.index += 1 70 tokens.append(FlowSequenceEndToken(None, None))92 self.tokens.append(FlowSequenceEndToken(None, None)) 71 93 elif ch == u'}': 72 94 self.index += 1 73 tokens.append(FlowMappingEndToken(None, None))95 self.tokens.append(FlowMappingEndToken(None, None)) 74 96 elif ch == u'?': 75 97 self.index += 1 76 tokens.append(KeyToken(None, None))98 self.tokens.append(KeyToken(None, None)) 77 99 elif ch == u':': 78 100 self.index += 1 79 tokens.append(ValueToken(None, None))101 self.tokens.append(ValueToken(None, None)) 80 102 elif ch == u',': 81 103 self.index += 1 82 tokens.append(FlowEntryToken(None, None))104 self.tokens.append(FlowEntryToken(None, None)) 83 105 elif ch == u'*' or ch == u'&': 84 tokens.append(self.scan_alias())106 self.tokens.append(self.scan_alias()) 85 107 elif ch == u'!': 86 tokens.append(self.scan_tag())108 self.tokens.append(self.scan_tag()) 87 109 elif ch == u'"': 88 tokens.append(self.scan_scalar())110 self.tokens.append(self.scan_scalar()) 89 111 else: 90 112 raise Error("invalid token") 91 return tokens92 113 93 114 DIRECTIVE = u'%YAML 1.1' … … 204 225 class CanonicalParser: 205 226 206 def __init__(self, data): 207 self.scanner = CanonicalScanner(data) 227 def __init__(self): 208 228 self.events = [] 229 self.parse() 209 230 210 231 # stream: STREAM-START document* STREAM-END 211 232 def parse_stream(self): 212 self. consume_token(StreamStartToken)233 self.get_token(StreamStartToken) 213 234 self.events.append(StreamStartEvent(None, None)) 214 while not self. test_token(StreamEndToken):215 if self. test_token(DirectiveToken, DocumentStartToken):235 while not self.check_token(StreamEndToken): 236 if self.check_token(DirectiveToken, DocumentStartToken): 216 237 self.parse_document() 217 238 else: 218 239 raise Error("document is expected, got "+repr(self.tokens[self.index])) 219 self. consume_token(StreamEndToken)240 self.get_token(StreamEndToken) 220 241 self.events.append(StreamEndEvent(None, None)) 221 242 … … 223 244 def parse_document(self): 224 245 node = None 225 if self. test_token(DirectiveToken):226 self. consume_token(DirectiveToken)227 self. consume_token(DocumentStartToken)246 if self.check_token(DirectiveToken): 247 self.get_token(DirectiveToken) 248 self.get_token(DocumentStartToken) 228 249 self.events.append(DocumentStartEvent(None, None)) 229 250 self.parse_node() … … 232 253 # node: ALIAS | ANCHOR? TAG? (SCALAR|sequence|mapping) 233 254 def parse_node(self): 234 if self. test_token(AliasToken):235 self.events.append(AliasEvent(self.get_ value(), None, None))255 if self.check_token(AliasToken): 256 self.events.append(AliasEvent(self.get_token_value(), None, None)) 236 257 else: 237 258 anchor = None 238 if self. test_token(AnchorToken):239 anchor = self.get_ value()259 if self.check_token(AnchorToken): 260 anchor = self.get_token_value() 240 261 tag = None 241 if self. test_token(TagToken):242 tag = self.get_ value()243 if self. test_token(ScalarToken):244 self.events.append(ScalarEvent(anchor, tag, self.get_value(), None, None))245 elif self. test_token(FlowSequenceStartToken):262 if self.check_token(TagToken): 263 tag = self.get_token_value() 264 if self.check_token(ScalarToken): 265 self.events.append(ScalarEvent(anchor, tag, False, self.get_token_value(), None, None)) 266 elif self.check_token(FlowSequenceStartToken): 246 267 self.events.append(SequenceStartEvent(anchor, tag, None, None)) 247 268 self.parse_sequence() 248 elif self. test_token(FlowMappingStartToken):269 elif self.check_token(FlowMappingStartToken): 249 270 self.events.append(MappingStartEvent(anchor, tag, None, None)) 250 271 self.parse_mapping() … … 254 275 # sequence: SEQUENCE-START (node (ENTRY node)*)? ENTRY? SEQUENCE-END 255 276 def parse_sequence(self): 256 self. consume_token(FlowSequenceStartToken)257 if not self. test_token(FlowSequenceEndToken):277 self.get_token(FlowSequenceStartToken) 278 if not self.check_token(FlowSequenceEndToken): 258 279 self.parse_node() 259 while not self. test_token(FlowSequenceEndToken):260 self. consume_token(FlowEntryToken)261 if not self. test_token(FlowSequenceEndToken):280 while not self.check_token(FlowSequenceEndToken): 281 self.get_token(FlowEntryToken) 282 if not self.check_token(FlowSequenceEndToken): 262 283 self.parse_node() 263 self. consume_token(FlowSequenceEndToken)284 self.get_token(FlowSequenceEndToken) 264 285 self.events.append(SequenceEndEvent(None, None)) 265 286 266 287 # mapping: MAPPING-START (map_entry (ENTRY map_entry)*)? ENTRY? MAPPING-END 267 288 def parse_mapping(self): 268 self. consume_token(FlowMappingStartToken)269 if not self. test_token(FlowMappingEndToken):289 self.get_token(FlowMappingStartToken) 290 if not self.check_token(FlowMappingEndToken): 270 291 self.parse_map_entry() 271 while not self. test_token(FlowMappingEndToken):272 self. consume_token(FlowEntryToken)273 if not self. test_token(FlowMappingEndToken):292 while not self.check_token(FlowMappingEndToken): 293 self.get_token(FlowEntryToken) 294 if not self.check_token(FlowMappingEndToken): 274 295 self.parse_map_entry() 275 self. consume_token(FlowMappingEndToken)296 self.get_token(FlowMappingEndToken) 276 297 self.events.append(MappingEndEvent(None, None)) 277 298 278 299 # map_entry: KEY node VALUE node 279 300 def parse_map_entry(self): 280 self. consume_token(KeyToken)301 self.get_token(KeyToken) 281 302 self.parse_node() 282 self. consume_token(ValueToken)303 self.get_token(ValueToken) 283 304 self.parse_node() 284 305 285 def test_token(self, *choices): 286 for choice in choices: 287 if isinstance(self.tokens[self.index], choice): 306 def parse(self): 307 self.parse_stream() 308 309 def get_event(self): 310 return self.events.pop(0) 311 312 def check_event(self, *choices): 313 if self.events: 314 if not choices: 288 315 return True 316 for choice in choices: 317 if isinstance(self.events[0], choice): 318 return True 289 319 return False 290 320 291 def consume_token(self, cls): 292 if not isinstance(self.tokens[self.index], cls): 293 raise Error("unexpected token "+repr(self.tokens[self.index])) 294 self.index += 1 295 296 def get_value(self): 297 value = self.tokens[self.index].value 298 self.index += 1 299 return value 300 301 def parse(self): 302 self.tokens = self.scanner.scan() 303 self.index = 0 304 self.parse_stream() 305 return self.events 306 307 def get(self): 308 return self.events.pop(0) 309 310 def check(self, *choices): 311 for choice in choices: 312 if isinstance(self.events[0], choice): 313 return True 314 return False 315 316 def peek(self): 321 def peek_event(self): 317 322 return self.events[0] 318 323 324 class CanonicalLoader(CanonicalScanner, CanonicalParser, Composer, Constructor, Detector): 325 326 def __init__(self, stream): 327 if hasattr(stream, 'read'): 328 stream = stream.read() 329 CanonicalScanner.__init__(self, stream) 330 CanonicalParser.__init__(self) 331 Composer.__init__(self) 332 Constructor.__init__(self) 333 Detector.__init__(self) 334 335 def canonical_scan(stream): 336 return scan(stream, Loader=CanonicalLoader) 337 338 def canonical_parse(stream): 339 return parse(stream, Loader=CanonicalLoader) 340 341 def canonical_compose(stream): 342 return compose(stream, Loader=CanonicalLoader) 343 344 def canonical_compose_all(stream): 345 return compose_all(stream, Loader=CanonicalLoader) 346 347 def canonical_load(stream): 348 return load(stream, Loader=CanonicalLoader) 349 350 def canonical_load_all(stream): 351 return load_all(stream, Loader=CanonicalLoader) 352
Note: See TracChangeset
for help on using the changeset viewer.
