Ignore:
Timestamp:
12/28/08 15:16:50 (6 years ago)
Author:
xi
Message:

Refactored the test suite; updated include and library paths in setup.cfg.

File:
1 edited

Legend:

Unmodified
Added
Removed
  • pyyaml/trunk/tests/test_appliance.py

    r312 r322  
    11 
    2 import unittest, os 
     2import sys, os, os.path, types, traceback, pprint 
    33 
    4 from yaml import * 
    5 from yaml.composer import * 
    6 from yaml.constructor import * 
    7 from yaml.resolver import * 
     4DATA = 'tests/data' 
    85 
    9 class TestAppliance(unittest.TestCase): 
     6def find_test_functions(collections): 
     7    if not isinstance(collections, list): 
     8        collections = [collections] 
     9    functions = [] 
     10    for collection in collections: 
     11        if not isinstance(collection, dict): 
     12            collection = vars(collection) 
     13        keys = collection.keys() 
     14        keys.sort() 
     15        for key in keys: 
     16            value = collection[key] 
     17            if isinstance(value, types.FunctionType) and hasattr(value, 'unittest'): 
     18                functions.append(value) 
     19    return functions 
    1020 
    11     DATA = 'tests/data' 
    12     SKIP_EXT = '.skip' 
     21def find_test_filenames(directory): 
     22    filenames = {} 
     23    for filename in os.listdir(directory): 
     24        if os.path.isfile(os.path.join(directory, filename)): 
     25            base, ext = os.path.splitext(filename) 
     26            filenames.setdefault(base, []).append(ext) 
     27    filenames = filenames.items() 
     28    filenames.sort() 
     29    return filenames 
    1330 
    14     all_tests = {} 
    15     for filename in os.listdir(DATA): 
    16         if os.path.isfile(os.path.join(DATA, filename)): 
    17             root, ext = os.path.splitext(filename) 
    18             all_tests.setdefault(root, []).append(ext) 
     31def parse_arguments(args): 
     32    if args is None: 
     33        args = sys.argv[1:] 
     34    verbose = False 
     35    if '-v' in args: 
     36        verbose = True 
     37        args.remove('-v') 
     38    if '--verbose' in args: 
     39        verbose = True 
     40    if 'YAML_TEST_VERBOSE' in os.environ: 
     41        verbose = True 
     42    include_functions = [] 
     43    if args: 
     44        include_functions.append(args.pop(0)) 
     45    if 'YAML_TEST_FUNCTIONS' in os.environ: 
     46        include_functions.extend(os.environ['YAML_TEST_FUNCTIONS'].split()) 
     47    include_filenames = [] 
     48    include_filenames.extend(args) 
     49    if 'YAML_TEST_FILENAMES' in os.environ: 
     50        include_filenames.extend(os.environ['YAML_TEST_FILENAMES'].split()) 
     51    return include_functions, include_filenames, verbose 
    1952 
    20     def add_tests(cls, method_name, *extensions): 
    21         for test in cls.all_tests: 
    22             available_extensions = cls.all_tests[test] 
    23             if cls.SKIP_EXT in available_extensions: 
    24                 continue 
    25             for ext in extensions: 
    26                 if ext not in available_extensions: 
    27                     break 
    28             else: 
    29                 filenames = [os.path.join(cls.DATA, test+ext) for ext in extensions] 
    30                 def test_method(self, test=test, filenames=filenames): 
    31                     getattr(self, '_'+method_name)(test, *filenames) 
    32                 test = test.replace('-', '_').replace('.', '_') 
    33                 try: 
    34                     test_method.__name__ = '%s_%s' % (method_name, test) 
    35                 except TypeError: 
    36                     import new 
    37                     test_method = new.function(test_method.func_code, test_method.func_globals, 
    38                             '%s_%s' % (method_name, test), test_method.func_defaults, 
    39                             test_method.func_closure) 
    40                 setattr(cls, test_method.__name__, test_method) 
    41     add_tests = classmethod(add_tests) 
     53def execute(function, filenames, verbose): 
     54    if verbose: 
     55        sys.stdout.write('='*75+'\n') 
     56        sys.stdout.write('%s(%s)...\n' % (function.func_name, ', '.join(filenames))) 
     57    try: 
     58        function(verbose=verbose, *filenames) 
     59    except Exception, exc: 
     60        info = sys.exc_info() 
     61        if isinstance(exc, AssertionError): 
     62            kind = 'FAILURE' 
     63        else: 
     64            kind = 'ERROR' 
     65        if verbose: 
     66            traceback.print_exc(limit=1, file=sys.stdout) 
     67        else: 
     68            sys.stdout.write(kind[0]) 
     69            sys.stdout.flush() 
     70    else: 
     71        kind = 'SUCCESS' 
     72        info = None 
     73        if not verbose: 
     74            sys.stdout.write('.') 
     75    sys.stdout.flush() 
     76    return (function, filenames, kind, info) 
    4277 
    43 class Error(Exception): 
    44     pass 
     78def display(results, verbose): 
     79    if results and not verbose: 
     80        sys.stdout.write('\n') 
     81    total = len(results) 
     82    failures = 0 
     83    errors = 0 
     84    for function, filenames, kind, info in results: 
     85        if kind == 'SUCCESS': 
     86            continue 
     87        if kind == 'FAILURE': 
     88            failures += 1 
     89        if kind == 'ERROR': 
     90            errors += 1 
     91        sys.stdout.write('='*75+'\n') 
     92        sys.stdout.write('%s(%s): %s\n' % (function.func_name, ', '.join(filenames), kind)) 
     93        if kind == 'ERROR': 
     94            traceback.print_exception(file=sys.stdout, *info) 
     95        else: 
     96            sys.stdout.write('Traceback (most recent call last):\n') 
     97            traceback.print_tb(info[2], file=sys.stdout) 
     98            sys.stdout.write('%s: see below\n' % info[0].__name__) 
     99            sys.stdout.write('~'*75+'\n') 
     100            for arg in info[1].args: 
     101                pprint.pprint(arg, stream=sys.stdout, indent=2) 
     102        for filename in filenames: 
     103            sys.stdout.write('-'*75+'\n') 
     104            sys.stdout.write('%s:\n' % filename) 
     105            data = open(filename, 'rb').read() 
     106            sys.stdout.write(data) 
     107            if data and data[-1] != '\n': 
     108                sys.stdout.write('\n') 
     109    sys.stdout.write('='*75+'\n') 
     110    sys.stdout.write('TESTS: %s\n' % total) 
     111    if failures: 
     112        sys.stdout.write('FAILURES: %s\n' % failures) 
     113    if errors: 
     114        sys.stdout.write('ERRORS: %s\n' % errors) 
    45115 
    46 class CanonicalScanner: 
     116def run(collections, args=None): 
     117    test_functions = find_test_functions(collections) 
     118    test_filenames = find_test_filenames(DATA) 
     119    include_functions, include_filenames, verbose = parse_arguments(args) 
     120    results = [] 
     121    for function in test_functions: 
     122        if include_functions and function.func_name not in include_functions: 
     123            continue 
     124        if function.unittest: 
     125            for base, exts in test_filenames: 
     126                if include_filenames and base not in include_filenames: 
     127                    continue 
     128                filenames = [] 
     129                for ext in function.unittest: 
     130                    if ext not in exts: 
     131                        break 
     132                    filenames.append(os.path.join(DATA, base+ext)) 
     133                else: 
     134                    skip_exts = getattr(function, 'skip', []) 
     135                    for skip_ext in skip_exts: 
     136                        if skip_ext in exts: 
     137                            break 
     138                    else: 
     139                        result = execute(function, filenames, verbose) 
     140                        results.append(result) 
     141        else: 
     142            result = execute(function, [], verbose) 
     143            results.append(result) 
     144    display(results, verbose=verbose) 
    47145 
    48     def __init__(self, data): 
    49         self.data = unicode(data, 'utf-8')+u'\0' 
    50         self.index = 0 
    51         self.scan() 
    52  
    53     def check_token(self, *choices): 
    54         if self.tokens: 
    55             if not choices: 
    56                 return True 
    57             for choice in choices: 
    58                 if isinstance(self.tokens[0], choice): 
    59                     return True 
    60         return False 
    61  
    62     def peek_token(self): 
    63         if self.tokens: 
    64             return self.tokens[0] 
    65  
    66     def get_token(self, choice=None): 
    67         token = self.tokens.pop(0) 
    68         if choice and not isinstance(token, choice): 
    69             raise Error("unexpected token "+repr(token)) 
    70         return token 
    71  
    72     def get_token_value(self): 
    73         token = self.get_token() 
    74         return token.value 
    75  
    76     def scan(self): 
    77         self.tokens = [] 
    78         self.tokens.append(StreamStartToken(None, None)) 
    79         while True: 
    80             self.find_token() 
    81             ch = self.data[self.index] 
    82             if ch == u'\0': 
    83                 self.tokens.append(StreamEndToken(None, None)) 
    84                 break 
    85             elif ch == u'%': 
    86                 self.tokens.append(self.scan_directive()) 
    87             elif ch == u'-' and self.data[self.index:self.index+3] == u'---': 
    88                 self.index += 3 
    89                 self.tokens.append(DocumentStartToken(None, None)) 
    90             elif ch == u'[': 
    91                 self.index += 1 
    92                 self.tokens.append(FlowSequenceStartToken(None, None)) 
    93             elif ch == u'{': 
    94                 self.index += 1 
    95                 self.tokens.append(FlowMappingStartToken(None, None)) 
    96             elif ch == u']': 
    97                 self.index += 1 
    98                 self.tokens.append(FlowSequenceEndToken(None, None)) 
    99             elif ch == u'}': 
    100                 self.index += 1 
    101                 self.tokens.append(FlowMappingEndToken(None, None)) 
    102             elif ch == u'?': 
    103                 self.index += 1 
    104                 self.tokens.append(KeyToken(None, None)) 
    105             elif ch == u':': 
    106                 self.index += 1 
    107                 self.tokens.append(ValueToken(None, None)) 
    108             elif ch == u',': 
    109                 self.index += 1 
    110                 self.tokens.append(FlowEntryToken(None, None)) 
    111             elif ch == u'*' or ch == u'&': 
    112                 self.tokens.append(self.scan_alias()) 
    113             elif ch == u'!': 
    114                 self.tokens.append(self.scan_tag()) 
    115             elif ch == u'"': 
    116                 self.tokens.append(self.scan_scalar()) 
    117             else: 
    118                 raise Error("invalid token") 
    119  
    120     DIRECTIVE = u'%YAML 1.1' 
    121  
    122     def scan_directive(self): 
    123         if self.data[self.index:self.index+len(self.DIRECTIVE)] == self.DIRECTIVE and \ 
    124                 self.data[self.index+len(self.DIRECTIVE)] in u' \n\0': 
    125             self.index += len(self.DIRECTIVE) 
    126             return DirectiveToken('YAML', (1, 1), None, None) 
    127  
    128     def scan_alias(self): 
    129         if self.data[self.index] == u'*': 
    130             TokenClass = AliasToken 
    131         else: 
    132             TokenClass = AnchorToken 
    133         self.index += 1 
    134         start = self.index 
    135         while self.data[self.index] not in u', \n\0': 
    136             self.index += 1 
    137         value = self.data[start:self.index] 
    138         return TokenClass(value, None, None) 
    139  
    140     def scan_tag(self): 
    141         self.index += 1 
    142         start = self.index 
    143         while self.data[self.index] not in u' \n\0': 
    144             self.index += 1 
    145         value = self.data[start:self.index] 
    146         if value[0] == u'!': 
    147             value = 'tag:yaml.org,2002:'+value[1:] 
    148         elif value[0] == u'<' and value[-1] == u'>': 
    149             value = value[1:-1] 
    150         else: 
    151             value = u'!'+value 
    152         return TagToken(value, None, None) 
    153  
    154     QUOTE_CODES = { 
    155         'x': 2, 
    156         'u': 4, 
    157         'U': 8, 
    158     } 
    159  
    160     QUOTE_REPLACES = { 
    161         u'\\': u'\\', 
    162         u'\"': u'\"', 
    163         u' ': u' ', 
    164         u'a': u'\x07', 
    165         u'b': u'\x08', 
    166         u'e': u'\x1B', 
    167         u'f': u'\x0C', 
    168         u'n': u'\x0A', 
    169         u'r': u'\x0D', 
    170         u't': u'\x09', 
    171         u'v': u'\x0B', 
    172         u'N': u'\u0085', 
    173         u'L': u'\u2028', 
    174         u'P': u'\u2029', 
    175         u'_': u'_', 
    176         u'0': u'\x00', 
    177  
    178     } 
    179  
    180     def scan_scalar(self): 
    181         self.index += 1 
    182         chunks = [] 
    183         start = self.index 
    184         ignore_spaces = False 
    185         while self.data[self.index] != u'"': 
    186             if self.data[self.index] == u'\\': 
    187                 ignore_spaces = False 
    188                 chunks.append(self.data[start:self.index]) 
    189                 self.index += 1 
    190                 ch = self.data[self.index] 
    191                 self.index += 1 
    192                 if ch == u'\n': 
    193                     ignore_spaces = True 
    194                 elif ch in self.QUOTE_CODES: 
    195                     length = self.QUOTE_CODES[ch] 
    196                     code = int(self.data[self.index:self.index+length], 16) 
    197                     chunks.append(unichr(code)) 
    198                     self.index += length 
    199                 else: 
    200                     chunks.append(self.QUOTE_REPLACES[ch]) 
    201                 start = self.index 
    202             elif self.data[self.index] == u'\n': 
    203                 chunks.append(self.data[start:self.index]) 
    204                 chunks.append(u' ') 
    205                 self.index += 1 
    206                 start = self.index 
    207                 ignore_spaces = True 
    208             elif ignore_spaces and self.data[self.index] == u' ': 
    209                 self.index += 1 
    210                 start = self.index 
    211             else: 
    212                 ignore_spaces = False 
    213                 self.index += 1 
    214         chunks.append(self.data[start:self.index]) 
    215         self.index += 1 
    216         return ScalarToken(u''.join(chunks), False, None, None) 
    217  
    218     def find_token(self): 
    219         found = False 
    220         while not found: 
    221             while self.data[self.index] in u' \t': 
    222                 self.index += 1 
    223             if self.data[self.index] == u'#': 
    224                 while self.data[self.index] != u'\n': 
    225                     self.index += 1 
    226             if self.data[self.index] == u'\n': 
    227                 self.index += 1 
    228             else: 
    229                 found = True 
    230  
    231 class CanonicalParser: 
    232  
    233     def __init__(self): 
    234         self.events = [] 
    235         self.parse() 
    236  
    237     # stream: STREAM-START document* STREAM-END 
    238     def parse_stream(self): 
    239         self.get_token(StreamStartToken) 
    240         self.events.append(StreamStartEvent(None, None)) 
    241         while not self.check_token(StreamEndToken): 
    242             if self.check_token(DirectiveToken, DocumentStartToken): 
    243                 self.parse_document() 
    244             else: 
    245                 raise Error("document is expected, got "+repr(self.tokens[self.index])) 
    246         self.get_token(StreamEndToken) 
    247         self.events.append(StreamEndEvent(None, None)) 
    248  
    249     # document: DIRECTIVE? DOCUMENT-START node 
    250     def parse_document(self): 
    251         node = None 
    252         if self.check_token(DirectiveToken): 
    253             self.get_token(DirectiveToken) 
    254         self.get_token(DocumentStartToken) 
    255         self.events.append(DocumentStartEvent(None, None)) 
    256         self.parse_node() 
    257         self.events.append(DocumentEndEvent(None, None)) 
    258  
    259     # node: ALIAS | ANCHOR? TAG? (SCALAR|sequence|mapping) 
    260     def parse_node(self): 
    261         if self.check_token(AliasToken): 
    262             self.events.append(AliasEvent(self.get_token_value(), None, None)) 
    263         else: 
    264             anchor = None 
    265             if self.check_token(AnchorToken): 
    266                 anchor = self.get_token_value() 
    267             tag = None 
    268             if self.check_token(TagToken): 
    269                 tag = self.get_token_value() 
    270             if self.check_token(ScalarToken): 
    271                 self.events.append(ScalarEvent(anchor, tag, (False, False), self.get_token_value(), None, None)) 
    272             elif self.check_token(FlowSequenceStartToken): 
    273                 self.events.append(SequenceStartEvent(anchor, tag, None, None)) 
    274                 self.parse_sequence() 
    275             elif self.check_token(FlowMappingStartToken): 
    276                 self.events.append(MappingStartEvent(anchor, tag, None, None)) 
    277                 self.parse_mapping() 
    278             else: 
    279                 raise Error("SCALAR, '[', or '{' is expected, got "+repr(self.tokens[self.index])) 
    280  
    281     # sequence: SEQUENCE-START (node (ENTRY node)*)? ENTRY? SEQUENCE-END 
    282     def parse_sequence(self): 
    283         self.get_token(FlowSequenceStartToken) 
    284         if not self.check_token(FlowSequenceEndToken): 
    285             self.parse_node() 
    286             while not self.check_token(FlowSequenceEndToken): 
    287                 self.get_token(FlowEntryToken) 
    288                 if not self.check_token(FlowSequenceEndToken): 
    289                     self.parse_node() 
    290         self.get_token(FlowSequenceEndToken) 
    291         self.events.append(SequenceEndEvent(None, None)) 
    292  
    293     # mapping: MAPPING-START (map_entry (ENTRY map_entry)*)? ENTRY? MAPPING-END 
    294     def parse_mapping(self): 
    295         self.get_token(FlowMappingStartToken) 
    296         if not self.check_token(FlowMappingEndToken): 
    297             self.parse_map_entry() 
    298             while not self.check_token(FlowMappingEndToken): 
    299                 self.get_token(FlowEntryToken) 
    300                 if not self.check_token(FlowMappingEndToken): 
    301                     self.parse_map_entry() 
    302         self.get_token(FlowMappingEndToken) 
    303         self.events.append(MappingEndEvent(None, None)) 
    304  
    305     # map_entry: KEY node VALUE node 
    306     def parse_map_entry(self): 
    307         self.get_token(KeyToken) 
    308         self.parse_node() 
    309         self.get_token(ValueToken) 
    310         self.parse_node() 
    311  
    312     def parse(self): 
    313         self.parse_stream() 
    314  
    315     def get_event(self): 
    316         return self.events.pop(0) 
    317  
    318     def check_event(self, *choices): 
    319         if self.events: 
    320             if not choices: 
    321                 return True 
    322             for choice in choices: 
    323                 if isinstance(self.events[0], choice): 
    324                     return True 
    325         return False 
    326  
    327     def peek_event(self): 
    328         return self.events[0] 
    329  
    330 class CanonicalLoader(CanonicalScanner, CanonicalParser, Composer, Constructor, Resolver): 
    331  
    332     def __init__(self, stream): 
    333         if hasattr(stream, 'read'): 
    334             stream = stream.read() 
    335         CanonicalScanner.__init__(self, stream) 
    336         CanonicalParser.__init__(self) 
    337         Composer.__init__(self) 
    338         Constructor.__init__(self) 
    339         Resolver.__init__(self) 
    340  
    341 def canonical_scan(stream): 
    342     return scan(stream, Loader=CanonicalLoader) 
    343  
    344 def canonical_parse(stream): 
    345     return parse(stream, Loader=CanonicalLoader) 
    346  
    347 def canonical_compose(stream): 
    348     return compose(stream, Loader=CanonicalLoader) 
    349  
    350 def canonical_compose_all(stream): 
    351     return compose_all(stream, Loader=CanonicalLoader) 
    352  
    353 def canonical_load(stream): 
    354     return load(stream, Loader=CanonicalLoader) 
    355  
    356 def canonical_load_all(stream): 
    357     return load_all(stream, Loader=CanonicalLoader) 
    358  
Note: See TracChangeset for help on using the changeset viewer.