Changeset 322 for pyyaml/trunk/tests/test_appliance.py
- Timestamp:
- 12/28/08 15:16:50 (4 years ago)
- File:
-
- 1 edited
-
pyyaml/trunk/tests/test_appliance.py (modified) (1 diff)
Legend:
- Unmodified
- Added
- Removed
-
pyyaml/trunk/tests/test_appliance.py
r312 r322 1 1 2 import unittest, os2 import sys, os, os.path, types, traceback, pprint 3 3 4 from yaml import * 5 from yaml.composer import * 6 from yaml.constructor import * 7 from yaml.resolver import * 4 DATA = 'tests/data' 8 5 9 class TestAppliance(unittest.TestCase): 6 def find_test_functions(collections): 7 if not isinstance(collections, list): 8 collections = [collections] 9 functions = [] 10 for collection in collections: 11 if not isinstance(collection, dict): 12 collection = vars(collection) 13 keys = collection.keys() 14 keys.sort() 15 for key in keys: 16 value = collection[key] 17 if isinstance(value, types.FunctionType) and hasattr(value, 'unittest'): 18 functions.append(value) 19 return functions 10 20 11 DATA = 'tests/data' 12 SKIP_EXT = '.skip' 21 def find_test_filenames(directory): 22 filenames = {} 23 for filename in os.listdir(directory): 24 if os.path.isfile(os.path.join(directory, filename)): 25 base, ext = os.path.splitext(filename) 26 filenames.setdefault(base, []).append(ext) 27 filenames = filenames.items() 28 filenames.sort() 29 return filenames 13 30 14 all_tests = {} 15 for filename in os.listdir(DATA): 16 if os.path.isfile(os.path.join(DATA, filename)): 17 root, ext = os.path.splitext(filename) 18 all_tests.setdefault(root, []).append(ext) 31 def parse_arguments(args): 32 if args is None: 33 args = sys.argv[1:] 34 verbose = False 35 if '-v' in args: 36 verbose = True 37 args.remove('-v') 38 if '--verbose' in args: 39 verbose = True 40 if 'YAML_TEST_VERBOSE' in os.environ: 41 verbose = True 42 include_functions = [] 43 if args: 44 include_functions.append(args.pop(0)) 45 if 'YAML_TEST_FUNCTIONS' in os.environ: 46 include_functions.extend(os.environ['YAML_TEST_FUNCTIONS'].split()) 47 include_filenames = [] 48 include_filenames.extend(args) 49 if 'YAML_TEST_FILENAMES' in os.environ: 50 include_filenames.extend(os.environ['YAML_TEST_FILENAMES'].split()) 51 return include_functions, include_filenames, verbose 19 52 20 def add_tests(cls, method_name, *extensions): 21 for test in cls.all_tests: 22 available_extensions = cls.all_tests[test] 23 if cls.SKIP_EXT in available_extensions: 24 continue 25 for ext in extensions: 26 if ext not in available_extensions: 27 break 28 else: 29 filenames = [os.path.join(cls.DATA, test+ext) for ext in extensions] 30 def test_method(self, test=test, filenames=filenames): 31 getattr(self, '_'+method_name)(test, *filenames) 32 test = test.replace('-', '_').replace('.', '_') 33 try: 34 test_method.__name__ = '%s_%s' % (method_name, test) 35 except TypeError: 36 import new 37 test_method = new.function(test_method.func_code, test_method.func_globals, 38 '%s_%s' % (method_name, test), test_method.func_defaults, 39 test_method.func_closure) 40 setattr(cls, test_method.__name__, test_method) 41 add_tests = classmethod(add_tests) 53 def execute(function, filenames, verbose): 54 if verbose: 55 sys.stdout.write('='*75+'\n') 56 sys.stdout.write('%s(%s)...\n' % (function.func_name, ', '.join(filenames))) 57 try: 58 function(verbose=verbose, *filenames) 59 except Exception, exc: 60 info = sys.exc_info() 61 if isinstance(exc, AssertionError): 62 kind = 'FAILURE' 63 else: 64 kind = 'ERROR' 65 if verbose: 66 traceback.print_exc(limit=1, file=sys.stdout) 67 else: 68 sys.stdout.write(kind[0]) 69 sys.stdout.flush() 70 else: 71 kind = 'SUCCESS' 72 info = None 73 if not verbose: 74 sys.stdout.write('.') 75 sys.stdout.flush() 76 return (function, filenames, kind, info) 42 77 43 class Error(Exception): 44 pass 78 def display(results, verbose): 79 if results and not verbose: 80 sys.stdout.write('\n') 81 total = len(results) 82 failures = 0 83 errors = 0 84 for function, filenames, kind, info in results: 85 if kind == 'SUCCESS': 86 continue 87 if kind == 'FAILURE': 88 failures += 1 89 if kind == 'ERROR': 90 errors += 1 91 sys.stdout.write('='*75+'\n') 92 sys.stdout.write('%s(%s): %s\n' % (function.func_name, ', '.join(filenames), kind)) 93 if kind == 'ERROR': 94 traceback.print_exception(file=sys.stdout, *info) 95 else: 96 sys.stdout.write('Traceback (most recent call last):\n') 97 traceback.print_tb(info[2], file=sys.stdout) 98 sys.stdout.write('%s: see below\n' % info[0].__name__) 99 sys.stdout.write('~'*75+'\n') 100 for arg in info[1].args: 101 pprint.pprint(arg, stream=sys.stdout, indent=2) 102 for filename in filenames: 103 sys.stdout.write('-'*75+'\n') 104 sys.stdout.write('%s:\n' % filename) 105 data = open(filename, 'rb').read() 106 sys.stdout.write(data) 107 if data and data[-1] != '\n': 108 sys.stdout.write('\n') 109 sys.stdout.write('='*75+'\n') 110 sys.stdout.write('TESTS: %s\n' % total) 111 if failures: 112 sys.stdout.write('FAILURES: %s\n' % failures) 113 if errors: 114 sys.stdout.write('ERRORS: %s\n' % errors) 45 115 46 class CanonicalScanner: 116 def run(collections, args=None): 117 test_functions = find_test_functions(collections) 118 test_filenames = find_test_filenames(DATA) 119 include_functions, include_filenames, verbose = parse_arguments(args) 120 results = [] 121 for function in test_functions: 122 if include_functions and function.func_name not in include_functions: 123 continue 124 if function.unittest: 125 for base, exts in test_filenames: 126 if include_filenames and base not in include_filenames: 127 continue 128 filenames = [] 129 for ext in function.unittest: 130 if ext not in exts: 131 break 132 filenames.append(os.path.join(DATA, base+ext)) 133 else: 134 skip_exts = getattr(function, 'skip', []) 135 for skip_ext in skip_exts: 136 if skip_ext in exts: 137 break 138 else: 139 result = execute(function, filenames, verbose) 140 results.append(result) 141 else: 142 result = execute(function, [], verbose) 143 results.append(result) 144 display(results, verbose=verbose) 47 145 48 def __init__(self, data):49 self.data = unicode(data, 'utf-8')+u'\0'50 self.index = 051 self.scan()52 53 def check_token(self, *choices):54 if self.tokens:55 if not choices:56 return True57 for choice in choices:58 if isinstance(self.tokens[0], choice):59 return True60 return False61 62 def peek_token(self):63 if self.tokens:64 return self.tokens[0]65 66 def get_token(self, choice=None):67 token = self.tokens.pop(0)68 if choice and not isinstance(token, choice):69 raise Error("unexpected token "+repr(token))70 return token71 72 def get_token_value(self):73 token = self.get_token()74 return token.value75 76 def scan(self):77 self.tokens = []78 self.tokens.append(StreamStartToken(None, None))79 while True:80 self.find_token()81 ch = self.data[self.index]82 if ch == u'\0':83 self.tokens.append(StreamEndToken(None, None))84 break85 elif ch == u'%':86 self.tokens.append(self.scan_directive())87 elif ch == u'-' and self.data[self.index:self.index+3] == u'---':88 self.index += 389 self.tokens.append(DocumentStartToken(None, None))90 elif ch == u'[':91 self.index += 192 self.tokens.append(FlowSequenceStartToken(None, None))93 elif ch == u'{':94 self.index += 195 self.tokens.append(FlowMappingStartToken(None, None))96 elif ch == u']':97 self.index += 198 self.tokens.append(FlowSequenceEndToken(None, None))99 elif ch == u'}':100 self.index += 1101 self.tokens.append(FlowMappingEndToken(None, None))102 elif ch == u'?':103 self.index += 1104 self.tokens.append(KeyToken(None, None))105 elif ch == u':':106 self.index += 1107 self.tokens.append(ValueToken(None, None))108 elif ch == u',':109 self.index += 1110 self.tokens.append(FlowEntryToken(None, None))111 elif ch == u'*' or ch == u'&':112 self.tokens.append(self.scan_alias())113 elif ch == u'!':114 self.tokens.append(self.scan_tag())115 elif ch == u'"':116 self.tokens.append(self.scan_scalar())117 else:118 raise Error("invalid token")119 120 DIRECTIVE = u'%YAML 1.1'121 122 def scan_directive(self):123 if self.data[self.index:self.index+len(self.DIRECTIVE)] == self.DIRECTIVE and \124 self.data[self.index+len(self.DIRECTIVE)] in u' \n\0':125 self.index += len(self.DIRECTIVE)126 return DirectiveToken('YAML', (1, 1), None, None)127 128 def scan_alias(self):129 if self.data[self.index] == u'*':130 TokenClass = AliasToken131 else:132 TokenClass = AnchorToken133 self.index += 1134 start = self.index135 while self.data[self.index] not in u', \n\0':136 self.index += 1137 value = self.data[start:self.index]138 return TokenClass(value, None, None)139 140 def scan_tag(self):141 self.index += 1142 start = self.index143 while self.data[self.index] not in u' \n\0':144 self.index += 1145 value = self.data[start:self.index]146 if value[0] == u'!':147 value = 'tag:yaml.org,2002:'+value[1:]148 elif value[0] == u'<' and value[-1] == u'>':149 value = value[1:-1]150 else:151 value = u'!'+value152 return TagToken(value, None, None)153 154 QUOTE_CODES = {155 'x': 2,156 'u': 4,157 'U': 8,158 }159 160 QUOTE_REPLACES = {161 u'\\': u'\\',162 u'\"': u'\"',163 u' ': u' ',164 u'a': u'\x07',165 u'b': u'\x08',166 u'e': u'\x1B',167 u'f': u'\x0C',168 u'n': u'\x0A',169 u'r': u'\x0D',170 u't': u'\x09',171 u'v': u'\x0B',172 u'N': u'\u0085',173 u'L': u'\u2028',174 u'P': u'\u2029',175 u'_': u'_',176 u'0': u'\x00',177 178 }179 180 def scan_scalar(self):181 self.index += 1182 chunks = []183 start = self.index184 ignore_spaces = False185 while self.data[self.index] != u'"':186 if self.data[self.index] == u'\\':187 ignore_spaces = False188 chunks.append(self.data[start:self.index])189 self.index += 1190 ch = self.data[self.index]191 self.index += 1192 if ch == u'\n':193 ignore_spaces = True194 elif ch in self.QUOTE_CODES:195 length = self.QUOTE_CODES[ch]196 code = int(self.data[self.index:self.index+length], 16)197 chunks.append(unichr(code))198 self.index += length199 else:200 chunks.append(self.QUOTE_REPLACES[ch])201 start = self.index202 elif self.data[self.index] == u'\n':203 chunks.append(self.data[start:self.index])204 chunks.append(u' ')205 self.index += 1206 start = self.index207 ignore_spaces = True208 elif ignore_spaces and self.data[self.index] == u' ':209 self.index += 1210 start = self.index211 else:212 ignore_spaces = False213 self.index += 1214 chunks.append(self.data[start:self.index])215 self.index += 1216 return ScalarToken(u''.join(chunks), False, None, None)217 218 def find_token(self):219 found = False220 while not found:221 while self.data[self.index] in u' \t':222 self.index += 1223 if self.data[self.index] == u'#':224 while self.data[self.index] != u'\n':225 self.index += 1226 if self.data[self.index] == u'\n':227 self.index += 1228 else:229 found = True230 231 class CanonicalParser:232 233 def __init__(self):234 self.events = []235 self.parse()236 237 # stream: STREAM-START document* STREAM-END238 def parse_stream(self):239 self.get_token(StreamStartToken)240 self.events.append(StreamStartEvent(None, None))241 while not self.check_token(StreamEndToken):242 if self.check_token(DirectiveToken, DocumentStartToken):243 self.parse_document()244 else:245 raise Error("document is expected, got "+repr(self.tokens[self.index]))246 self.get_token(StreamEndToken)247 self.events.append(StreamEndEvent(None, None))248 249 # document: DIRECTIVE? DOCUMENT-START node250 def parse_document(self):251 node = None252 if self.check_token(DirectiveToken):253 self.get_token(DirectiveToken)254 self.get_token(DocumentStartToken)255 self.events.append(DocumentStartEvent(None, None))256 self.parse_node()257 self.events.append(DocumentEndEvent(None, None))258 259 # node: ALIAS | ANCHOR? TAG? (SCALAR|sequence|mapping)260 def parse_node(self):261 if self.check_token(AliasToken):262 self.events.append(AliasEvent(self.get_token_value(), None, None))263 else:264 anchor = None265 if self.check_token(AnchorToken):266 anchor = self.get_token_value()267 tag = None268 if self.check_token(TagToken):269 tag = self.get_token_value()270 if self.check_token(ScalarToken):271 self.events.append(ScalarEvent(anchor, tag, (False, False), self.get_token_value(), None, None))272 elif self.check_token(FlowSequenceStartToken):273 self.events.append(SequenceStartEvent(anchor, tag, None, None))274 self.parse_sequence()275 elif self.check_token(FlowMappingStartToken):276 self.events.append(MappingStartEvent(anchor, tag, None, None))277 self.parse_mapping()278 else:279 raise Error("SCALAR, '[', or '{' is expected, got "+repr(self.tokens[self.index]))280 281 # sequence: SEQUENCE-START (node (ENTRY node)*)? ENTRY? SEQUENCE-END282 def parse_sequence(self):283 self.get_token(FlowSequenceStartToken)284 if not self.check_token(FlowSequenceEndToken):285 self.parse_node()286 while not self.check_token(FlowSequenceEndToken):287 self.get_token(FlowEntryToken)288 if not self.check_token(FlowSequenceEndToken):289 self.parse_node()290 self.get_token(FlowSequenceEndToken)291 self.events.append(SequenceEndEvent(None, None))292 293 # mapping: MAPPING-START (map_entry (ENTRY map_entry)*)? ENTRY? MAPPING-END294 def parse_mapping(self):295 self.get_token(FlowMappingStartToken)296 if not self.check_token(FlowMappingEndToken):297 self.parse_map_entry()298 while not self.check_token(FlowMappingEndToken):299 self.get_token(FlowEntryToken)300 if not self.check_token(FlowMappingEndToken):301 self.parse_map_entry()302 self.get_token(FlowMappingEndToken)303 self.events.append(MappingEndEvent(None, None))304 305 # map_entry: KEY node VALUE node306 def parse_map_entry(self):307 self.get_token(KeyToken)308 self.parse_node()309 self.get_token(ValueToken)310 self.parse_node()311 312 def parse(self):313 self.parse_stream()314 315 def get_event(self):316 return self.events.pop(0)317 318 def check_event(self, *choices):319 if self.events:320 if not choices:321 return True322 for choice in choices:323 if isinstance(self.events[0], choice):324 return True325 return False326 327 def peek_event(self):328 return self.events[0]329 330 class CanonicalLoader(CanonicalScanner, CanonicalParser, Composer, Constructor, Resolver):331 332 def __init__(self, stream):333 if hasattr(stream, 'read'):334 stream = stream.read()335 CanonicalScanner.__init__(self, stream)336 CanonicalParser.__init__(self)337 Composer.__init__(self)338 Constructor.__init__(self)339 Resolver.__init__(self)340 341 def canonical_scan(stream):342 return scan(stream, Loader=CanonicalLoader)343 344 def canonical_parse(stream):345 return parse(stream, Loader=CanonicalLoader)346 347 def canonical_compose(stream):348 return compose(stream, Loader=CanonicalLoader)349 350 def canonical_compose_all(stream):351 return compose_all(stream, Loader=CanonicalLoader)352 353 def canonical_load(stream):354 return load(stream, Loader=CanonicalLoader)355 356 def canonical_load_all(stream):357 return load_all(stream, Loader=CanonicalLoader)358
Note: See TracChangeset
for help on using the changeset viewer.
