]> git.proxmox.com Git - mirror_edk2.git/blame - AppPkg/Applications/Python/Python-2.7.2/Lib/test/test_asynchat.py
EmbeddedPkg: Extend NvVarStoreFormattedLib LIBRARY_CLASS
[mirror_edk2.git] / AppPkg / Applications / Python / Python-2.7.2 / Lib / test / test_asynchat.py
CommitLineData
4710c53d 1# test asynchat\r
2\r
3import asyncore, asynchat, socket, time\r
4import unittest\r
5import sys\r
6from test import test_support\r
7try:\r
8 import threading\r
9except ImportError:\r
10 threading = None\r
11\r
12HOST = test_support.HOST\r
13SERVER_QUIT = 'QUIT\n'\r
14\r
15if threading:\r
16 class echo_server(threading.Thread):\r
17 # parameter to determine the number of bytes passed back to the\r
18 # client each send\r
19 chunk_size = 1\r
20\r
21 def __init__(self, event):\r
22 threading.Thread.__init__(self)\r
23 self.event = event\r
24 self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)\r
25 self.port = test_support.bind_port(self.sock)\r
26 # This will be set if the client wants us to wait before echoing data\r
27 # back.\r
28 self.start_resend_event = None\r
29\r
30 def run(self):\r
31 self.sock.listen(1)\r
32 self.event.set()\r
33 conn, client = self.sock.accept()\r
34 self.buffer = ""\r
35 # collect data until quit message is seen\r
36 while SERVER_QUIT not in self.buffer:\r
37 data = conn.recv(1)\r
38 if not data:\r
39 break\r
40 self.buffer = self.buffer + data\r
41\r
42 # remove the SERVER_QUIT message\r
43 self.buffer = self.buffer.replace(SERVER_QUIT, '')\r
44\r
45 if self.start_resend_event:\r
46 self.start_resend_event.wait()\r
47\r
48 # re-send entire set of collected data\r
49 try:\r
50 # this may fail on some tests, such as test_close_when_done, since\r
51 # the client closes the channel when it's done sending\r
52 while self.buffer:\r
53 n = conn.send(self.buffer[:self.chunk_size])\r
54 time.sleep(0.001)\r
55 self.buffer = self.buffer[n:]\r
56 except:\r
57 pass\r
58\r
59 conn.close()\r
60 self.sock.close()\r
61\r
62 class echo_client(asynchat.async_chat):\r
63\r
64 def __init__(self, terminator, server_port):\r
65 asynchat.async_chat.__init__(self)\r
66 self.contents = []\r
67 self.create_socket(socket.AF_INET, socket.SOCK_STREAM)\r
68 self.connect((HOST, server_port))\r
69 self.set_terminator(terminator)\r
70 self.buffer = ''\r
71\r
72 def handle_connect(self):\r
73 pass\r
74\r
75 if sys.platform == 'darwin':\r
76 # select.poll returns a select.POLLHUP at the end of the tests\r
77 # on darwin, so just ignore it\r
78 def handle_expt(self):\r
79 pass\r
80\r
81 def collect_incoming_data(self, data):\r
82 self.buffer += data\r
83\r
84 def found_terminator(self):\r
85 self.contents.append(self.buffer)\r
86 self.buffer = ""\r
87\r
88\r
89 def start_echo_server():\r
90 event = threading.Event()\r
91 s = echo_server(event)\r
92 s.start()\r
93 event.wait()\r
94 event.clear()\r
95 time.sleep(0.01) # Give server time to start accepting.\r
96 return s, event\r
97\r
98\r
99@unittest.skipUnless(threading, 'Threading required for this test.')\r
100class TestAsynchat(unittest.TestCase):\r
101 usepoll = False\r
102\r
103 def setUp (self):\r
104 self._threads = test_support.threading_setup()\r
105\r
106 def tearDown (self):\r
107 test_support.threading_cleanup(*self._threads)\r
108\r
109 def line_terminator_check(self, term, server_chunk):\r
110 event = threading.Event()\r
111 s = echo_server(event)\r
112 s.chunk_size = server_chunk\r
113 s.start()\r
114 event.wait()\r
115 event.clear()\r
116 time.sleep(0.01) # Give server time to start accepting.\r
117 c = echo_client(term, s.port)\r
118 c.push("hello ")\r
119 c.push("world%s" % term)\r
120 c.push("I'm not dead yet!%s" % term)\r
121 c.push(SERVER_QUIT)\r
122 asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)\r
123 s.join()\r
124\r
125 self.assertEqual(c.contents, ["hello world", "I'm not dead yet!"])\r
126\r
127 # the line terminator tests below check receiving variously-sized\r
128 # chunks back from the server in order to exercise all branches of\r
129 # async_chat.handle_read\r
130\r
131 def test_line_terminator1(self):\r
132 # test one-character terminator\r
133 for l in (1,2,3):\r
134 self.line_terminator_check('\n', l)\r
135\r
136 def test_line_terminator2(self):\r
137 # test two-character terminator\r
138 for l in (1,2,3):\r
139 self.line_terminator_check('\r\n', l)\r
140\r
141 def test_line_terminator3(self):\r
142 # test three-character terminator\r
143 for l in (1,2,3):\r
144 self.line_terminator_check('qqq', l)\r
145\r
146 def numeric_terminator_check(self, termlen):\r
147 # Try reading a fixed number of bytes\r
148 s, event = start_echo_server()\r
149 c = echo_client(termlen, s.port)\r
150 data = "hello world, I'm not dead yet!\n"\r
151 c.push(data)\r
152 c.push(SERVER_QUIT)\r
153 asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)\r
154 s.join()\r
155\r
156 self.assertEqual(c.contents, [data[:termlen]])\r
157\r
158 def test_numeric_terminator1(self):\r
159 # check that ints & longs both work (since type is\r
160 # explicitly checked in async_chat.handle_read)\r
161 self.numeric_terminator_check(1)\r
162 self.numeric_terminator_check(1L)\r
163\r
164 def test_numeric_terminator2(self):\r
165 self.numeric_terminator_check(6L)\r
166\r
167 def test_none_terminator(self):\r
168 # Try reading a fixed number of bytes\r
169 s, event = start_echo_server()\r
170 c = echo_client(None, s.port)\r
171 data = "hello world, I'm not dead yet!\n"\r
172 c.push(data)\r
173 c.push(SERVER_QUIT)\r
174 asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)\r
175 s.join()\r
176\r
177 self.assertEqual(c.contents, [])\r
178 self.assertEqual(c.buffer, data)\r
179\r
180 def test_simple_producer(self):\r
181 s, event = start_echo_server()\r
182 c = echo_client('\n', s.port)\r
183 data = "hello world\nI'm not dead yet!\n"\r
184 p = asynchat.simple_producer(data+SERVER_QUIT, buffer_size=8)\r
185 c.push_with_producer(p)\r
186 asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)\r
187 s.join()\r
188\r
189 self.assertEqual(c.contents, ["hello world", "I'm not dead yet!"])\r
190\r
191 def test_string_producer(self):\r
192 s, event = start_echo_server()\r
193 c = echo_client('\n', s.port)\r
194 data = "hello world\nI'm not dead yet!\n"\r
195 c.push_with_producer(data+SERVER_QUIT)\r
196 asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)\r
197 s.join()\r
198\r
199 self.assertEqual(c.contents, ["hello world", "I'm not dead yet!"])\r
200\r
201 def test_empty_line(self):\r
202 # checks that empty lines are handled correctly\r
203 s, event = start_echo_server()\r
204 c = echo_client('\n', s.port)\r
205 c.push("hello world\n\nI'm not dead yet!\n")\r
206 c.push(SERVER_QUIT)\r
207 asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)\r
208 s.join()\r
209\r
210 self.assertEqual(c.contents, ["hello world", "", "I'm not dead yet!"])\r
211\r
212 def test_close_when_done(self):\r
213 s, event = start_echo_server()\r
214 s.start_resend_event = threading.Event()\r
215 c = echo_client('\n', s.port)\r
216 c.push("hello world\nI'm not dead yet!\n")\r
217 c.push(SERVER_QUIT)\r
218 c.close_when_done()\r
219 asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)\r
220\r
221 # Only allow the server to start echoing data back to the client after\r
222 # the client has closed its connection. This prevents a race condition\r
223 # where the server echoes all of its data before we can check that it\r
224 # got any down below.\r
225 s.start_resend_event.set()\r
226 s.join()\r
227\r
228 self.assertEqual(c.contents, [])\r
229 # the server might have been able to send a byte or two back, but this\r
230 # at least checks that it received something and didn't just fail\r
231 # (which could still result in the client not having received anything)\r
232 self.assertTrue(len(s.buffer) > 0)\r
233\r
234\r
235class TestAsynchat_WithPoll(TestAsynchat):\r
236 usepoll = True\r
237\r
238class TestHelperFunctions(unittest.TestCase):\r
239 def test_find_prefix_at_end(self):\r
240 self.assertEqual(asynchat.find_prefix_at_end("qwerty\r", "\r\n"), 1)\r
241 self.assertEqual(asynchat.find_prefix_at_end("qwertydkjf", "\r\n"), 0)\r
242\r
243class TestFifo(unittest.TestCase):\r
244 def test_basic(self):\r
245 f = asynchat.fifo()\r
246 f.push(7)\r
247 f.push('a')\r
248 self.assertEqual(len(f), 2)\r
249 self.assertEqual(f.first(), 7)\r
250 self.assertEqual(f.pop(), (1, 7))\r
251 self.assertEqual(len(f), 1)\r
252 self.assertEqual(f.first(), 'a')\r
253 self.assertEqual(f.is_empty(), False)\r
254 self.assertEqual(f.pop(), (1, 'a'))\r
255 self.assertEqual(len(f), 0)\r
256 self.assertEqual(f.is_empty(), True)\r
257 self.assertEqual(f.pop(), (0, None))\r
258\r
259 def test_given_list(self):\r
260 f = asynchat.fifo(['x', 17, 3])\r
261 self.assertEqual(len(f), 3)\r
262 self.assertEqual(f.pop(), (1, 'x'))\r
263 self.assertEqual(f.pop(), (1, 17))\r
264 self.assertEqual(f.pop(), (1, 3))\r
265 self.assertEqual(f.pop(), (0, None))\r
266\r
267\r
268def test_main(verbose=None):\r
269 test_support.run_unittest(TestAsynchat, TestAsynchat_WithPoll,\r
270 TestHelperFunctions, TestFifo)\r
271\r
272if __name__ == "__main__":\r
273 test_main(verbose=True)\r