]> git.proxmox.com Git - mirror_edk2.git/blame - AppPkg/Applications/Python/Python-2.7.2/Lib/test/test_asyncore.py
EmbeddedPkg: Extend NvVarStoreFormattedLib LIBRARY_CLASS
[mirror_edk2.git] / AppPkg / Applications / Python / Python-2.7.2 / Lib / test / test_asyncore.py
CommitLineData
4710c53d 1import asyncore\r
2import unittest\r
3import select\r
4import os\r
5import socket\r
6import sys\r
7import time\r
8import warnings\r
9import errno\r
10\r
11from test import test_support\r
12from test.test_support import TESTFN, run_unittest, unlink\r
13from StringIO import StringIO\r
14\r
15try:\r
16 import threading\r
17except ImportError:\r
18 threading = None\r
19\r
20HOST = test_support.HOST\r
21\r
22class dummysocket:\r
23 def __init__(self):\r
24 self.closed = False\r
25\r
26 def close(self):\r
27 self.closed = True\r
28\r
29 def fileno(self):\r
30 return 42\r
31\r
32class dummychannel:\r
33 def __init__(self):\r
34 self.socket = dummysocket()\r
35\r
36 def close(self):\r
37 self.socket.close()\r
38\r
39class exitingdummy:\r
40 def __init__(self):\r
41 pass\r
42\r
43 def handle_read_event(self):\r
44 raise asyncore.ExitNow()\r
45\r
46 handle_write_event = handle_read_event\r
47 handle_close = handle_read_event\r
48 handle_expt_event = handle_read_event\r
49\r
50class crashingdummy:\r
51 def __init__(self):\r
52 self.error_handled = False\r
53\r
54 def handle_read_event(self):\r
55 raise Exception()\r
56\r
57 handle_write_event = handle_read_event\r
58 handle_close = handle_read_event\r
59 handle_expt_event = handle_read_event\r
60\r
61 def handle_error(self):\r
62 self.error_handled = True\r
63\r
64# used when testing senders; just collects what it gets until newline is sent\r
65def capture_server(evt, buf, serv):\r
66 try:\r
67 serv.listen(5)\r
68 conn, addr = serv.accept()\r
69 except socket.timeout:\r
70 pass\r
71 else:\r
72 n = 200\r
73 while n > 0:\r
74 r, w, e = select.select([conn], [], [])\r
75 if r:\r
76 data = conn.recv(10)\r
77 # keep everything except for the newline terminator\r
78 buf.write(data.replace('\n', ''))\r
79 if '\n' in data:\r
80 break\r
81 n -= 1\r
82 time.sleep(0.01)\r
83\r
84 conn.close()\r
85 finally:\r
86 serv.close()\r
87 evt.set()\r
88\r
89\r
90class HelperFunctionTests(unittest.TestCase):\r
91 def test_readwriteexc(self):\r
92 # Check exception handling behavior of read, write and _exception\r
93\r
94 # check that ExitNow exceptions in the object handler method\r
95 # bubbles all the way up through asyncore read/write/_exception calls\r
96 tr1 = exitingdummy()\r
97 self.assertRaises(asyncore.ExitNow, asyncore.read, tr1)\r
98 self.assertRaises(asyncore.ExitNow, asyncore.write, tr1)\r
99 self.assertRaises(asyncore.ExitNow, asyncore._exception, tr1)\r
100\r
101 # check that an exception other than ExitNow in the object handler\r
102 # method causes the handle_error method to get called\r
103 tr2 = crashingdummy()\r
104 asyncore.read(tr2)\r
105 self.assertEqual(tr2.error_handled, True)\r
106\r
107 tr2 = crashingdummy()\r
108 asyncore.write(tr2)\r
109 self.assertEqual(tr2.error_handled, True)\r
110\r
111 tr2 = crashingdummy()\r
112 asyncore._exception(tr2)\r
113 self.assertEqual(tr2.error_handled, True)\r
114\r
115 # asyncore.readwrite uses constants in the select module that\r
116 # are not present in Windows systems (see this thread:\r
117 # http://mail.python.org/pipermail/python-list/2001-October/109973.html)\r
118 # These constants should be present as long as poll is available\r
119\r
120 @unittest.skipUnless(hasattr(select, 'poll'), 'select.poll required')\r
121 def test_readwrite(self):\r
122 # Check that correct methods are called by readwrite()\r
123\r
124 attributes = ('read', 'expt', 'write', 'closed', 'error_handled')\r
125\r
126 expected = (\r
127 (select.POLLIN, 'read'),\r
128 (select.POLLPRI, 'expt'),\r
129 (select.POLLOUT, 'write'),\r
130 (select.POLLERR, 'closed'),\r
131 (select.POLLHUP, 'closed'),\r
132 (select.POLLNVAL, 'closed'),\r
133 )\r
134\r
135 class testobj:\r
136 def __init__(self):\r
137 self.read = False\r
138 self.write = False\r
139 self.closed = False\r
140 self.expt = False\r
141 self.error_handled = False\r
142\r
143 def handle_read_event(self):\r
144 self.read = True\r
145\r
146 def handle_write_event(self):\r
147 self.write = True\r
148\r
149 def handle_close(self):\r
150 self.closed = True\r
151\r
152 def handle_expt_event(self):\r
153 self.expt = True\r
154\r
155 def handle_error(self):\r
156 self.error_handled = True\r
157\r
158 for flag, expectedattr in expected:\r
159 tobj = testobj()\r
160 self.assertEqual(getattr(tobj, expectedattr), False)\r
161 asyncore.readwrite(tobj, flag)\r
162\r
163 # Only the attribute modified by the routine we expect to be\r
164 # called should be True.\r
165 for attr in attributes:\r
166 self.assertEqual(getattr(tobj, attr), attr==expectedattr)\r
167\r
168 # check that ExitNow exceptions in the object handler method\r
169 # bubbles all the way up through asyncore readwrite call\r
170 tr1 = exitingdummy()\r
171 self.assertRaises(asyncore.ExitNow, asyncore.readwrite, tr1, flag)\r
172\r
173 # check that an exception other than ExitNow in the object handler\r
174 # method causes the handle_error method to get called\r
175 tr2 = crashingdummy()\r
176 self.assertEqual(tr2.error_handled, False)\r
177 asyncore.readwrite(tr2, flag)\r
178 self.assertEqual(tr2.error_handled, True)\r
179\r
180 def test_closeall(self):\r
181 self.closeall_check(False)\r
182\r
183 def test_closeall_default(self):\r
184 self.closeall_check(True)\r
185\r
186 def closeall_check(self, usedefault):\r
187 # Check that close_all() closes everything in a given map\r
188\r
189 l = []\r
190 testmap = {}\r
191 for i in range(10):\r
192 c = dummychannel()\r
193 l.append(c)\r
194 self.assertEqual(c.socket.closed, False)\r
195 testmap[i] = c\r
196\r
197 if usedefault:\r
198 socketmap = asyncore.socket_map\r
199 try:\r
200 asyncore.socket_map = testmap\r
201 asyncore.close_all()\r
202 finally:\r
203 testmap, asyncore.socket_map = asyncore.socket_map, socketmap\r
204 else:\r
205 asyncore.close_all(testmap)\r
206\r
207 self.assertEqual(len(testmap), 0)\r
208\r
209 for c in l:\r
210 self.assertEqual(c.socket.closed, True)\r
211\r
212 def test_compact_traceback(self):\r
213 try:\r
214 raise Exception("I don't like spam!")\r
215 except:\r
216 real_t, real_v, real_tb = sys.exc_info()\r
217 r = asyncore.compact_traceback()\r
218 else:\r
219 self.fail("Expected exception")\r
220\r
221 (f, function, line), t, v, info = r\r
222 self.assertEqual(os.path.split(f)[-1], 'test_asyncore.py')\r
223 self.assertEqual(function, 'test_compact_traceback')\r
224 self.assertEqual(t, real_t)\r
225 self.assertEqual(v, real_v)\r
226 self.assertEqual(info, '[%s|%s|%s]' % (f, function, line))\r
227\r
228\r
229class DispatcherTests(unittest.TestCase):\r
230 def setUp(self):\r
231 pass\r
232\r
233 def tearDown(self):\r
234 asyncore.close_all()\r
235\r
236 def test_basic(self):\r
237 d = asyncore.dispatcher()\r
238 self.assertEqual(d.readable(), True)\r
239 self.assertEqual(d.writable(), True)\r
240\r
241 def test_repr(self):\r
242 d = asyncore.dispatcher()\r
243 self.assertEqual(repr(d), '<asyncore.dispatcher at %#x>' % id(d))\r
244\r
245 def test_log(self):\r
246 d = asyncore.dispatcher()\r
247\r
248 # capture output of dispatcher.log() (to stderr)\r
249 fp = StringIO()\r
250 stderr = sys.stderr\r
251 l1 = "Lovely spam! Wonderful spam!"\r
252 l2 = "I don't like spam!"\r
253 try:\r
254 sys.stderr = fp\r
255 d.log(l1)\r
256 d.log(l2)\r
257 finally:\r
258 sys.stderr = stderr\r
259\r
260 lines = fp.getvalue().splitlines()\r
261 self.assertEqual(lines, ['log: %s' % l1, 'log: %s' % l2])\r
262\r
263 def test_log_info(self):\r
264 d = asyncore.dispatcher()\r
265\r
266 # capture output of dispatcher.log_info() (to stdout via print)\r
267 fp = StringIO()\r
268 stdout = sys.stdout\r
269 l1 = "Have you got anything without spam?"\r
270 l2 = "Why can't she have egg bacon spam and sausage?"\r
271 l3 = "THAT'S got spam in it!"\r
272 try:\r
273 sys.stdout = fp\r
274 d.log_info(l1, 'EGGS')\r
275 d.log_info(l2)\r
276 d.log_info(l3, 'SPAM')\r
277 finally:\r
278 sys.stdout = stdout\r
279\r
280 lines = fp.getvalue().splitlines()\r
281 expected = ['EGGS: %s' % l1, 'info: %s' % l2, 'SPAM: %s' % l3]\r
282\r
283 self.assertEqual(lines, expected)\r
284\r
285 def test_unhandled(self):\r
286 d = asyncore.dispatcher()\r
287 d.ignore_log_types = ()\r
288\r
289 # capture output of dispatcher.log_info() (to stdout via print)\r
290 fp = StringIO()\r
291 stdout = sys.stdout\r
292 try:\r
293 sys.stdout = fp\r
294 d.handle_expt()\r
295 d.handle_read()\r
296 d.handle_write()\r
297 d.handle_connect()\r
298 d.handle_accept()\r
299 finally:\r
300 sys.stdout = stdout\r
301\r
302 lines = fp.getvalue().splitlines()\r
303 expected = ['warning: unhandled incoming priority event',\r
304 'warning: unhandled read event',\r
305 'warning: unhandled write event',\r
306 'warning: unhandled connect event',\r
307 'warning: unhandled accept event']\r
308 self.assertEqual(lines, expected)\r
309\r
310 def test_issue_8594(self):\r
311 # XXX - this test is supposed to be removed in next major Python\r
312 # version\r
313 d = asyncore.dispatcher(socket.socket())\r
314 # make sure the error message no longer refers to the socket\r
315 # object but the dispatcher instance instead\r
316 self.assertRaisesRegexp(AttributeError, 'dispatcher instance',\r
317 getattr, d, 'foo')\r
318 # cheap inheritance with the underlying socket is supposed\r
319 # to still work but a DeprecationWarning is expected\r
320 with warnings.catch_warnings(record=True) as w:\r
321 warnings.simplefilter("always")\r
322 family = d.family\r
323 self.assertEqual(family, socket.AF_INET)\r
324 self.assertEqual(len(w), 1)\r
325 self.assertTrue(issubclass(w[0].category, DeprecationWarning))\r
326\r
327 def test_strerror(self):\r
328 # refers to bug #8573\r
329 err = asyncore._strerror(errno.EPERM)\r
330 if hasattr(os, 'strerror'):\r
331 self.assertEqual(err, os.strerror(errno.EPERM))\r
332 err = asyncore._strerror(-1)\r
333 self.assertTrue(err != "")\r
334\r
335\r
336class dispatcherwithsend_noread(asyncore.dispatcher_with_send):\r
337 def readable(self):\r
338 return False\r
339\r
340 def handle_connect(self):\r
341 pass\r
342\r
343class DispatcherWithSendTests(unittest.TestCase):\r
344 usepoll = False\r
345\r
346 def setUp(self):\r
347 pass\r
348\r
349 def tearDown(self):\r
350 asyncore.close_all()\r
351\r
352 @unittest.skipUnless(threading, 'Threading required for this test.')\r
353 @test_support.reap_threads\r
354 def test_send(self):\r
355 evt = threading.Event()\r
356 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)\r
357 sock.settimeout(3)\r
358 port = test_support.bind_port(sock)\r
359\r
360 cap = StringIO()\r
361 args = (evt, cap, sock)\r
362 t = threading.Thread(target=capture_server, args=args)\r
363 t.start()\r
364 try:\r
365 # wait a little longer for the server to initialize (it sometimes\r
366 # refuses connections on slow machines without this wait)\r
367 time.sleep(0.2)\r
368\r
369 data = "Suppose there isn't a 16-ton weight?"\r
370 d = dispatcherwithsend_noread()\r
371 d.create_socket(socket.AF_INET, socket.SOCK_STREAM)\r
372 d.connect((HOST, port))\r
373\r
374 # give time for socket to connect\r
375 time.sleep(0.1)\r
376\r
377 d.send(data)\r
378 d.send(data)\r
379 d.send('\n')\r
380\r
381 n = 1000\r
382 while d.out_buffer and n > 0:\r
383 asyncore.poll()\r
384 n -= 1\r
385\r
386 evt.wait()\r
387\r
388 self.assertEqual(cap.getvalue(), data*2)\r
389 finally:\r
390 t.join()\r
391\r
392\r
393class DispatcherWithSendTests_UsePoll(DispatcherWithSendTests):\r
394 usepoll = True\r
395\r
396@unittest.skipUnless(hasattr(asyncore, 'file_wrapper'),\r
397 'asyncore.file_wrapper required')\r
398class FileWrapperTest(unittest.TestCase):\r
399 def setUp(self):\r
400 self.d = "It's not dead, it's sleeping!"\r
401 with file(TESTFN, 'w') as h:\r
402 h.write(self.d)\r
403\r
404 def tearDown(self):\r
405 unlink(TESTFN)\r
406\r
407 def test_recv(self):\r
408 fd = os.open(TESTFN, os.O_RDONLY)\r
409 w = asyncore.file_wrapper(fd)\r
410 os.close(fd)\r
411\r
412 self.assertNotEqual(w.fd, fd)\r
413 self.assertNotEqual(w.fileno(), fd)\r
414 self.assertEqual(w.recv(13), "It's not dead")\r
415 self.assertEqual(w.read(6), ", it's")\r
416 w.close()\r
417 self.assertRaises(OSError, w.read, 1)\r
418\r
419\r
420 def test_send(self):\r
421 d1 = "Come again?"\r
422 d2 = "I want to buy some cheese."\r
423 fd = os.open(TESTFN, os.O_WRONLY | os.O_APPEND)\r
424 w = asyncore.file_wrapper(fd)\r
425 os.close(fd)\r
426\r
427 w.write(d1)\r
428 w.send(d2)\r
429 w.close()\r
430 self.assertEqual(file(TESTFN).read(), self.d + d1 + d2)\r
431\r
432 @unittest.skipUnless(hasattr(asyncore, 'file_dispatcher'),\r
433 'asyncore.file_dispatcher required')\r
434 def test_dispatcher(self):\r
435 fd = os.open(TESTFN, os.O_RDONLY)\r
436 data = []\r
437 class FileDispatcher(asyncore.file_dispatcher):\r
438 def handle_read(self):\r
439 data.append(self.recv(29))\r
440 s = FileDispatcher(fd)\r
441 os.close(fd)\r
442 asyncore.loop(timeout=0.01, use_poll=True, count=2)\r
443 self.assertEqual(b"".join(data), self.d)\r
444\r
445\r
446class BaseTestHandler(asyncore.dispatcher):\r
447\r
448 def __init__(self, sock=None):\r
449 asyncore.dispatcher.__init__(self, sock)\r
450 self.flag = False\r
451\r
452 def handle_accept(self):\r
453 raise Exception("handle_accept not supposed to be called")\r
454\r
455 def handle_connect(self):\r
456 raise Exception("handle_connect not supposed to be called")\r
457\r
458 def handle_expt(self):\r
459 raise Exception("handle_expt not supposed to be called")\r
460\r
461 def handle_close(self):\r
462 raise Exception("handle_close not supposed to be called")\r
463\r
464 def handle_error(self):\r
465 raise\r
466\r
467\r
468class TCPServer(asyncore.dispatcher):\r
469 """A server which listens on an address and dispatches the\r
470 connection to a handler.\r
471 """\r
472\r
473 def __init__(self, handler=BaseTestHandler, host=HOST, port=0):\r
474 asyncore.dispatcher.__init__(self)\r
475 self.create_socket(socket.AF_INET, socket.SOCK_STREAM)\r
476 self.set_reuse_addr()\r
477 self.bind((host, port))\r
478 self.listen(5)\r
479 self.handler = handler\r
480\r
481 @property\r
482 def address(self):\r
483 return self.socket.getsockname()[:2]\r
484\r
485 def handle_accept(self):\r
486 sock, addr = self.accept()\r
487 self.handler(sock)\r
488\r
489 def handle_error(self):\r
490 raise\r
491\r
492\r
493class BaseClient(BaseTestHandler):\r
494\r
495 def __init__(self, address):\r
496 BaseTestHandler.__init__(self)\r
497 self.create_socket(socket.AF_INET, socket.SOCK_STREAM)\r
498 self.connect(address)\r
499\r
500 def handle_connect(self):\r
501 pass\r
502\r
503\r
504class BaseTestAPI(unittest.TestCase):\r
505\r
506 def tearDown(self):\r
507 asyncore.close_all()\r
508\r
509 def loop_waiting_for_flag(self, instance, timeout=5):\r
510 timeout = float(timeout) / 100\r
511 count = 100\r
512 while asyncore.socket_map and count > 0:\r
513 asyncore.loop(timeout=0.01, count=1, use_poll=self.use_poll)\r
514 if instance.flag:\r
515 return\r
516 count -= 1\r
517 time.sleep(timeout)\r
518 self.fail("flag not set")\r
519\r
520 def test_handle_connect(self):\r
521 # make sure handle_connect is called on connect()\r
522\r
523 class TestClient(BaseClient):\r
524 def handle_connect(self):\r
525 self.flag = True\r
526\r
527 server = TCPServer()\r
528 client = TestClient(server.address)\r
529 self.loop_waiting_for_flag(client)\r
530\r
531 def test_handle_accept(self):\r
532 # make sure handle_accept() is called when a client connects\r
533\r
534 class TestListener(BaseTestHandler):\r
535\r
536 def __init__(self):\r
537 BaseTestHandler.__init__(self)\r
538 self.create_socket(socket.AF_INET, socket.SOCK_STREAM)\r
539 self.bind((HOST, 0))\r
540 self.listen(5)\r
541 self.address = self.socket.getsockname()[:2]\r
542\r
543 def handle_accept(self):\r
544 self.flag = True\r
545\r
546 server = TestListener()\r
547 client = BaseClient(server.address)\r
548 self.loop_waiting_for_flag(server)\r
549\r
550 def test_handle_read(self):\r
551 # make sure handle_read is called on data received\r
552\r
553 class TestClient(BaseClient):\r
554 def handle_read(self):\r
555 self.flag = True\r
556\r
557 class TestHandler(BaseTestHandler):\r
558 def __init__(self, conn):\r
559 BaseTestHandler.__init__(self, conn)\r
560 self.send('x' * 1024)\r
561\r
562 server = TCPServer(TestHandler)\r
563 client = TestClient(server.address)\r
564 self.loop_waiting_for_flag(client)\r
565\r
566 def test_handle_write(self):\r
567 # make sure handle_write is called\r
568\r
569 class TestClient(BaseClient):\r
570 def handle_write(self):\r
571 self.flag = True\r
572\r
573 server = TCPServer()\r
574 client = TestClient(server.address)\r
575 self.loop_waiting_for_flag(client)\r
576\r
577 def test_handle_close(self):\r
578 # make sure handle_close is called when the other end closes\r
579 # the connection\r
580\r
581 class TestClient(BaseClient):\r
582\r
583 def handle_read(self):\r
584 # in order to make handle_close be called we are supposed\r
585 # to make at least one recv() call\r
586 self.recv(1024)\r
587\r
588 def handle_close(self):\r
589 self.flag = True\r
590 self.close()\r
591\r
592 class TestHandler(BaseTestHandler):\r
593 def __init__(self, conn):\r
594 BaseTestHandler.__init__(self, conn)\r
595 self.close()\r
596\r
597 server = TCPServer(TestHandler)\r
598 client = TestClient(server.address)\r
599 self.loop_waiting_for_flag(client)\r
600\r
601 @unittest.skipIf(sys.platform.startswith("sunos"),\r
602 "OOB support is broken on Solaris")\r
603 def test_handle_expt(self):\r
604 # Make sure handle_expt is called on OOB data received.\r
605 # Note: this might fail on some platforms as OOB data is\r
606 # tenuously supported and rarely used.\r
607\r
608 class TestClient(BaseClient):\r
609 def handle_expt(self):\r
610 self.flag = True\r
611\r
612 class TestHandler(BaseTestHandler):\r
613 def __init__(self, conn):\r
614 BaseTestHandler.__init__(self, conn)\r
615 self.socket.send(chr(244), socket.MSG_OOB)\r
616\r
617 server = TCPServer(TestHandler)\r
618 client = TestClient(server.address)\r
619 self.loop_waiting_for_flag(client)\r
620\r
621 def test_handle_error(self):\r
622\r
623 class TestClient(BaseClient):\r
624 def handle_write(self):\r
625 1.0 / 0\r
626 def handle_error(self):\r
627 self.flag = True\r
628 try:\r
629 raise\r
630 except ZeroDivisionError:\r
631 pass\r
632 else:\r
633 raise Exception("exception not raised")\r
634\r
635 server = TCPServer()\r
636 client = TestClient(server.address)\r
637 self.loop_waiting_for_flag(client)\r
638\r
639 def test_connection_attributes(self):\r
640 server = TCPServer()\r
641 client = BaseClient(server.address)\r
642\r
643 # we start disconnected\r
644 self.assertFalse(server.connected)\r
645 self.assertTrue(server.accepting)\r
646 # this can't be taken for granted across all platforms\r
647 #self.assertFalse(client.connected)\r
648 self.assertFalse(client.accepting)\r
649\r
650 # execute some loops so that client connects to server\r
651 asyncore.loop(timeout=0.01, use_poll=self.use_poll, count=100)\r
652 self.assertFalse(server.connected)\r
653 self.assertTrue(server.accepting)\r
654 self.assertTrue(client.connected)\r
655 self.assertFalse(client.accepting)\r
656\r
657 # disconnect the client\r
658 client.close()\r
659 self.assertFalse(server.connected)\r
660 self.assertTrue(server.accepting)\r
661 self.assertFalse(client.connected)\r
662 self.assertFalse(client.accepting)\r
663\r
664 # stop serving\r
665 server.close()\r
666 self.assertFalse(server.connected)\r
667 self.assertFalse(server.accepting)\r
668\r
669 def test_create_socket(self):\r
670 s = asyncore.dispatcher()\r
671 s.create_socket(socket.AF_INET, socket.SOCK_STREAM)\r
672 self.assertEqual(s.socket.family, socket.AF_INET)\r
673 self.assertEqual(s.socket.type, socket.SOCK_STREAM)\r
674\r
675 def test_bind(self):\r
676 s1 = asyncore.dispatcher()\r
677 s1.create_socket(socket.AF_INET, socket.SOCK_STREAM)\r
678 s1.bind((HOST, 0))\r
679 s1.listen(5)\r
680 port = s1.socket.getsockname()[1]\r
681\r
682 s2 = asyncore.dispatcher()\r
683 s2.create_socket(socket.AF_INET, socket.SOCK_STREAM)\r
684 # EADDRINUSE indicates the socket was correctly bound\r
685 self.assertRaises(socket.error, s2.bind, (HOST, port))\r
686\r
687 def test_set_reuse_addr(self):\r
688 sock = socket.socket()\r
689 try:\r
690 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)\r
691 except socket.error:\r
692 unittest.skip("SO_REUSEADDR not supported on this platform")\r
693 else:\r
694 # if SO_REUSEADDR succeeded for sock we expect asyncore\r
695 # to do the same\r
696 s = asyncore.dispatcher(socket.socket())\r
697 self.assertFalse(s.socket.getsockopt(socket.SOL_SOCKET,\r
698 socket.SO_REUSEADDR))\r
699 s.create_socket(socket.AF_INET, socket.SOCK_STREAM)\r
700 s.set_reuse_addr()\r
701 self.assertTrue(s.socket.getsockopt(socket.SOL_SOCKET,\r
702 socket.SO_REUSEADDR))\r
703 finally:\r
704 sock.close()\r
705\r
706\r
707class TestAPI_UseSelect(BaseTestAPI):\r
708 use_poll = False\r
709\r
710@unittest.skipUnless(hasattr(select, 'poll'), 'select.poll required')\r
711class TestAPI_UsePoll(BaseTestAPI):\r
712 use_poll = True\r
713\r
714\r
715def test_main():\r
716 tests = [HelperFunctionTests, DispatcherTests, DispatcherWithSendTests,\r
717 DispatcherWithSendTests_UsePoll, TestAPI_UseSelect,\r
718 TestAPI_UsePoll, FileWrapperTest]\r
719 run_unittest(*tests)\r
720\r
721if __name__ == "__main__":\r
722 test_main()\r