]> git.proxmox.com Git - mirror_edk2.git/blob - AppPkg/Applications/Python/Python-2.7.2/Lib/test/test_asyncore.py
AppPkg/Applications/Python: Add Python 2.7.2 sources since the release of Python...
[mirror_edk2.git] / AppPkg / Applications / Python / Python-2.7.2 / Lib / test / test_asyncore.py
1 import asyncore
2 import unittest
3 import select
4 import os
5 import socket
6 import sys
7 import time
8 import warnings
9 import errno
10
11 from test import test_support
12 from test.test_support import TESTFN, run_unittest, unlink
13 from StringIO import StringIO
14
15 try:
16 import threading
17 except ImportError:
18 threading = None
19
20 HOST = test_support.HOST
21
22 class dummysocket:
23 def __init__(self):
24 self.closed = False
25
26 def close(self):
27 self.closed = True
28
29 def fileno(self):
30 return 42
31
32 class dummychannel:
33 def __init__(self):
34 self.socket = dummysocket()
35
36 def close(self):
37 self.socket.close()
38
39 class exitingdummy:
40 def __init__(self):
41 pass
42
43 def handle_read_event(self):
44 raise asyncore.ExitNow()
45
46 handle_write_event = handle_read_event
47 handle_close = handle_read_event
48 handle_expt_event = handle_read_event
49
50 class crashingdummy:
51 def __init__(self):
52 self.error_handled = False
53
54 def handle_read_event(self):
55 raise Exception()
56
57 handle_write_event = handle_read_event
58 handle_close = handle_read_event
59 handle_expt_event = handle_read_event
60
61 def handle_error(self):
62 self.error_handled = True
63
64 # used when testing senders; just collects what it gets until newline is sent
65 def capture_server(evt, buf, serv):
66 try:
67 serv.listen(5)
68 conn, addr = serv.accept()
69 except socket.timeout:
70 pass
71 else:
72 n = 200
73 while n > 0:
74 r, w, e = select.select([conn], [], [])
75 if r:
76 data = conn.recv(10)
77 # keep everything except for the newline terminator
78 buf.write(data.replace('\n', ''))
79 if '\n' in data:
80 break
81 n -= 1
82 time.sleep(0.01)
83
84 conn.close()
85 finally:
86 serv.close()
87 evt.set()
88
89
90 class HelperFunctionTests(unittest.TestCase):
91 def test_readwriteexc(self):
92 # Check exception handling behavior of read, write and _exception
93
94 # check that ExitNow exceptions in the object handler method
95 # bubbles all the way up through asyncore read/write/_exception calls
96 tr1 = exitingdummy()
97 self.assertRaises(asyncore.ExitNow, asyncore.read, tr1)
98 self.assertRaises(asyncore.ExitNow, asyncore.write, tr1)
99 self.assertRaises(asyncore.ExitNow, asyncore._exception, tr1)
100
101 # check that an exception other than ExitNow in the object handler
102 # method causes the handle_error method to get called
103 tr2 = crashingdummy()
104 asyncore.read(tr2)
105 self.assertEqual(tr2.error_handled, True)
106
107 tr2 = crashingdummy()
108 asyncore.write(tr2)
109 self.assertEqual(tr2.error_handled, True)
110
111 tr2 = crashingdummy()
112 asyncore._exception(tr2)
113 self.assertEqual(tr2.error_handled, True)
114
115 # asyncore.readwrite uses constants in the select module that
116 # are not present in Windows systems (see this thread:
117 # http://mail.python.org/pipermail/python-list/2001-October/109973.html)
118 # These constants should be present as long as poll is available
119
120 @unittest.skipUnless(hasattr(select, 'poll'), 'select.poll required')
121 def test_readwrite(self):
122 # Check that correct methods are called by readwrite()
123
124 attributes = ('read', 'expt', 'write', 'closed', 'error_handled')
125
126 expected = (
127 (select.POLLIN, 'read'),
128 (select.POLLPRI, 'expt'),
129 (select.POLLOUT, 'write'),
130 (select.POLLERR, 'closed'),
131 (select.POLLHUP, 'closed'),
132 (select.POLLNVAL, 'closed'),
133 )
134
135 class testobj:
136 def __init__(self):
137 self.read = False
138 self.write = False
139 self.closed = False
140 self.expt = False
141 self.error_handled = False
142
143 def handle_read_event(self):
144 self.read = True
145
146 def handle_write_event(self):
147 self.write = True
148
149 def handle_close(self):
150 self.closed = True
151
152 def handle_expt_event(self):
153 self.expt = True
154
155 def handle_error(self):
156 self.error_handled = True
157
158 for flag, expectedattr in expected:
159 tobj = testobj()
160 self.assertEqual(getattr(tobj, expectedattr), False)
161 asyncore.readwrite(tobj, flag)
162
163 # Only the attribute modified by the routine we expect to be
164 # called should be True.
165 for attr in attributes:
166 self.assertEqual(getattr(tobj, attr), attr==expectedattr)
167
168 # check that ExitNow exceptions in the object handler method
169 # bubbles all the way up through asyncore readwrite call
170 tr1 = exitingdummy()
171 self.assertRaises(asyncore.ExitNow, asyncore.readwrite, tr1, flag)
172
173 # check that an exception other than ExitNow in the object handler
174 # method causes the handle_error method to get called
175 tr2 = crashingdummy()
176 self.assertEqual(tr2.error_handled, False)
177 asyncore.readwrite(tr2, flag)
178 self.assertEqual(tr2.error_handled, True)
179
180 def test_closeall(self):
181 self.closeall_check(False)
182
183 def test_closeall_default(self):
184 self.closeall_check(True)
185
186 def closeall_check(self, usedefault):
187 # Check that close_all() closes everything in a given map
188
189 l = []
190 testmap = {}
191 for i in range(10):
192 c = dummychannel()
193 l.append(c)
194 self.assertEqual(c.socket.closed, False)
195 testmap[i] = c
196
197 if usedefault:
198 socketmap = asyncore.socket_map
199 try:
200 asyncore.socket_map = testmap
201 asyncore.close_all()
202 finally:
203 testmap, asyncore.socket_map = asyncore.socket_map, socketmap
204 else:
205 asyncore.close_all(testmap)
206
207 self.assertEqual(len(testmap), 0)
208
209 for c in l:
210 self.assertEqual(c.socket.closed, True)
211
212 def test_compact_traceback(self):
213 try:
214 raise Exception("I don't like spam!")
215 except:
216 real_t, real_v, real_tb = sys.exc_info()
217 r = asyncore.compact_traceback()
218 else:
219 self.fail("Expected exception")
220
221 (f, function, line), t, v, info = r
222 self.assertEqual(os.path.split(f)[-1], 'test_asyncore.py')
223 self.assertEqual(function, 'test_compact_traceback')
224 self.assertEqual(t, real_t)
225 self.assertEqual(v, real_v)
226 self.assertEqual(info, '[%s|%s|%s]' % (f, function, line))
227
228
229 class DispatcherTests(unittest.TestCase):
230 def setUp(self):
231 pass
232
233 def tearDown(self):
234 asyncore.close_all()
235
236 def test_basic(self):
237 d = asyncore.dispatcher()
238 self.assertEqual(d.readable(), True)
239 self.assertEqual(d.writable(), True)
240
241 def test_repr(self):
242 d = asyncore.dispatcher()
243 self.assertEqual(repr(d), '<asyncore.dispatcher at %#x>' % id(d))
244
245 def test_log(self):
246 d = asyncore.dispatcher()
247
248 # capture output of dispatcher.log() (to stderr)
249 fp = StringIO()
250 stderr = sys.stderr
251 l1 = "Lovely spam! Wonderful spam!"
252 l2 = "I don't like spam!"
253 try:
254 sys.stderr = fp
255 d.log(l1)
256 d.log(l2)
257 finally:
258 sys.stderr = stderr
259
260 lines = fp.getvalue().splitlines()
261 self.assertEqual(lines, ['log: %s' % l1, 'log: %s' % l2])
262
263 def test_log_info(self):
264 d = asyncore.dispatcher()
265
266 # capture output of dispatcher.log_info() (to stdout via print)
267 fp = StringIO()
268 stdout = sys.stdout
269 l1 = "Have you got anything without spam?"
270 l2 = "Why can't she have egg bacon spam and sausage?"
271 l3 = "THAT'S got spam in it!"
272 try:
273 sys.stdout = fp
274 d.log_info(l1, 'EGGS')
275 d.log_info(l2)
276 d.log_info(l3, 'SPAM')
277 finally:
278 sys.stdout = stdout
279
280 lines = fp.getvalue().splitlines()
281 expected = ['EGGS: %s' % l1, 'info: %s' % l2, 'SPAM: %s' % l3]
282
283 self.assertEqual(lines, expected)
284
285 def test_unhandled(self):
286 d = asyncore.dispatcher()
287 d.ignore_log_types = ()
288
289 # capture output of dispatcher.log_info() (to stdout via print)
290 fp = StringIO()
291 stdout = sys.stdout
292 try:
293 sys.stdout = fp
294 d.handle_expt()
295 d.handle_read()
296 d.handle_write()
297 d.handle_connect()
298 d.handle_accept()
299 finally:
300 sys.stdout = stdout
301
302 lines = fp.getvalue().splitlines()
303 expected = ['warning: unhandled incoming priority event',
304 'warning: unhandled read event',
305 'warning: unhandled write event',
306 'warning: unhandled connect event',
307 'warning: unhandled accept event']
308 self.assertEqual(lines, expected)
309
310 def test_issue_8594(self):
311 # XXX - this test is supposed to be removed in next major Python
312 # version
313 d = asyncore.dispatcher(socket.socket())
314 # make sure the error message no longer refers to the socket
315 # object but the dispatcher instance instead
316 self.assertRaisesRegexp(AttributeError, 'dispatcher instance',
317 getattr, d, 'foo')
318 # cheap inheritance with the underlying socket is supposed
319 # to still work but a DeprecationWarning is expected
320 with warnings.catch_warnings(record=True) as w:
321 warnings.simplefilter("always")
322 family = d.family
323 self.assertEqual(family, socket.AF_INET)
324 self.assertEqual(len(w), 1)
325 self.assertTrue(issubclass(w[0].category, DeprecationWarning))
326
327 def test_strerror(self):
328 # refers to bug #8573
329 err = asyncore._strerror(errno.EPERM)
330 if hasattr(os, 'strerror'):
331 self.assertEqual(err, os.strerror(errno.EPERM))
332 err = asyncore._strerror(-1)
333 self.assertTrue(err != "")
334
335
336 class dispatcherwithsend_noread(asyncore.dispatcher_with_send):
337 def readable(self):
338 return False
339
340 def handle_connect(self):
341 pass
342
343 class DispatcherWithSendTests(unittest.TestCase):
344 usepoll = False
345
346 def setUp(self):
347 pass
348
349 def tearDown(self):
350 asyncore.close_all()
351
352 @unittest.skipUnless(threading, 'Threading required for this test.')
353 @test_support.reap_threads
354 def test_send(self):
355 evt = threading.Event()
356 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
357 sock.settimeout(3)
358 port = test_support.bind_port(sock)
359
360 cap = StringIO()
361 args = (evt, cap, sock)
362 t = threading.Thread(target=capture_server, args=args)
363 t.start()
364 try:
365 # wait a little longer for the server to initialize (it sometimes
366 # refuses connections on slow machines without this wait)
367 time.sleep(0.2)
368
369 data = "Suppose there isn't a 16-ton weight?"
370 d = dispatcherwithsend_noread()
371 d.create_socket(socket.AF_INET, socket.SOCK_STREAM)
372 d.connect((HOST, port))
373
374 # give time for socket to connect
375 time.sleep(0.1)
376
377 d.send(data)
378 d.send(data)
379 d.send('\n')
380
381 n = 1000
382 while d.out_buffer and n > 0:
383 asyncore.poll()
384 n -= 1
385
386 evt.wait()
387
388 self.assertEqual(cap.getvalue(), data*2)
389 finally:
390 t.join()
391
392
393 class DispatcherWithSendTests_UsePoll(DispatcherWithSendTests):
394 usepoll = True
395
396 @unittest.skipUnless(hasattr(asyncore, 'file_wrapper'),
397 'asyncore.file_wrapper required')
398 class FileWrapperTest(unittest.TestCase):
399 def setUp(self):
400 self.d = "It's not dead, it's sleeping!"
401 with file(TESTFN, 'w') as h:
402 h.write(self.d)
403
404 def tearDown(self):
405 unlink(TESTFN)
406
407 def test_recv(self):
408 fd = os.open(TESTFN, os.O_RDONLY)
409 w = asyncore.file_wrapper(fd)
410 os.close(fd)
411
412 self.assertNotEqual(w.fd, fd)
413 self.assertNotEqual(w.fileno(), fd)
414 self.assertEqual(w.recv(13), "It's not dead")
415 self.assertEqual(w.read(6), ", it's")
416 w.close()
417 self.assertRaises(OSError, w.read, 1)
418
419
420 def test_send(self):
421 d1 = "Come again?"
422 d2 = "I want to buy some cheese."
423 fd = os.open(TESTFN, os.O_WRONLY | os.O_APPEND)
424 w = asyncore.file_wrapper(fd)
425 os.close(fd)
426
427 w.write(d1)
428 w.send(d2)
429 w.close()
430 self.assertEqual(file(TESTFN).read(), self.d + d1 + d2)
431
432 @unittest.skipUnless(hasattr(asyncore, 'file_dispatcher'),
433 'asyncore.file_dispatcher required')
434 def test_dispatcher(self):
435 fd = os.open(TESTFN, os.O_RDONLY)
436 data = []
437 class FileDispatcher(asyncore.file_dispatcher):
438 def handle_read(self):
439 data.append(self.recv(29))
440 s = FileDispatcher(fd)
441 os.close(fd)
442 asyncore.loop(timeout=0.01, use_poll=True, count=2)
443 self.assertEqual(b"".join(data), self.d)
444
445
446 class BaseTestHandler(asyncore.dispatcher):
447
448 def __init__(self, sock=None):
449 asyncore.dispatcher.__init__(self, sock)
450 self.flag = False
451
452 def handle_accept(self):
453 raise Exception("handle_accept not supposed to be called")
454
455 def handle_connect(self):
456 raise Exception("handle_connect not supposed to be called")
457
458 def handle_expt(self):
459 raise Exception("handle_expt not supposed to be called")
460
461 def handle_close(self):
462 raise Exception("handle_close not supposed to be called")
463
464 def handle_error(self):
465 raise
466
467
468 class TCPServer(asyncore.dispatcher):
469 """A server which listens on an address and dispatches the
470 connection to a handler.
471 """
472
473 def __init__(self, handler=BaseTestHandler, host=HOST, port=0):
474 asyncore.dispatcher.__init__(self)
475 self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
476 self.set_reuse_addr()
477 self.bind((host, port))
478 self.listen(5)
479 self.handler = handler
480
481 @property
482 def address(self):
483 return self.socket.getsockname()[:2]
484
485 def handle_accept(self):
486 sock, addr = self.accept()
487 self.handler(sock)
488
489 def handle_error(self):
490 raise
491
492
493 class BaseClient(BaseTestHandler):
494
495 def __init__(self, address):
496 BaseTestHandler.__init__(self)
497 self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
498 self.connect(address)
499
500 def handle_connect(self):
501 pass
502
503
504 class BaseTestAPI(unittest.TestCase):
505
506 def tearDown(self):
507 asyncore.close_all()
508
509 def loop_waiting_for_flag(self, instance, timeout=5):
510 timeout = float(timeout) / 100
511 count = 100
512 while asyncore.socket_map and count > 0:
513 asyncore.loop(timeout=0.01, count=1, use_poll=self.use_poll)
514 if instance.flag:
515 return
516 count -= 1
517 time.sleep(timeout)
518 self.fail("flag not set")
519
520 def test_handle_connect(self):
521 # make sure handle_connect is called on connect()
522
523 class TestClient(BaseClient):
524 def handle_connect(self):
525 self.flag = True
526
527 server = TCPServer()
528 client = TestClient(server.address)
529 self.loop_waiting_for_flag(client)
530
531 def test_handle_accept(self):
532 # make sure handle_accept() is called when a client connects
533
534 class TestListener(BaseTestHandler):
535
536 def __init__(self):
537 BaseTestHandler.__init__(self)
538 self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
539 self.bind((HOST, 0))
540 self.listen(5)
541 self.address = self.socket.getsockname()[:2]
542
543 def handle_accept(self):
544 self.flag = True
545
546 server = TestListener()
547 client = BaseClient(server.address)
548 self.loop_waiting_for_flag(server)
549
550 def test_handle_read(self):
551 # make sure handle_read is called on data received
552
553 class TestClient(BaseClient):
554 def handle_read(self):
555 self.flag = True
556
557 class TestHandler(BaseTestHandler):
558 def __init__(self, conn):
559 BaseTestHandler.__init__(self, conn)
560 self.send('x' * 1024)
561
562 server = TCPServer(TestHandler)
563 client = TestClient(server.address)
564 self.loop_waiting_for_flag(client)
565
566 def test_handle_write(self):
567 # make sure handle_write is called
568
569 class TestClient(BaseClient):
570 def handle_write(self):
571 self.flag = True
572
573 server = TCPServer()
574 client = TestClient(server.address)
575 self.loop_waiting_for_flag(client)
576
577 def test_handle_close(self):
578 # make sure handle_close is called when the other end closes
579 # the connection
580
581 class TestClient(BaseClient):
582
583 def handle_read(self):
584 # in order to make handle_close be called we are supposed
585 # to make at least one recv() call
586 self.recv(1024)
587
588 def handle_close(self):
589 self.flag = True
590 self.close()
591
592 class TestHandler(BaseTestHandler):
593 def __init__(self, conn):
594 BaseTestHandler.__init__(self, conn)
595 self.close()
596
597 server = TCPServer(TestHandler)
598 client = TestClient(server.address)
599 self.loop_waiting_for_flag(client)
600
601 @unittest.skipIf(sys.platform.startswith("sunos"),
602 "OOB support is broken on Solaris")
603 def test_handle_expt(self):
604 # Make sure handle_expt is called on OOB data received.
605 # Note: this might fail on some platforms as OOB data is
606 # tenuously supported and rarely used.
607
608 class TestClient(BaseClient):
609 def handle_expt(self):
610 self.flag = True
611
612 class TestHandler(BaseTestHandler):
613 def __init__(self, conn):
614 BaseTestHandler.__init__(self, conn)
615 self.socket.send(chr(244), socket.MSG_OOB)
616
617 server = TCPServer(TestHandler)
618 client = TestClient(server.address)
619 self.loop_waiting_for_flag(client)
620
621 def test_handle_error(self):
622
623 class TestClient(BaseClient):
624 def handle_write(self):
625 1.0 / 0
626 def handle_error(self):
627 self.flag = True
628 try:
629 raise
630 except ZeroDivisionError:
631 pass
632 else:
633 raise Exception("exception not raised")
634
635 server = TCPServer()
636 client = TestClient(server.address)
637 self.loop_waiting_for_flag(client)
638
639 def test_connection_attributes(self):
640 server = TCPServer()
641 client = BaseClient(server.address)
642
643 # we start disconnected
644 self.assertFalse(server.connected)
645 self.assertTrue(server.accepting)
646 # this can't be taken for granted across all platforms
647 #self.assertFalse(client.connected)
648 self.assertFalse(client.accepting)
649
650 # execute some loops so that client connects to server
651 asyncore.loop(timeout=0.01, use_poll=self.use_poll, count=100)
652 self.assertFalse(server.connected)
653 self.assertTrue(server.accepting)
654 self.assertTrue(client.connected)
655 self.assertFalse(client.accepting)
656
657 # disconnect the client
658 client.close()
659 self.assertFalse(server.connected)
660 self.assertTrue(server.accepting)
661 self.assertFalse(client.connected)
662 self.assertFalse(client.accepting)
663
664 # stop serving
665 server.close()
666 self.assertFalse(server.connected)
667 self.assertFalse(server.accepting)
668
669 def test_create_socket(self):
670 s = asyncore.dispatcher()
671 s.create_socket(socket.AF_INET, socket.SOCK_STREAM)
672 self.assertEqual(s.socket.family, socket.AF_INET)
673 self.assertEqual(s.socket.type, socket.SOCK_STREAM)
674
675 def test_bind(self):
676 s1 = asyncore.dispatcher()
677 s1.create_socket(socket.AF_INET, socket.SOCK_STREAM)
678 s1.bind((HOST, 0))
679 s1.listen(5)
680 port = s1.socket.getsockname()[1]
681
682 s2 = asyncore.dispatcher()
683 s2.create_socket(socket.AF_INET, socket.SOCK_STREAM)
684 # EADDRINUSE indicates the socket was correctly bound
685 self.assertRaises(socket.error, s2.bind, (HOST, port))
686
687 def test_set_reuse_addr(self):
688 sock = socket.socket()
689 try:
690 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
691 except socket.error:
692 unittest.skip("SO_REUSEADDR not supported on this platform")
693 else:
694 # if SO_REUSEADDR succeeded for sock we expect asyncore
695 # to do the same
696 s = asyncore.dispatcher(socket.socket())
697 self.assertFalse(s.socket.getsockopt(socket.SOL_SOCKET,
698 socket.SO_REUSEADDR))
699 s.create_socket(socket.AF_INET, socket.SOCK_STREAM)
700 s.set_reuse_addr()
701 self.assertTrue(s.socket.getsockopt(socket.SOL_SOCKET,
702 socket.SO_REUSEADDR))
703 finally:
704 sock.close()
705
706
707 class TestAPI_UseSelect(BaseTestAPI):
708 use_poll = False
709
710 @unittest.skipUnless(hasattr(select, 'poll'), 'select.poll required')
711 class TestAPI_UsePoll(BaseTestAPI):
712 use_poll = True
713
714
715 def test_main():
716 tests = [HelperFunctionTests, DispatcherTests, DispatcherWithSendTests,
717 DispatcherWithSendTests_UsePoll, TestAPI_UseSelect,
718 TestAPI_UsePoll, FileWrapperTest]
719 run_unittest(*tests)
720
721 if __name__ == "__main__":
722 test_main()