root/pyyaml-legacy/tags/PyYaml_0.32/query.py

Revision 75, 1.8 kB (checked in by tim, 5 years ago)

tagged the pre fork version

  • Property svn:executable set to *
Line 
1 import yaml
2 from test import assertEquals
3 import re
4
def Query(query, doc):
    """Run a select/from/where/order style query against ``doc``.

    ``query`` is a mapping read with these keys:

    * ``'from'``   -- path handed to ``yaml.ypath`` to enumerate rows of ``doc``
    * ``'select'`` -- iterable of field paths; each is evaluated against a
      row with ``yaml.ypath`` and the first match becomes the field value
    * ``'where'``  -- optional filter, interpreted by ``Where``
    * ``'order'``  -- optional ordering, interpreted by ``Order``

    Returns a list with one dict per matching row, mapping each selected
    field path to its value, passed through ``Order`` before returning.
    """
    results = []
    # The query does not change while iterating, so test for ordering once.
    ordered = 'order' in query
    for row in yaml.ypath(query['from'], doc):
        if not Where(row, query):
            continue
        data = {}
        for field in query['select']:
            data[field] = yaml.ypath(field, row).next()
        if ordered:
            # Order() sorts on the original row and strips this key again.
            data['__row__'] = row
        results.append(data)
    return Order(results, query)
16
def Where(row, query):
    """Return a true value if ``row`` passes the query's ``'where'`` clause.

    A query without a ``'where'`` key matches every row (returns 1).
    Otherwise the clause is delegated to ``WhereRow``.
    """
    if 'where' in query:
        return WhereRow(row, query['where'])
    return 1
22
def WhereRow(row, where):
    """Evaluate one ``where`` clause against ``row``.

    The clause is dispatched on its type:

    * list -- every element is itself a clause and all must hold
      (recursive call through ``And``)
    * str  -- an expression string evaluated by ``WhereString``
    * anything else -- treated as a mapping of field -> required value;
      every ``row[field]`` must equal ``where[field]``

    Returns whatever ``And`` / ``WhereString`` return (a truth value).
    """
    if isinstance(where, list):
        return And(row, where, WhereRow)
    if isinstance(where, str):
        return WhereString(row, where)
    return And(row, where.keys(),
        lambda row, field: where[field] == row[field])
30
def And(row, conds, condFunc):
    """Return 1 if ``condFunc(row, cond)`` is true for every cond, else 0.

    Conditions are checked in order and checking stops at the first one
    that fails.  An empty ``conds`` yields 1.
    """
    for condition in conds:
        if condFunc(row, condition):
            continue
        # First failing condition decides the result.
        return 0
    return 1
36
def WhereString(row, where):
    """Evaluate the expression string ``where`` against ``row``.

    Every ``[name]`` in the string is rewritten to ``row['name']`` and the
    resulting text is evaluated as a Python expression; the value of that
    expression is returned.
    """
    # Raw string: "\[" and "\]" are not valid string escapes.
    where = re.sub(r"\[(.*?)\]", r"row['\1']", where)
    # SECURITY: eval() executes arbitrary Python taken from the query text.
    # Only use this with trusted query files.
    return eval(where)
40
def Order(results, query):
    """Sort ``results`` in place according to ``query['order']``.

    If the query has no ``'order'`` key, ``results`` is returned untouched.
    Otherwise the list is sorted with ``Compare`` (which reads each
    result's ``'__row__'`` entry), the ``'__row__'`` helper key is removed
    from every result, and the same list object is returned.
    """
    if 'order' not in query:
        return results
    # Look the ordering up once rather than on every comparison.
    order = query['order']
    # Positional comparison function: Python 2 list.sort() signature.
    results.sort(lambda x, y: Compare(x, y, order))
    for result in results:
        del result['__row__']
    return results
49
def Compare(x, y, order):
    """Three-way compare two result dicts on the fields listed in ``order``.

    For each field path in ``order`` the first ``yaml.ypath`` match in
    ``x['__row__']`` and ``y['__row__']`` is compared with ``cmp``; the
    first non-zero outcome is returned.  Returns 0 when every field
    compares equal or ``order`` is empty.
    """
    for field in order:
        val = cmp(
            yaml.ypath(field, x['__row__']).next(),
            yaml.ypath(field, y['__row__']).next()
        )
        if val != 0:
            return val
    # All fields tied: a comparison function must still return an int,
    # list.sort() rejects None.
    return 0
57
def TestQuery(query, expected, data):
    """Run ``query`` against ``data`` and check the result with assertEquals.

    The query itself is passed as the third argument to ``assertEquals``.
    """
    actual = Query(query, data)
    assertEquals(actual, expected, query)
60
# Test driver, executed when the module is loaded: take the first item
# yielded by yaml.loadFile("query.yml") and run every entry of its 'tests'
# list against its 'data' entry.
doc = yaml.loadFile("query.yml").next()
for test in doc['tests']:
    # Entries carrying an 'ignore' key are skipped.
    if 'ignore' in test:
        continue
    TestQuery(test['query'], test['expected'], doc['data'])
65
66
Note: See TracBrowser for help on using the browser.