Index: pyyaml/trunk/tests/test_appliance.py
===================================================================
--- pyyaml/trunk/tests/test_appliance.py	(revision 312)
+++ pyyaml/trunk/tests/test_appliance.py	(revision 322)
@@ -1,358 +1,145 @@
 
-import unittest, os
+import sys, os, os.path, types, traceback, pprint
 
-from yaml import *
-from yaml.composer import *
-from yaml.constructor import *
-from yaml.resolver import *
+DATA = 'tests/data'
 
-class TestAppliance(unittest.TestCase):
+def find_test_functions(collections):
+    if not isinstance(collections, list):
+        collections = [collections]
+    functions = []
+    for collection in collections:
+        if not isinstance(collection, dict):
+            collection = vars(collection)
+        keys = collection.keys()
+        keys.sort()
+        for key in keys:
+            value = collection[key]
+            if isinstance(value, types.FunctionType) and hasattr(value, 'unittest'):
+                functions.append(value)
+    return functions
 
-    DATA = 'tests/data'
-    SKIP_EXT = '.skip'
+def find_test_filenames(directory):
+    filenames = {}
+    for filename in os.listdir(directory):
+        if os.path.isfile(os.path.join(directory, filename)):
+            base, ext = os.path.splitext(filename)
+            filenames.setdefault(base, []).append(ext)
+    filenames = filenames.items()
+    filenames.sort()
+    return filenames
 
-    all_tests = {}
-    for filename in os.listdir(DATA):
-        if os.path.isfile(os.path.join(DATA, filename)):
-            root, ext = os.path.splitext(filename)
-            all_tests.setdefault(root, []).append(ext)
+def parse_arguments(args):
+    if args is None:
+        args = sys.argv[1:]
+    verbose = False
+    if '-v' in args:
+        verbose = True
+        args.remove('-v')
+    if '--verbose' in args:
+        verbose = True
+    if 'YAML_TEST_VERBOSE' in os.environ:
+        verbose = True
+    include_functions = []
+    if args:
+        include_functions.append(args.pop(0))
+    if 'YAML_TEST_FUNCTIONS' in os.environ:
+        include_functions.extend(os.environ['YAML_TEST_FUNCTIONS'].split())
+    include_filenames = []
+    include_filenames.extend(args)
+    if 'YAML_TEST_FILENAMES' in os.environ:
+        include_filenames.extend(os.environ['YAML_TEST_FILENAMES'].split())
+    return include_functions, include_filenames, verbose
 
-    def add_tests(cls, method_name, *extensions):
-        for test in cls.all_tests:
-            available_extensions = cls.all_tests[test]
-            if cls.SKIP_EXT in available_extensions:
-                continue
-            for ext in extensions:
-                if ext not in available_extensions:
-                    break
-            else:
-                filenames = [os.path.join(cls.DATA, test+ext) for ext in extensions]
-                def test_method(self, test=test, filenames=filenames):
-                    getattr(self, '_'+method_name)(test, *filenames)
-                test = test.replace('-', '_').replace('.', '_')
-                try:
-                    test_method.__name__ = '%s_%s' % (method_name, test)
-                except TypeError:
-                    import new
-                    test_method = new.function(test_method.func_code, test_method.func_globals,
-                            '%s_%s' % (method_name, test), test_method.func_defaults,
-                            test_method.func_closure)
-                setattr(cls, test_method.__name__, test_method)
-    add_tests = classmethod(add_tests)
+def execute(function, filenames, verbose):
+    if verbose:
+        sys.stdout.write('='*75+'\n')
+        sys.stdout.write('%s(%s)...\n' % (function.func_name, ', '.join(filenames)))
+    try:
+        function(verbose=verbose, *filenames)
+    except Exception, exc:
+        info = sys.exc_info()
+        if isinstance(exc, AssertionError):
+            kind = 'FAILURE'
+        else:
+            kind = 'ERROR'
+        if verbose:
+            traceback.print_exc(limit=1, file=sys.stdout)
+        else:
+            sys.stdout.write(kind[0])
+            sys.stdout.flush()
+    else:
+        kind = 'SUCCESS'
+        info = None
+        if not verbose:
+            sys.stdout.write('.')
+    sys.stdout.flush()
+    return (function, filenames, kind, info)
 
-class Error(Exception):
-    pass
+def display(results, verbose):
+    if results and not verbose:
+        sys.stdout.write('\n')
+    total = len(results)
+    failures = 0
+    errors = 0
+    for function, filenames, kind, info in results:
+        if kind == 'SUCCESS':
+            continue
+        if kind == 'FAILURE':
+            failures += 1
+        if kind == 'ERROR':
+            errors += 1
+        sys.stdout.write('='*75+'\n')
+        sys.stdout.write('%s(%s): %s\n' % (function.func_name, ', '.join(filenames), kind))
+        if kind == 'ERROR':
+            traceback.print_exception(file=sys.stdout, *info)
+        else:
+            sys.stdout.write('Traceback (most recent call last):\n')
+            traceback.print_tb(info[2], file=sys.stdout)
+            sys.stdout.write('%s: see below\n' % info[0].__name__)
+            sys.stdout.write('~'*75+'\n')
+            for arg in info[1].args:
+                pprint.pprint(arg, stream=sys.stdout, indent=2)
+        for filename in filenames:
+            sys.stdout.write('-'*75+'\n')
+            sys.stdout.write('%s:\n' % filename)
+            data = open(filename, 'rb').read()
+            sys.stdout.write(data)
+            if data and data[-1] != '\n':
+                sys.stdout.write('\n')
+    sys.stdout.write('='*75+'\n')
+    sys.stdout.write('TESTS: %s\n' % total)
+    if failures:
+        sys.stdout.write('FAILURES: %s\n' % failures)
+    if errors:
+        sys.stdout.write('ERRORS: %s\n' % errors)
 
-class CanonicalScanner:
+def run(collections, args=None):
+    test_functions = find_test_functions(collections)
+    test_filenames = find_test_filenames(DATA)
+    include_functions, include_filenames, verbose = parse_arguments(args)
+    results = []
+    for function in test_functions:
+        if include_functions and function.func_name not in include_functions:
+            continue
+        if function.unittest:
+            for base, exts in test_filenames:
+                if include_filenames and base not in include_filenames:
+                    continue
+                filenames = []
+                for ext in function.unittest:
+                    if ext not in exts:
+                        break
+                    filenames.append(os.path.join(DATA, base+ext))
+                else:
+                    skip_exts = getattr(function, 'skip', [])
+                    for skip_ext in skip_exts:
+                        if skip_ext in exts:
+                            break
+                    else:
+                        result = execute(function, filenames, verbose)
+                        results.append(result)
+        else:
+            result = execute(function, [], verbose)
+            results.append(result)
+    display(results, verbose=verbose)
 
-    def __init__(self, data):
-        self.data = unicode(data, 'utf-8')+u'\0'
-        self.index = 0
-        self.scan()
-
-    def check_token(self, *choices):
-        if self.tokens:
-            if not choices:
-                return True
-            for choice in choices:
-                if isinstance(self.tokens[0], choice):
-                    return True
-        return False
-
-    def peek_token(self):
-        if self.tokens:
-            return self.tokens[0]
-
-    def get_token(self, choice=None):
-        token = self.tokens.pop(0)
-        if choice and not isinstance(token, choice):
-            raise Error("unexpected token "+repr(token))
-        return token
-
-    def get_token_value(self):
-        token = self.get_token()
-        return token.value
-
-    def scan(self):
-        self.tokens = []
-        self.tokens.append(StreamStartToken(None, None))
-        while True:
-            self.find_token()
-            ch = self.data[self.index]
-            if ch == u'\0':
-                self.tokens.append(StreamEndToken(None, None))
-                break
-            elif ch == u'%':
-                self.tokens.append(self.scan_directive())
-            elif ch == u'-' and self.data[self.index:self.index+3] == u'---':
-                self.index += 3
-                self.tokens.append(DocumentStartToken(None, None))
-            elif ch == u'[':
-                self.index += 1
-                self.tokens.append(FlowSequenceStartToken(None, None))
-            elif ch == u'{':
-                self.index += 1
-                self.tokens.append(FlowMappingStartToken(None, None))
-            elif ch == u']':
-                self.index += 1
-                self.tokens.append(FlowSequenceEndToken(None, None))
-            elif ch == u'}':
-                self.index += 1
-                self.tokens.append(FlowMappingEndToken(None, None))
-            elif ch == u'?':
-                self.index += 1
-                self.tokens.append(KeyToken(None, None))
-            elif ch == u':':
-                self.index += 1
-                self.tokens.append(ValueToken(None, None))
-            elif ch == u',':
-                self.index += 1
-                self.tokens.append(FlowEntryToken(None, None))
-            elif ch == u'*' or ch == u'&':
-                self.tokens.append(self.scan_alias())
-            elif ch == u'!':
-                self.tokens.append(self.scan_tag())
-            elif ch == u'"':
-                self.tokens.append(self.scan_scalar())
-            else:
-                raise Error("invalid token")
-
-    DIRECTIVE = u'%YAML 1.1'
-
-    def scan_directive(self):
-        if self.data[self.index:self.index+len(self.DIRECTIVE)] == self.DIRECTIVE and \
-                self.data[self.index+len(self.DIRECTIVE)] in u' \n\0':
-            self.index += len(self.DIRECTIVE)
-            return DirectiveToken('YAML', (1, 1), None, None)
-
-    def scan_alias(self):
-        if self.data[self.index] == u'*':
-            TokenClass = AliasToken
-        else:
-            TokenClass = AnchorToken
-        self.index += 1
-        start = self.index
-        while self.data[self.index] not in u', \n\0':
-            self.index += 1
-        value = self.data[start:self.index]
-        return TokenClass(value, None, None)
-
-    def scan_tag(self):
-        self.index += 1
-        start = self.index
-        while self.data[self.index] not in u' \n\0':
-            self.index += 1
-        value = self.data[start:self.index]
-        if value[0] == u'!':
-            value = 'tag:yaml.org,2002:'+value[1:]
-        elif value[0] == u'<' and value[-1] == u'>':
-            value = value[1:-1]
-        else:
-            value = u'!'+value
-        return TagToken(value, None, None)
-
-    QUOTE_CODES = {
-        'x': 2,
-        'u': 4,
-        'U': 8,
-    }
-
-    QUOTE_REPLACES = {
-        u'\\': u'\\',
-        u'\"': u'\"',
-        u' ': u' ',
-        u'a': u'\x07',
-        u'b': u'\x08',
-        u'e': u'\x1B',
-        u'f': u'\x0C',
-        u'n': u'\x0A',
-        u'r': u'\x0D',
-        u't': u'\x09',
-        u'v': u'\x0B',
-        u'N': u'\u0085',
-        u'L': u'\u2028',
-        u'P': u'\u2029',
-        u'_': u'_',
-        u'0': u'\x00',
-
-    }
-
-    def scan_scalar(self):
-        self.index += 1
-        chunks = []
-        start = self.index
-        ignore_spaces = False
-        while self.data[self.index] != u'"':
-            if self.data[self.index] == u'\\':
-                ignore_spaces = False
-                chunks.append(self.data[start:self.index])
-                self.index += 1
-                ch = self.data[self.index]
-                self.index += 1
-                if ch == u'\n':
-                    ignore_spaces = True
-                elif ch in self.QUOTE_CODES:
-                    length = self.QUOTE_CODES[ch]
-                    code = int(self.data[self.index:self.index+length], 16)
-                    chunks.append(unichr(code))
-                    self.index += length
-                else:
-                    chunks.append(self.QUOTE_REPLACES[ch])
-                start = self.index
-            elif self.data[self.index] == u'\n':
-                chunks.append(self.data[start:self.index])
-                chunks.append(u' ')
-                self.index += 1
-                start = self.index
-                ignore_spaces = True
-            elif ignore_spaces and self.data[self.index] == u' ':
-                self.index += 1
-                start = self.index
-            else:
-                ignore_spaces = False
-                self.index += 1
-        chunks.append(self.data[start:self.index])
-        self.index += 1
-        return ScalarToken(u''.join(chunks), False, None, None)
-
-    def find_token(self):
-        found = False
-        while not found:
-            while self.data[self.index] in u' \t':
-                self.index += 1
-            if self.data[self.index] == u'#':
-                while self.data[self.index] != u'\n':
-                    self.index += 1
-            if self.data[self.index] == u'\n':
-                self.index += 1
-            else:
-                found = True
-
-class CanonicalParser:
-
-    def __init__(self):
-        self.events = []
-        self.parse()
-
-    # stream: STREAM-START document* STREAM-END
-    def parse_stream(self):
-        self.get_token(StreamStartToken)
-        self.events.append(StreamStartEvent(None, None))
-        while not self.check_token(StreamEndToken):
-            if self.check_token(DirectiveToken, DocumentStartToken):
-                self.parse_document()
-            else:
-                raise Error("document is expected, got "+repr(self.tokens[self.index]))
-        self.get_token(StreamEndToken)
-        self.events.append(StreamEndEvent(None, None))
-
-    # document: DIRECTIVE? DOCUMENT-START node
-    def parse_document(self):
-        node = None
-        if self.check_token(DirectiveToken):
-            self.get_token(DirectiveToken)
-        self.get_token(DocumentStartToken)
-        self.events.append(DocumentStartEvent(None, None))
-        self.parse_node()
-        self.events.append(DocumentEndEvent(None, None))
-
-    # node: ALIAS | ANCHOR? TAG? (SCALAR|sequence|mapping)
-    def parse_node(self):
-        if self.check_token(AliasToken):
-            self.events.append(AliasEvent(self.get_token_value(), None, None))
-        else:
-            anchor = None
-            if self.check_token(AnchorToken):
-                anchor = self.get_token_value()
-            tag = None
-            if self.check_token(TagToken):
-                tag = self.get_token_value()
-            if self.check_token(ScalarToken):
-                self.events.append(ScalarEvent(anchor, tag, (False, False), self.get_token_value(), None, None))
-            elif self.check_token(FlowSequenceStartToken):
-                self.events.append(SequenceStartEvent(anchor, tag, None, None))
-                self.parse_sequence()
-            elif self.check_token(FlowMappingStartToken):
-                self.events.append(MappingStartEvent(anchor, tag, None, None))
-                self.parse_mapping()
-            else:
-                raise Error("SCALAR, '[', or '{' is expected, got "+repr(self.tokens[self.index]))
-
-    # sequence: SEQUENCE-START (node (ENTRY node)*)? ENTRY? SEQUENCE-END
-    def parse_sequence(self):
-        self.get_token(FlowSequenceStartToken)
-        if not self.check_token(FlowSequenceEndToken):
-            self.parse_node()
-            while not self.check_token(FlowSequenceEndToken):
-                self.get_token(FlowEntryToken)
-                if not self.check_token(FlowSequenceEndToken):
-                    self.parse_node()
-        self.get_token(FlowSequenceEndToken)
-        self.events.append(SequenceEndEvent(None, None))
-
-    # mapping: MAPPING-START (map_entry (ENTRY map_entry)*)? ENTRY? MAPPING-END
-    def parse_mapping(self):
-        self.get_token(FlowMappingStartToken)
-        if not self.check_token(FlowMappingEndToken):
-            self.parse_map_entry()
-            while not self.check_token(FlowMappingEndToken):
-                self.get_token(FlowEntryToken)
-                if not self.check_token(FlowMappingEndToken):
-                    self.parse_map_entry()
-        self.get_token(FlowMappingEndToken)
-        self.events.append(MappingEndEvent(None, None))
-
-    # map_entry: KEY node VALUE node
-    def parse_map_entry(self):
-        self.get_token(KeyToken)
-        self.parse_node()
-        self.get_token(ValueToken)
-        self.parse_node()
-
-    def parse(self):
-        self.parse_stream()
-
-    def get_event(self):
-        return self.events.pop(0)
-
-    def check_event(self, *choices):
-        if self.events:
-            if not choices:
-                return True
-            for choice in choices:
-                if isinstance(self.events[0], choice):
-                    return True
-        return False
-
-    def peek_event(self):
-        return self.events[0]
-
-class CanonicalLoader(CanonicalScanner, CanonicalParser, Composer, Constructor, Resolver):
-
-    def __init__(self, stream):
-        if hasattr(stream, 'read'):
-            stream = stream.read()
-        CanonicalScanner.__init__(self, stream)
-        CanonicalParser.__init__(self)
-        Composer.__init__(self)
-        Constructor.__init__(self)
-        Resolver.__init__(self)
-
-def canonical_scan(stream):
-    return scan(stream, Loader=CanonicalLoader)
-
-def canonical_parse(stream):
-    return parse(stream, Loader=CanonicalLoader)
-
-def canonical_compose(stream):
-    return compose(stream, Loader=CanonicalLoader)
-
-def canonical_compose_all(stream):
-    return compose_all(stream, Loader=CanonicalLoader)
-
-def canonical_load(stream):
-    return load(stream, Loader=CanonicalLoader)
-
-def canonical_load_all(stream):
-    return load_all(stream, Loader=CanonicalLoader)
-
