Ticket #51: test_socket.py

File test_socket.py, 999 bytes (added by edward@…, 8 years ago)

Program demonstrates problem

Line 
1import yaml
2import socket
3from optparse import OptionParser
4import time
5
6class SockStream:
7    def __init__(self, sock):
8        self.sock = sock
9    def read(self, bytes):
10        return self.sock.recv(bytes)
11    def write(self, string):
12        return self.sock.send(string)
13    def __getattr__(self, name):
14        return getattr(self.sock, name)
15
16
17op = OptionParser()
18op.add_option("--server", action="store_true", default=False)
19op.add_option("--port", type="int", default=8000)
20opts, args = op.parse_args()
21
22if opts.server:
23    serv = socket.socket()
24    serv.bind(('127.0.0.1', opts.port))
25    serv.listen(1)
26    s,addr = serv.accept()
27    s = SockStream(s)
28    for item in yaml.load_all(s):
29        print item
30else:
31    s = socket.socket()
32    s.connect(('127.0.0.1',opts.port))
33    s = SockStream(s)
34    def generator():
35        for i in range(100):
36            time.sleep(1)
37            print i
38            yield i
39    yaml.dump_all(generator(), s, explicit_start=True, explicit_end=True)