]> git.proxmox.com Git - mirror_edk2.git/blame - AppPkg/Applications/Python/Python-2.7.2/Lib/test/test_socket.py
EmbeddedPkg: Extend NvVarStoreFormattedLib LIBRARY_CLASS
[mirror_edk2.git] / AppPkg / Applications / Python / Python-2.7.2 / Lib / test / test_socket.py
CommitLineData
4710c53d 1#!/usr/bin/env python\r
2\r
3import unittest\r
4from test import test_support\r
5\r
6import errno\r
7import socket\r
8import select\r
9import time\r
10import traceback\r
11import Queue\r
12import sys\r
13import os\r
14import array\r
15import contextlib\r
16from weakref import proxy\r
17import signal\r
18import math\r
19\r
def try_address(host, port=0, family=socket.AF_INET):
    """Report whether a TCP socket can be bound to ``host:port``.

    Args:
        host: Host name or address to bind to.
        port: Port to bind to; the default of 0 is passed to bind() as is.
        family: Address family used to create the probe socket.

    Returns:
        True if the socket was created and bound, False if either step
        raised socket.error or socket.gaierror.
    """
    try:
        sock = socket.socket(family, socket.SOCK_STREAM)
    except (socket.error, socket.gaierror):
        return False
    try:
        sock.bind((host, port))
    except (socket.error, socket.gaierror):
        return False
    else:
        return True
    finally:
        # Close the probe socket on every path so that a failed bind()
        # does not leak the descriptor.
        sock.close()
31\r
# Host used by the test servers and the payload exchanged by most tests.
HOST = test_support.HOST
MSG = b'Michael Gilfix was here\n'
# True only when the socket module reports IPv6 support and the IPv6
# loopback address can actually be bound.
SUPPORTS_IPV6 = socket.has_ipv6 and try_address('::1', family=socket.AF_INET6)

# Threading is optional; the threaded test classes are skipped when the
# modules cannot be imported.
try:
    import thread
    import threading
except ImportError:
    thread = None
    threading = None
45\r
class SocketTCPTest(unittest.TestCase):
    """Fixture that provides a listening IPv4 TCP socket.

    setUp() stores the socket in ``self.serv`` and the value returned by
    test_support.bind_port() in ``self.port``.
    """

    def setUp(self):
        listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.serv = listener
        self.port = test_support.bind_port(listener)
        listener.listen(1)

    def tearDown(self):
        listener = self.serv
        listener.close()
        self.serv = None
56\r
class SocketUDPTest(unittest.TestCase):
    """Fixture that provides a bound IPv4 UDP socket.

    setUp() stores the socket in ``self.serv`` and the value returned by
    test_support.bind_port() in ``self.port``.
    """

    def setUp(self):
        receiver = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.serv = receiver
        self.port = test_support.bind_port(receiver)

    def tearDown(self):
        receiver = self.serv
        receiver.close()
        self.serv = None
66\r
class ThreadableTest:
    """Threadable Test class

    The ThreadableTest class makes it easy to create a threaded
    client/server pair from an existing unit test. To create a
    new threaded class from an existing unit test, use multiple
    inheritance:

        class NewClass (OldClass, ThreadableTest):
            pass

    This class defines two new fixture functions with obvious
    purposes for overriding:

        clientSetUp ()
        clientTearDown ()

    Any new test functions within the class must then define
    tests in pairs, where the test name is preceded with a
    '_' to indicate the client portion of the test. Ex:

        def testFoo(self):
            # Server portion

        def _testFoo(self):
            # Client portion

    Any exceptions raised by the clients during their setup or
    their tests are caught and transferred to the main thread to
    alert the testing framework.

    Note, the server setup function cannot call any blocking
    functions that rely on the client thread during setup,
    unless serverExplicitReady() is called just before
    the blocking call (such as in setting up a client/server
    connection and performing the accept() in setUp()).
    """

    def __init__(self):
        # Swap the true setup function: keep the real fixtures under
        # private names and install the threaded wrappers in their place.
        self.__setUp = self.setUp
        self.__tearDown = self.tearDown
        self.setUp = self._setUp
        self.tearDown = self._tearDown

    def serverExplicitReady(self):
        """This method allows the server to explicitly indicate that
        it wants the client thread to proceed. This is useful if the
        server is about to execute a blocking routine that is
        dependent upon the client thread during its setup routine."""
        self.server_ready.set()

    def _setUp(self):
        """Start the client thread, run the real setUp, then wait until
        the client thread has signalled client_ready."""
        self.server_ready = threading.Event()
        self.client_ready = threading.Event()
        self.done = threading.Event()
        # Holds at most one exception raised on the client side.
        self.queue = Queue.Queue(1)

        # Do some munging to start the client test: the client half of
        # test "testFoo" is the method named "_testFoo".
        methodname = self.id()
        i = methodname.rfind('.')
        methodname = methodname[i+1:]
        test_method = getattr(self, '_' + methodname)
        self.client_thread = thread.start_new_thread(
            self.clientRun, (test_method,))

        self.__setUp()
        if not self.server_ready.is_set():
            self.server_ready.set()
        self.client_ready.wait()

    def _tearDown(self):
        """Run the real tearDown, wait for the client thread to finish and
        fail the test with any exception the client recorded."""
        self.__tearDown()
        self.done.wait()

        if not self.queue.empty():
            msg = self.queue.get()
            self.fail(msg)

    def clientRun(self, test_func):
        """Body of the client thread.

        Waits for server_ready, signals client_ready, then runs
        clientSetUp() and test_func(). Any Exception raised by either
        step (or by the callable check) is put on self.queue so that
        _tearDown() can report it in the main thread.
        """
        self.server_ready.wait()
        self.client_ready.set()
        try:
            self.clientSetUp()
            with test_support.check_py3k_warnings():
                if not callable(test_func):
                    raise TypeError("test_func must be a callable function.")
            test_func()
        except Exception as strerror:
            self.queue.put(strerror)
        finally:
            # _tearDown() blocks on self.done, so it must be set on every
            # path, even when an overridden clientTearDown() fails before
            # it reaches ThreadableTest.clientTearDown().
            try:
                self.clientTearDown()
            finally:
                self.done.set()

    def clientSetUp(self):
        raise NotImplementedError("clientSetUp must be implemented.")

    def clientTearDown(self):
        self.done.set()
        thread.exit()
165\r
class ThreadedTCPSocketTest(SocketTCPTest, ThreadableTest):
    """TCP server fixture plus an IPv4 TCP client socket (``self.cli``)
    that is created and closed by the client-side fixture hooks."""

    def __init__(self, methodName='runTest'):
        SocketTCPTest.__init__(self, methodName=methodName)
        ThreadableTest.__init__(self)

    def clientSetUp(self):
        client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.cli = client

    def clientTearDown(self):
        client = self.cli
        client.close()
        self.cli = None
        ThreadableTest.clientTearDown(self)
179\r
class ThreadedUDPSocketTest(SocketUDPTest, ThreadableTest):
    """UDP server fixture plus an IPv4 UDP client socket (``self.cli``)
    that is created and closed by the client-side fixture hooks."""

    def __init__(self, methodName='runTest'):
        SocketUDPTest.__init__(self, methodName=methodName)
        ThreadableTest.__init__(self)

    def clientSetUp(self):
        client = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.cli = client

    def clientTearDown(self):
        client = self.cli
        client.close()
        self.cli = None
        ThreadableTest.clientTearDown(self)
193\r
class SocketConnectedTest(ThreadedTCPSocketTest):
    """Threaded TCP fixture with an established connection.

    The server side of the connection is ``self.cli_conn`` (the socket
    returned by accept()); the client side is ``self.serv_conn`` (the
    same object as ``self.cli``).
    """

    def __init__(self, methodName='runTest'):
        ThreadedTCPSocketTest.__init__(self, methodName=methodName)

    def setUp(self):
        ThreadedTCPSocketTest.setUp(self)
        # accept() blocks until the client connects, so the client thread
        # has to be released explicitly before calling it.
        self.serverExplicitReady()
        accepted = self.serv.accept()
        self.cli_conn = accepted[0]

    def tearDown(self):
        connection = self.cli_conn
        connection.close()
        self.cli_conn = None
        ThreadedTCPSocketTest.tearDown(self)

    def clientSetUp(self):
        ThreadedTCPSocketTest.clientSetUp(self)
        target = (HOST, self.port)
        self.cli.connect(target)
        self.serv_conn = self.cli

    def clientTearDown(self):
        connection = self.serv_conn
        connection.close()
        self.serv_conn = None
        ThreadedTCPSocketTest.clientTearDown(self)
221\r
class SocketPairTest(unittest.TestCase, ThreadableTest):
    """Threaded fixture built on socket.socketpair().

    setUp() creates both ends: ``self.serv`` is closed by tearDown() and
    ``self.cli`` is closed by clientTearDown().
    """

    def __init__(self, methodName='runTest'):
        unittest.TestCase.__init__(self, methodName=methodName)
        ThreadableTest.__init__(self)

    def setUp(self):
        pair = socket.socketpair()
        self.serv, self.cli = pair

    def tearDown(self):
        server_end = self.serv
        server_end.close()
        self.serv = None

    def clientSetUp(self):
        # Both ends already exist after setUp(); nothing to do here.
        pass

    def clientTearDown(self):
        client_end = self.cli
        client_end.close()
        self.cli = None
        ThreadableTest.clientTearDown(self)
242\r
243\r
244#######################################################################\r
245## Begin Tests\r
246\r
247class GeneralModuleTests(unittest.TestCase):\r
248\r
249 def test_weakref(self):\r
250 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)\r
251 p = proxy(s)\r
252 self.assertEqual(p.fileno(), s.fileno())\r
253 s.close()\r
254 s = None\r
255 try:\r
256 p.fileno()\r
257 except ReferenceError:\r
258 pass\r
259 else:\r
260 self.fail('Socket proxy still exists')\r
261\r
262 def testSocketError(self):\r
263 # Testing socket module exceptions\r
264 def raise_error(*args, **kwargs):\r
265 raise socket.error\r
266 def raise_herror(*args, **kwargs):\r
267 raise socket.herror\r
268 def raise_gaierror(*args, **kwargs):\r
269 raise socket.gaierror\r
270 self.assertRaises(socket.error, raise_error,\r
271 "Error raising socket exception.")\r
272 self.assertRaises(socket.error, raise_herror,\r
273 "Error raising socket exception.")\r
274 self.assertRaises(socket.error, raise_gaierror,\r
275 "Error raising socket exception.")\r
276\r
277 def testSendtoErrors(self):\r
278 # Testing that sendto doens't masks failures. See #10169.\r
279 s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)\r
280 self.addCleanup(s.close)\r
281 s.bind(('', 0))\r
282 sockname = s.getsockname()\r
283 # 2 args\r
284 with self.assertRaises(UnicodeEncodeError):\r
285 s.sendto(u'\u2620', sockname)\r
286 with self.assertRaises(TypeError) as cm:\r
287 s.sendto(5j, sockname)\r
288 self.assertIn('not complex', str(cm.exception))\r
289 with self.assertRaises(TypeError) as cm:\r
290 s.sendto('foo', None)\r
291 self.assertIn('not NoneType', str(cm.exception))\r
292 # 3 args\r
293 with self.assertRaises(UnicodeEncodeError):\r
294 s.sendto(u'\u2620', 0, sockname)\r
295 with self.assertRaises(TypeError) as cm:\r
296 s.sendto(5j, 0, sockname)\r
297 self.assertIn('not complex', str(cm.exception))\r
298 with self.assertRaises(TypeError) as cm:\r
299 s.sendto('foo', 0, None)\r
300 self.assertIn('not NoneType', str(cm.exception))\r
301 with self.assertRaises(TypeError) as cm:\r
302 s.sendto('foo', 'bar', sockname)\r
303 self.assertIn('an integer is required', str(cm.exception))\r
304 with self.assertRaises(TypeError) as cm:\r
305 s.sendto('foo', None, None)\r
306 self.assertIn('an integer is required', str(cm.exception))\r
307 # wrong number of args\r
308 with self.assertRaises(TypeError) as cm:\r
309 s.sendto('foo')\r
310 self.assertIn('(1 given)', str(cm.exception))\r
311 with self.assertRaises(TypeError) as cm:\r
312 s.sendto('foo', 0, sockname, 4)\r
313 self.assertIn('(4 given)', str(cm.exception))\r
314\r
315\r
316 def testCrucialConstants(self):\r
317 # Testing for mission critical constants\r
318 socket.AF_INET\r
319 socket.SOCK_STREAM\r
320 socket.SOCK_DGRAM\r
321 socket.SOCK_RAW\r
322 socket.SOCK_RDM\r
323 socket.SOCK_SEQPACKET\r
324 socket.SOL_SOCKET\r
325 socket.SO_REUSEADDR\r
326\r
327 def testHostnameRes(self):\r
328 # Testing hostname resolution mechanisms\r
329 hostname = socket.gethostname()\r
330 try:\r
331 ip = socket.gethostbyname(hostname)\r
332 except socket.error:\r
333 # Probably name lookup wasn't set up right; skip this test\r
334 return\r
335 self.assertTrue(ip.find('.') >= 0, "Error resolving host to ip.")\r
336 try:\r
337 hname, aliases, ipaddrs = socket.gethostbyaddr(ip)\r
338 except socket.error:\r
339 # Probably a similar problem as above; skip this test\r
340 return\r
341 all_host_names = [hostname, hname] + aliases\r
342 fqhn = socket.getfqdn(ip)\r
343 if not fqhn in all_host_names:\r
344 self.fail("Error testing host resolution mechanisms. (fqdn: %s, all: %s)" % (fqhn, repr(all_host_names)))\r
345\r
346 def testRefCountGetNameInfo(self):\r
347 # Testing reference count for getnameinfo\r
348 if hasattr(sys, "getrefcount"):\r
349 try:\r
350 # On some versions, this loses a reference\r
351 orig = sys.getrefcount(__name__)\r
352 socket.getnameinfo(__name__,0)\r
353 except TypeError:\r
354 self.assertEqual(sys.getrefcount(__name__), orig,\r
355 "socket.getnameinfo loses a reference")\r
356\r
357 def testInterpreterCrash(self):\r
358 # Making sure getnameinfo doesn't crash the interpreter\r
359 try:\r
360 # On some versions, this crashes the interpreter.\r
361 socket.getnameinfo(('x', 0, 0, 0), 0)\r
362 except socket.error:\r
363 pass\r
364\r
365 def testNtoH(self):\r
366 # This just checks that htons etc. are their own inverse,\r
367 # when looking at the lower 16 or 32 bits.\r
368 sizes = {socket.htonl: 32, socket.ntohl: 32,\r
369 socket.htons: 16, socket.ntohs: 16}\r
370 for func, size in sizes.items():\r
371 mask = (1L<<size) - 1\r
372 for i in (0, 1, 0xffff, ~0xffff, 2, 0x01234567, 0x76543210):\r
373 self.assertEqual(i & mask, func(func(i&mask)) & mask)\r
374\r
375 swapped = func(mask)\r
376 self.assertEqual(swapped & mask, mask)\r
377 self.assertRaises(OverflowError, func, 1L<<34)\r
378\r
379 def testNtoHErrors(self):\r
380 good_values = [ 1, 2, 3, 1L, 2L, 3L ]\r
381 bad_values = [ -1, -2, -3, -1L, -2L, -3L ]\r
382 for k in good_values:\r
383 socket.ntohl(k)\r
384 socket.ntohs(k)\r
385 socket.htonl(k)\r
386 socket.htons(k)\r
387 for k in bad_values:\r
388 self.assertRaises(OverflowError, socket.ntohl, k)\r
389 self.assertRaises(OverflowError, socket.ntohs, k)\r
390 self.assertRaises(OverflowError, socket.htonl, k)\r
391 self.assertRaises(OverflowError, socket.htons, k)\r
392\r
393 def testGetServBy(self):\r
394 eq = self.assertEqual\r
395 # Find one service that exists, then check all the related interfaces.\r
396 # I've ordered this by protocols that have both a tcp and udp\r
397 # protocol, at least for modern Linuxes.\r
398 if (sys.platform.startswith('linux') or\r
399 sys.platform.startswith('freebsd') or\r
400 sys.platform.startswith('netbsd') or\r
401 sys.platform == 'darwin'):\r
402 # avoid the 'echo' service on this platform, as there is an\r
403 # assumption breaking non-standard port/protocol entry\r
404 services = ('daytime', 'qotd', 'domain')\r
405 else:\r
406 services = ('echo', 'daytime', 'domain')\r
407 for service in services:\r
408 try:\r
409 port = socket.getservbyname(service, 'tcp')\r
410 break\r
411 except socket.error:\r
412 pass\r
413 else:\r
414 raise socket.error\r
415 # Try same call with optional protocol omitted\r
416 port2 = socket.getservbyname(service)\r
417 eq(port, port2)\r
418 # Try udp, but don't barf it it doesn't exist\r
419 try:\r
420 udpport = socket.getservbyname(service, 'udp')\r
421 except socket.error:\r
422 udpport = None\r
423 else:\r
424 eq(udpport, port)\r
425 # Now make sure the lookup by port returns the same service name\r
426 eq(socket.getservbyport(port2), service)\r
427 eq(socket.getservbyport(port, 'tcp'), service)\r
428 if udpport is not None:\r
429 eq(socket.getservbyport(udpport, 'udp'), service)\r
430 # Make sure getservbyport does not accept out of range ports.\r
431 self.assertRaises(OverflowError, socket.getservbyport, -1)\r
432 self.assertRaises(OverflowError, socket.getservbyport, 65536)\r
433\r
434 def testDefaultTimeout(self):\r
435 # Testing default timeout\r
436 # The default timeout should initially be None\r
437 self.assertEqual(socket.getdefaulttimeout(), None)\r
438 s = socket.socket()\r
439 self.assertEqual(s.gettimeout(), None)\r
440 s.close()\r
441\r
442 # Set the default timeout to 10, and see if it propagates\r
443 socket.setdefaulttimeout(10)\r
444 self.assertEqual(socket.getdefaulttimeout(), 10)\r
445 s = socket.socket()\r
446 self.assertEqual(s.gettimeout(), 10)\r
447 s.close()\r
448\r
449 # Reset the default timeout to None, and see if it propagates\r
450 socket.setdefaulttimeout(None)\r
451 self.assertEqual(socket.getdefaulttimeout(), None)\r
452 s = socket.socket()\r
453 self.assertEqual(s.gettimeout(), None)\r
454 s.close()\r
455\r
456 # Check that setting it to an invalid value raises ValueError\r
457 self.assertRaises(ValueError, socket.setdefaulttimeout, -1)\r
458\r
459 # Check that setting it to an invalid type raises TypeError\r
460 self.assertRaises(TypeError, socket.setdefaulttimeout, "spam")\r
461\r
462 def testIPv4_inet_aton_fourbytes(self):\r
463 if not hasattr(socket, 'inet_aton'):\r
464 return # No inet_aton, nothing to check\r
465 # Test that issue1008086 and issue767150 are fixed.\r
466 # It must return 4 bytes.\r
467 self.assertEqual('\x00'*4, socket.inet_aton('0.0.0.0'))\r
468 self.assertEqual('\xff'*4, socket.inet_aton('255.255.255.255'))\r
469\r
470 def testIPv4toString(self):\r
471 if not hasattr(socket, 'inet_pton'):\r
472 return # No inet_pton() on this platform\r
473 from socket import inet_aton as f, inet_pton, AF_INET\r
474 g = lambda a: inet_pton(AF_INET, a)\r
475\r
476 self.assertEqual('\x00\x00\x00\x00', f('0.0.0.0'))\r
477 self.assertEqual('\xff\x00\xff\x00', f('255.0.255.0'))\r
478 self.assertEqual('\xaa\xaa\xaa\xaa', f('170.170.170.170'))\r
479 self.assertEqual('\x01\x02\x03\x04', f('1.2.3.4'))\r
480 self.assertEqual('\xff\xff\xff\xff', f('255.255.255.255'))\r
481\r
482 self.assertEqual('\x00\x00\x00\x00', g('0.0.0.0'))\r
483 self.assertEqual('\xff\x00\xff\x00', g('255.0.255.0'))\r
484 self.assertEqual('\xaa\xaa\xaa\xaa', g('170.170.170.170'))\r
485 self.assertEqual('\xff\xff\xff\xff', g('255.255.255.255'))\r
486\r
487 def testIPv6toString(self):\r
488 if not hasattr(socket, 'inet_pton'):\r
489 return # No inet_pton() on this platform\r
490 try:\r
491 from socket import inet_pton, AF_INET6, has_ipv6\r
492 if not has_ipv6:\r
493 return\r
494 except ImportError:\r
495 return\r
496 f = lambda a: inet_pton(AF_INET6, a)\r
497\r
498 self.assertEqual('\x00' * 16, f('::'))\r
499 self.assertEqual('\x00' * 16, f('0::0'))\r
500 self.assertEqual('\x00\x01' + '\x00' * 14, f('1::'))\r
501 self.assertEqual(\r
502 '\x45\xef\x76\xcb\x00\x1a\x56\xef\xaf\xeb\x0b\xac\x19\x24\xae\xae',\r
503 f('45ef:76cb:1a:56ef:afeb:bac:1924:aeae')\r
504 )\r
505\r
506 def testStringToIPv4(self):\r
507 if not hasattr(socket, 'inet_ntop'):\r
508 return # No inet_ntop() on this platform\r
509 from socket import inet_ntoa as f, inet_ntop, AF_INET\r
510 g = lambda a: inet_ntop(AF_INET, a)\r
511\r
512 self.assertEqual('1.0.1.0', f('\x01\x00\x01\x00'))\r
513 self.assertEqual('170.85.170.85', f('\xaa\x55\xaa\x55'))\r
514 self.assertEqual('255.255.255.255', f('\xff\xff\xff\xff'))\r
515 self.assertEqual('1.2.3.4', f('\x01\x02\x03\x04'))\r
516\r
517 self.assertEqual('1.0.1.0', g('\x01\x00\x01\x00'))\r
518 self.assertEqual('170.85.170.85', g('\xaa\x55\xaa\x55'))\r
519 self.assertEqual('255.255.255.255', g('\xff\xff\xff\xff'))\r
520\r
521 def testStringToIPv6(self):\r
522 if not hasattr(socket, 'inet_ntop'):\r
523 return # No inet_ntop() on this platform\r
524 try:\r
525 from socket import inet_ntop, AF_INET6, has_ipv6\r
526 if not has_ipv6:\r
527 return\r
528 except ImportError:\r
529 return\r
530 f = lambda a: inet_ntop(AF_INET6, a)\r
531\r
532 self.assertEqual('::', f('\x00' * 16))\r
533 self.assertEqual('::1', f('\x00' * 15 + '\x01'))\r
534 self.assertEqual(\r
535 'aef:b01:506:1001:ffff:9997:55:170',\r
536 f('\x0a\xef\x0b\x01\x05\x06\x10\x01\xff\xff\x99\x97\x00\x55\x01\x70')\r
537 )\r
538\r
539 # XXX The following don't test module-level functionality...\r
540\r
541 def _get_unused_port(self, bind_address='0.0.0.0'):\r
542 """Use a temporary socket to elicit an unused ephemeral port.\r
543\r
544 Args:\r
545 bind_address: Hostname or IP address to search for a port on.\r
546\r
547 Returns: A most likely to be unused port.\r
548 """\r
549 tempsock = socket.socket()\r
550 tempsock.bind((bind_address, 0))\r
551 host, port = tempsock.getsockname()\r
552 tempsock.close()\r
553 return port\r
554\r
555 def testSockName(self):\r
556 # Testing getsockname()\r
557 port = self._get_unused_port()\r
558 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)\r
559 self.addCleanup(sock.close)\r
560 sock.bind(("0.0.0.0", port))\r
561 name = sock.getsockname()\r
562 # XXX(nnorwitz): http://tinyurl.com/os5jz seems to indicate\r
563 # it reasonable to get the host's addr in addition to 0.0.0.0.\r
564 # At least for eCos. This is required for the S/390 to pass.\r
565 try:\r
566 my_ip_addr = socket.gethostbyname(socket.gethostname())\r
567 except socket.error:\r
568 # Probably name lookup wasn't set up right; skip this test\r
569 return\r
570 self.assertIn(name[0], ("0.0.0.0", my_ip_addr), '%s invalid' % name[0])\r
571 self.assertEqual(name[1], port)\r
572\r
573 def testGetSockOpt(self):\r
574 # Testing getsockopt()\r
575 # We know a socket should start without reuse==0\r
576 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)\r
577 self.addCleanup(sock.close)\r
578 reuse = sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR)\r
579 self.assertFalse(reuse != 0, "initial mode is reuse")\r
580\r
581 def testSetSockOpt(self):\r
582 # Testing setsockopt()\r
583 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)\r
584 self.addCleanup(sock.close)\r
585 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)\r
586 reuse = sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR)\r
587 self.assertFalse(reuse == 0, "failed to set reuse mode")\r
588\r
589 def testSendAfterClose(self):\r
590 # testing send() after close() with timeout\r
591 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)\r
592 sock.settimeout(1)\r
593 sock.close()\r
594 self.assertRaises(socket.error, sock.send, "spam")\r
595\r
596 def testNewAttributes(self):\r
597 # testing .family, .type and .protocol\r
598 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)\r
599 self.assertEqual(sock.family, socket.AF_INET)\r
600 self.assertEqual(sock.type, socket.SOCK_STREAM)\r
601 self.assertEqual(sock.proto, 0)\r
602 sock.close()\r
603\r
604 def test_getsockaddrarg(self):\r
605 host = '0.0.0.0'\r
606 port = self._get_unused_port(bind_address=host)\r
607 big_port = port + 65536\r
608 neg_port = port - 65536\r
609 sock = socket.socket()\r
610 try:\r
611 self.assertRaises(OverflowError, sock.bind, (host, big_port))\r
612 self.assertRaises(OverflowError, sock.bind, (host, neg_port))\r
613 sock.bind((host, port))\r
614 finally:\r
615 sock.close()\r
616\r
617 @unittest.skipUnless(os.name == "nt", "Windows specific")\r
618 def test_sock_ioctl(self):\r
619 self.assertTrue(hasattr(socket.socket, 'ioctl'))\r
620 self.assertTrue(hasattr(socket, 'SIO_RCVALL'))\r
621 self.assertTrue(hasattr(socket, 'RCVALL_ON'))\r
622 self.assertTrue(hasattr(socket, 'RCVALL_OFF'))\r
623 self.assertTrue(hasattr(socket, 'SIO_KEEPALIVE_VALS'))\r
624 s = socket.socket()\r
625 self.addCleanup(s.close)\r
626 self.assertRaises(ValueError, s.ioctl, -1, None)\r
627 s.ioctl(socket.SIO_KEEPALIVE_VALS, (1, 100, 100))\r
628\r
629 def testGetaddrinfo(self):\r
630 try:\r
631 socket.getaddrinfo('localhost', 80)\r
632 except socket.gaierror as err:\r
633 if err.errno == socket.EAI_SERVICE:\r
634 # see http://bugs.python.org/issue1282647\r
635 self.skipTest("buggy libc version")\r
636 raise\r
637 # len of every sequence is supposed to be == 5\r
638 for info in socket.getaddrinfo(HOST, None):\r
639 self.assertEqual(len(info), 5)\r
640 # host can be a domain name, a string representation of an\r
641 # IPv4/v6 address or None\r
642 socket.getaddrinfo('localhost', 80)\r
643 socket.getaddrinfo('127.0.0.1', 80)\r
644 socket.getaddrinfo(None, 80)\r
645 if SUPPORTS_IPV6:\r
646 socket.getaddrinfo('::1', 80)\r
647 # port can be a string service name such as "http", a numeric\r
648 # port number or None\r
649 socket.getaddrinfo(HOST, "http")\r
650 socket.getaddrinfo(HOST, 80)\r
651 socket.getaddrinfo(HOST, None)\r
652 # test family and socktype filters\r
653 infos = socket.getaddrinfo(HOST, None, socket.AF_INET)\r
654 for family, _, _, _, _ in infos:\r
655 self.assertEqual(family, socket.AF_INET)\r
656 infos = socket.getaddrinfo(HOST, None, 0, socket.SOCK_STREAM)\r
657 for _, socktype, _, _, _ in infos:\r
658 self.assertEqual(socktype, socket.SOCK_STREAM)\r
659 # test proto and flags arguments\r
660 socket.getaddrinfo(HOST, None, 0, 0, socket.SOL_TCP)\r
661 socket.getaddrinfo(HOST, None, 0, 0, 0, socket.AI_PASSIVE)\r
662 # a server willing to support both IPv4 and IPv6 will\r
663 # usually do this\r
664 socket.getaddrinfo(None, 0, socket.AF_UNSPEC, socket.SOCK_STREAM, 0,\r
665 socket.AI_PASSIVE)\r
666\r
667\r
668 def check_sendall_interrupted(self, with_timeout):\r
669 # socketpair() is not stricly required, but it makes things easier.\r
670 if not hasattr(signal, 'alarm') or not hasattr(socket, 'socketpair'):\r
671 self.skipTest("signal.alarm and socket.socketpair required for this test")\r
672 # Our signal handlers clobber the C errno by calling a math function\r
673 # with an invalid domain value.\r
674 def ok_handler(*args):\r
675 self.assertRaises(ValueError, math.acosh, 0)\r
676 def raising_handler(*args):\r
677 self.assertRaises(ValueError, math.acosh, 0)\r
678 1 // 0\r
679 c, s = socket.socketpair()\r
680 old_alarm = signal.signal(signal.SIGALRM, raising_handler)\r
681 try:\r
682 if with_timeout:\r
683 # Just above the one second minimum for signal.alarm\r
684 c.settimeout(1.5)\r
685 with self.assertRaises(ZeroDivisionError):\r
686 signal.alarm(1)\r
687 c.sendall(b"x" * (1024**2))\r
688 if with_timeout:\r
689 signal.signal(signal.SIGALRM, ok_handler)\r
690 signal.alarm(1)\r
691 self.assertRaises(socket.timeout, c.sendall, b"x" * (1024**2))\r
692 finally:\r
693 signal.signal(signal.SIGALRM, old_alarm)\r
694 c.close()\r
695 s.close()\r
696\r
697 def test_sendall_interrupted(self):\r
698 self.check_sendall_interrupted(False)\r
699\r
700 def test_sendall_interrupted_with_timeout(self):\r
701 self.check_sendall_interrupted(True)\r
702\r
703 def testListenBacklog0(self):\r
704 srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)\r
705 srv.bind((HOST, 0))\r
706 # backlog = 0\r
707 srv.listen(0)\r
708 srv.close()\r
709\r
710\r
@unittest.skipUnless(thread, 'Threading required for this test.')
class BasicTCPTest(SocketConnectedTest):
    """Basic send/receive tests over a connected TCP pair.

    In every test the client half (the ``_test*`` method) sends on
    ``self.serv_conn`` and the server half reads from ``self.cli_conn``.
    """

    def __init__(self, methodName='runTest'):
        SocketConnectedTest.__init__(self, methodName=methodName)

    def testRecv(self):
        # Testing large receive over TCP
        received = self.cli_conn.recv(1024)
        self.assertEqual(received, MSG)

    def _testRecv(self):
        self.serv_conn.send(MSG)

    def testOverFlowRecv(self):
        # Testing receive in chunks over TCP
        head = self.cli_conn.recv(len(MSG) - 3)
        tail = self.cli_conn.recv(1024)
        self.assertEqual(head + tail, MSG)

    def _testOverFlowRecv(self):
        self.serv_conn.send(MSG)

    def testRecvFrom(self):
        # Testing large recvfrom() over TCP
        received, sender = self.cli_conn.recvfrom(1024)
        self.assertEqual(received, MSG)

    def _testRecvFrom(self):
        self.serv_conn.send(MSG)

    def testOverFlowRecvFrom(self):
        # Testing recvfrom() in chunks over TCP
        head, sender = self.cli_conn.recvfrom(len(MSG)-3)
        tail, sender = self.cli_conn.recvfrom(1024)
        self.assertEqual(head + tail, MSG)

    def _testOverFlowRecvFrom(self):
        self.serv_conn.send(MSG)

    def testSendAll(self):
        # Testing sendall() with a 2048 byte string over TCP
        # Read 1024-byte chunks until recv() returns an empty string.
        received = ''
        chunk = self.cli_conn.recv(1024)
        while chunk:
            received += chunk
            chunk = self.cli_conn.recv(1024)
        self.assertEqual(received, 'f' * 2048)

    def _testSendAll(self):
        self.serv_conn.sendall('f' * 2048)

    def testFromFd(self):
        # Testing fromfd()
        if not hasattr(socket, "fromfd"):
            return # On Windows, this doesn't exist
        duplicate = socket.fromfd(self.cli_conn.fileno(),
                                  socket.AF_INET, socket.SOCK_STREAM)
        self.addCleanup(duplicate.close)
        self.assertEqual(duplicate.recv(1024), MSG)

    def _testFromFd(self):
        self.serv_conn.send(MSG)

    def testDup(self):
        # Testing dup()
        duplicate = self.cli_conn.dup()
        self.addCleanup(duplicate.close)
        self.assertEqual(duplicate.recv(1024), MSG)

    def _testDup(self):
        self.serv_conn.send(MSG)

    def testShutdown(self):
        # Testing shutdown()
        received = self.cli_conn.recv(1024)
        self.assertEqual(received, MSG)
        # wait for _testShutdown to finish: on OS X, when the server
        # closes the connection the client also becomes disconnected,
        # and the client's shutdown call will fail. (Issue #4397.)
        self.done.wait()

    def _testShutdown(self):
        self.serv_conn.send(MSG)
        self.serv_conn.shutdown(2)
802\r
@unittest.skipUnless(thread, 'Threading required for this test.')
class BasicUDPTest(ThreadedUDPSocketTest):
    """Basic datagram tests: the client half sends MSG with sendto() to
    (HOST, self.port) and the server half reads from ``self.serv``."""

    def __init__(self, methodName='runTest'):
        ThreadedUDPSocketTest.__init__(self, methodName=methodName)

    def testSendtoAndRecv(self):
        # Testing sendto() and Recv() over UDP
        datagram = self.serv.recv(len(MSG))
        self.assertEqual(datagram, MSG)

    def _testSendtoAndRecv(self):
        destination = (HOST, self.port)
        self.cli.sendto(MSG, 0, destination)

    def testRecvFrom(self):
        # Testing recvfrom() over UDP
        datagram, sender = self.serv.recvfrom(len(MSG))
        self.assertEqual(datagram, MSG)

    def _testRecvFrom(self):
        destination = (HOST, self.port)
        self.cli.sendto(MSG, 0, destination)

    def testRecvFromNegative(self):
        # Negative lengths passed to recvfrom should give ValueError.
        self.assertRaises(ValueError, self.serv.recvfrom, -1)

    def _testRecvFromNegative(self):
        destination = (HOST, self.port)
        self.cli.sendto(MSG, 0, destination)
831\r
@unittest.skipUnless(thread, 'Threading required for this test.')
class TCPCloserTest(ThreadedTCPSocketTest):
    """Checks what the client socket observes after the server closes
    the accepted connection."""

    def testClose(self):
        accepted, peer = self.serv.accept()
        accepted.close()

        # Within one second the client socket must be reported readable,
        # and reading from it must return the empty string.
        sd = self.cli
        readable = select.select([sd], [], [], 1.0)[0]
        self.assertEqual(readable, [sd])
        self.assertEqual(sd.recv(1), '')

    def _testClose(self):
        destination = (HOST, self.port)
        self.cli.connect(destination)
        time.sleep(1.0)
847\r
@unittest.skipUnless(thread, 'Threading required for this test.')
class BasicSocketPairTest(SocketPairTest):
    """Sends MSG in each direction across a socketpair()."""

    def __init__(self, methodName='runTest'):
        SocketPairTest.__init__(self, methodName=methodName)

    def testRecv(self):
        # Server end receives what the client end sent.
        received = self.serv.recv(1024)
        self.assertEqual(received, MSG)

    def _testRecv(self):
        self.cli.send(MSG)

    def testSend(self):
        # Server end sends; the client half checks the payload.
        self.serv.send(MSG)

    def _testSend(self):
        received = self.cli.recv(1024)
        self.assertEqual(received, MSG)
867\r
@unittest.skipUnless(thread, 'Threading required for this test.')
class NonBlockingTCPTests(ThreadedTCPSocketTest):
    """Tests of non-blocking accept/connect/recv on TCP sockets."""

    def __init__(self, methodName='runTest'):
        ThreadedTCPSocketTest.__init__(self, methodName=methodName)

    def testSetBlocking(self):
        # Testing whether set blocking works
        self.serv.setblocking(0)
        start = time.time()
        try:
            self.serv.accept()
        except socket.error:
            # Expected: no client connects in this test.
            pass
        end = time.time()
        self.assertTrue((end - start) < 1.0, "Error setting non-blocking mode.")

    def _testSetBlocking(self):
        pass

    def testAccept(self):
        # Testing non-blocking accept
        self.serv.setblocking(0)
        try:
            conn, addr = self.serv.accept()
        except socket.error:
            pass
        else:
            self.fail("Error trying to do non-blocking accept.")
        read, write, err = select.select([self.serv], [], [])
        if self.serv in read:
            conn, addr = self.serv.accept()
            conn.close()
        else:
            self.fail("Error trying to do accept after select.")

    def _testAccept(self):
        # Delay the connect so the first accept() above finds nothing.
        time.sleep(0.1)
        self.cli.connect((HOST, self.port))

    def testConnect(self):
        # Testing non-blocking connect
        conn, addr = self.serv.accept()
        conn.close()

    def _testConnect(self):
        self.cli.settimeout(10)
        self.cli.connect((HOST, self.port))

    def testRecv(self):
        # Testing non-blocking recv
        conn, addr = self.serv.accept()
        # Guarantee the accepted connection is released on every failure
        # path below; closing it a second time is harmless.
        self.addCleanup(conn.close)
        conn.setblocking(0)
        try:
            msg = conn.recv(len(MSG))
        except socket.error:
            pass
        else:
            self.fail("Error trying to do non-blocking recv.")
        read, write, err = select.select([conn], [], [])
        if conn in read:
            msg = conn.recv(len(MSG))
            conn.close()
            self.assertEqual(msg, MSG)
        else:
            self.fail("Error during select call to non-blocking socket.")

    def _testRecv(self):
        self.cli.connect((HOST, self.port))
        # Delay the send so the first recv() above finds no data.
        time.sleep(0.1)
        self.cli.send(MSG)
939\r
@unittest.skipUnless(thread, 'Threading required for this test.')
class FileObjectClassTestCase(SocketConnectedTest):
    """Exercise the file objects returned by socket.makefile().

    The test* methods read through ``self.serv_file`` (opened 'rb' on
    ``cli_conn`` using ``bufsize``); the matching _test* methods write
    through ``self.cli_file`` (opened 'wb' on ``serv_conn``).
    """

    bufsize = -1 # Use default buffer size

    def __init__(self, methodName='runTest'):
        SocketConnectedTest.__init__(self, methodName=methodName)

    def setUp(self):
        # The reading file object is created on top of the base fixture.
        SocketConnectedTest.setUp(self)
        self.serv_file = self.cli_conn.makefile('rb', self.bufsize)

    def tearDown(self):
        # Close the reader, check that it reports itself closed, then let
        # the base class tear down the connection.
        reader = self.serv_file
        reader.close()
        self.assertTrue(reader.closed)
        self.serv_file = None
        SocketConnectedTest.tearDown(self)

    def clientSetUp(self):
        SocketConnectedTest.clientSetUp(self)
        self.cli_file = self.serv_conn.makefile('wb')

    def clientTearDown(self):
        # Mirror image of tearDown() for the writing file object.
        writer = self.cli_file
        writer.close()
        self.assertTrue(writer.closed)
        self.cli_file = None
        SocketConnectedTest.clientTearDown(self)

    def testSmallRead(self):
        # Performing small file read test: the message is fetched in two
        # pieces (everything but the last 3 characters, then the rest).
        head = self.serv_file.read(len(MSG) - 3)
        tail = self.serv_file.read(3)
        self.assertEqual(head + tail, MSG)

    def _testSmallRead(self):
        self.cli_file.write(MSG)
        self.cli_file.flush()

    def testFullRead(self):
        # read until EOF
        self.assertEqual(self.serv_file.read(), MSG)

    def _testFullRead(self):
        # Closing (rather than flushing) is what lets the reader see EOF.
        self.cli_file.write(MSG)
        self.cli_file.close()

    def testUnbufferedRead(self):
        # Performing unbuffered file read test: pull one character at a
        # time until read() returns the empty string.
        read_one = lambda: self.serv_file.read(1)
        buf = ''
        for char in iter(read_one, ''):
            buf += char
        self.assertEqual(buf, MSG)

    def _testUnbufferedRead(self):
        self.cli_file.write(MSG)
        self.cli_file.flush()

    def testReadline(self):
        # Performing file readline test
        self.assertEqual(self.serv_file.readline(), MSG)

    def _testReadline(self):
        self.cli_file.write(MSG)
        self.cli_file.flush()

    def testReadlineAfterRead(self):
        # Two partial read() calls consume all of the first line except
        # its newline; readline() must then continue from that point.
        reader = self.serv_file
        self.assertEqual("A baloo is", reader.read(len("A baloo is")))
        self.assertEqual(" a bear", reader.read(len(" a bear")))
        for expected in ("\n", "A BALOO IS A BEAR.\n", MSG):
            self.assertEqual(expected, reader.readline())

    def _testReadlineAfterRead(self):
        for chunk in ("A baloo is a bear\n", "A BALOO IS A BEAR.\n", MSG):
            self.cli_file.write(chunk)
        self.cli_file.flush()

    def testReadlineAfterReadNoNewline(self):
        # readline() after a partial read() when the data has no
        # trailing newline at all.
        reader = self.serv_file
        self.assertEqual("End Of ", reader.read(len("End Of ")))
        self.assertEqual("Line", reader.readline())

    def _testReadlineAfterReadNoNewline(self):
        # No flush() here; clientTearDown() closes the file afterwards.
        self.cli_file.write("End Of Line")

    def testClosedAttr(self):
        self.assertTrue(not self.serv_file.closed)

    def _testClosedAttr(self):
        self.assertTrue(not self.cli_file.closed)
1043\r
1044\r
class FileObjectInterruptedTestCase(unittest.TestCase):
    """Test that the file object correctly handles EINTR internally."""

    class MockSocket(object):
        """Socket stand-in whose recv() results are scripted in advance."""

        def __init__(self, recv_funcs=()):
            # One callable per expected recv() call; each call consumes
            # the next one and returns (or raises) whatever it does.
            self._recv_step = iter(recv_funcs)

        def recv(self, size):
            step = next(self._recv_step)
            return step()

    @staticmethod
    def _raise_eintr():
        raise socket.error(errno.EINTR)

    def _two_line_socket(self):
        # Two lines of text split across recv() calls, with an EINTR
        # raised between the two halves of the second line.
        script = [
            lambda: "This is the first line\nAnd the sec",
            self._raise_eintr,
            lambda: "ond line is here\n",
            lambda: "",
        ]
        return self.MockSocket(recv_funcs=script)

    def _test_readline(self, size=-1, **kwargs):
        # Extra keyword arguments are passed on to socket._fileobject.
        fo = socket._fileobject(self._two_line_socket(), **kwargs)
        for expected in ("This is the first line\n",
                         "And the second line is here\n"):
            self.assertEqual(fo.readline(size), expected)

    def _test_read(self, size=-1, **kwargs):
        fo = socket._fileobject(self._two_line_socket(), **kwargs)
        everything = ("This is the first line\n"
                      "And the second line is here\n")
        self.assertEqual(fo.read(size), everything)

    def test_default(self):
        self._test_readline()
        self._test_readline(size=100)
        self._test_read()
        self._test_read(size=100)

    def test_with_1k_buffer(self):
        self._test_readline(bufsize=1024)
        self._test_readline(size=100, bufsize=1024)
        self._test_read(bufsize=1024)
        self._test_read(size=100, bufsize=1024)

    def _test_readline_no_buffer(self, size=-1):
        # Unbuffered variant: the data arrives in tiny pieces and the
        # EINTR lands in the middle of the second, unterminated line.
        script = [
            lambda: "aa",
            lambda: "\n",
            lambda: "BB",
            self._raise_eintr,
            lambda: "bb",
            lambda: "",
        ]
        fo = socket._fileobject(self.MockSocket(recv_funcs=script),
                                bufsize=0)
        self.assertEqual(fo.readline(size), "aa\n")
        self.assertEqual(fo.readline(size), "BBbb")

    def test_no_buffer(self):
        self._test_readline_no_buffer()
        self._test_readline_no_buffer(size=4)
        self._test_read(bufsize=0)
        self._test_read(size=100, bufsize=0)
1113\r
1114\r
class UnbufferedFileObjectClassTestCase(FileObjectClassTestCase):

    """Repeat the tests from FileObjectClassTestCase with bufsize==0.

    In this case (and in this case only), it should be possible to
    create a file object, read a line from it, create another file
    object, read another line from it, without loss of data in the
    first file object's buffer.  Note that httplib relies on this
    when reading multiple requests from the same socket."""

    bufsize = 0 # Use unbuffered mode

    def testUnbufferedReadline(self):
        # Read a line, create a new file object, read another line with it
        first = self.serv_file.readline()
        self.assertEqual(first, "A. " + MSG)
        # Replace the reader with a fresh unbuffered file object on the
        # same connection; the second line must still be available.
        self.serv_file = self.cli_conn.makefile('rb', 0)
        second = self.serv_file.readline()
        self.assertEqual(second, "B. " + MSG)

    def _testUnbufferedReadline(self):
        for prefix in ("A. ", "B. "):
            self.cli_file.write(prefix + MSG)
        self.cli_file.flush()
1139\r
class LineBufferedFileObjectClassTestCase(FileObjectClassTestCase):
    """Repeat the tests from FileObjectClassTestCase with bufsize==1."""

    bufsize = 1 # Default-buffered for reading; line-buffered for writing
1143\r
1144\r
class SmallBufferedFileObjectClassTestCase(FileObjectClassTestCase):
    """Repeat the tests from FileObjectClassTestCase with bufsize==2."""

    bufsize = 2 # Exercise the buffering code
1148\r
1149\r
class NetworkConnectionTest(object):
    """Prove network connection."""

    def clientSetUp(self):
        # We're inherited below by BasicTCPTest2, which also inherits
        # BasicTCPTest, which defines self.port referenced below.
        server_address = (HOST, self.port)
        self.cli = socket.create_connection(server_address)
        # The same socket is also exposed under the serv_conn name.
        self.serv_conn = self.cli
1157\r
class BasicTCPTest2(NetworkConnectionTest, BasicTCPTest):
    """Tests that NetworkConnection does not break existing TCP functionality.

    NetworkConnectionTest is listed first so that its clientSetUp(), which
    connects via socket.create_connection(), takes precedence.
    """
1161\r
class NetworkConnectionNoServer(unittest.TestCase):
    """Errors reported when connecting while no server is listening."""

    class MockSocket(socket.socket):
        """Socket whose connect() always fails with socket.timeout."""

        def connect(self, *args):
            raise socket.timeout('timed out')

    @contextlib.contextmanager
    def mocked_socket_module(self):
        """Temporarily replace socket.socket with a class that times out
        on connect; the real class is restored on exit."""
        real_socket = socket.socket
        socket.socket = self.MockSocket
        try:
            yield
        finally:
            socket.socket = real_socket

    def test_connect(self):
        # A plain connect() to an unused port must report ECONNREFUSED.
        unused_port = test_support.find_unused_port()
        cli = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.addCleanup(cli.close)
        with self.assertRaises(socket.error) as caught:
            cli.connect((HOST, unused_port))
        self.assertEqual(caught.exception.errno, errno.ECONNREFUSED)

    def test_create_connection(self):
        # Issue #9792: errors raised by create_connection() should have
        # a proper errno attribute.
        unused_port = test_support.find_unused_port()
        with self.assertRaises(socket.error) as caught:
            socket.create_connection((HOST, unused_port))
        self.assertEqual(caught.exception.errno, errno.ECONNREFUSED)

    def test_create_connection_timeout(self):
        # Issue #9792: create_connection() should not recast timeout errors
        # as generic socket errors.
        with self.mocked_socket_module():
            self.assertRaises(socket.timeout,
                              socket.create_connection, (HOST, 1234))
1199\r
1200\r
@unittest.skipUnless(thread, 'Threading required for this test.')
class NetworkConnectionAttributesTest(SocketTCPTest, ThreadableTest):
    """Check attributes of sockets returned by socket.create_connection().

    Every test* method is the same trivial server side (accept one
    connection and close it); the actual checks live in the matching
    _test* client methods.
    """

    def __init__(self, methodName='runTest'):
        SocketTCPTest.__init__(self, methodName=methodName)
        ThreadableTest.__init__(self)

    def clientSetUp(self):
        # Port used as the local end by _testSourceAddress.
        self.source_port = test_support.find_unused_port()

    def clientTearDown(self):
        self.cli.close()
        self.cli = None
        ThreadableTest.clientTearDown(self)

    def _justAccept(self):
        # Shared server half: accept the client and hang up immediately.
        conn, addr = self.serv.accept()
        conn.close()

    testFamily = _justAccept
    def _testFamily(self):
        self.cli = socket.create_connection((HOST, self.port), timeout=30)
        self.addCleanup(self.cli.close)
        # Compare against the named constant rather than a bare number.
        self.assertEqual(self.cli.family, socket.AF_INET)

    testSourceAddress = _justAccept
    def _testSourceAddress(self):
        self.cli = socket.create_connection((HOST, self.port), timeout=30,
                source_address=('', self.source_port))
        self.addCleanup(self.cli.close)
        self.assertEqual(self.cli.getsockname()[1], self.source_port)
        # The port number being used is sufficient to show that the bind()
        # call happened.

    testTimeoutDefault = _justAccept
    def _testTimeoutDefault(self):
        # passing no explicit timeout uses socket's global default
        self.assertIsNone(socket.getdefaulttimeout())
        socket.setdefaulttimeout(42)
        try:
            self.cli = socket.create_connection((HOST, self.port))
            self.addCleanup(self.cli.close)
        finally:
            # Always restore the process-wide default.
            socket.setdefaulttimeout(None)
        self.assertEqual(self.cli.gettimeout(), 42)

    testTimeoutNone = _justAccept
    def _testTimeoutNone(self):
        # None timeout means the same as sock.settimeout(None)
        self.assertIsNone(socket.getdefaulttimeout())
        socket.setdefaulttimeout(30)
        try:
            self.cli = socket.create_connection((HOST, self.port), timeout=None)
            self.addCleanup(self.cli.close)
        finally:
            # Always restore the process-wide default.
            socket.setdefaulttimeout(None)
        self.assertIsNone(self.cli.gettimeout())

    testTimeoutValueNamed = _justAccept
    def _testTimeoutValueNamed(self):
        self.cli = socket.create_connection((HOST, self.port), timeout=30)
        # Register the cleanup like every other client method here does.
        self.addCleanup(self.cli.close)
        self.assertEqual(self.cli.gettimeout(), 30)

    testTimeoutValueNonamed = _justAccept
    def _testTimeoutValueNonamed(self):
        self.cli = socket.create_connection((HOST, self.port), 30)
        self.addCleanup(self.cli.close)
        self.assertEqual(self.cli.gettimeout(), 30)
1269\r
@unittest.skipUnless(thread, 'Threading required for this test.')
class NetworkConnectionBehaviourTest(SocketTCPTest, ThreadableTest):
    """Check how a create_connection() socket behaves when the server
    answers late, with and without a client-side timeout."""

    def __init__(self, methodName='runTest'):
        SocketTCPTest.__init__(self, methodName=methodName)
        ThreadableTest.__init__(self)

    def clientSetUp(self):
        pass

    def clientTearDown(self):
        self.cli.close()
        self.cli = None
        ThreadableTest.clientTearDown(self)

    def testInsideTimeout(self):
        # Server half shared by both tests: accept, stay silent for three
        # seconds, then send the reply.
        conn, addr = self.serv.accept()
        self.addCleanup(conn.close)
        time.sleep(3)
        conn.send("done!")
    testOutsideTimeout = testInsideTimeout

    def _testInsideTimeout(self):
        # No timeout is passed, so recv() waits for the delayed reply.
        sock = socket.create_connection((HOST, self.port))
        self.cli = sock
        self.assertEqual(sock.recv(5), "done!")

    def _testOutsideTimeout(self):
        # A 1 second timeout is shorter than the server's 3 second delay,
        # so recv() must raise socket.timeout.
        sock = socket.create_connection((HOST, self.port), timeout=1)
        self.cli = sock
        self.assertRaises(socket.timeout, sock.recv, 5)
1300\r
1301\r
class Urllib2FileobjectTest(unittest.TestCase):

    # urllib2.HTTPHandler has "borrowed" socket._fileobject, and requires that
    # it close the socket if the close c'tor argument is true

    def testClose(self):
        class MockSocket:
            # Records whether close() has been called on it.
            closed = False
            def flush(self): pass
            def close(self): self.closed = True

        # must not close unless we request it: the original use of _fileobject
        # by module socket requires that the underlying socket not be closed until
        # the _socketobject that created the _fileobject is closed
        untouched = MockSocket()
        socket._fileobject(untouched).close()
        self.assertTrue(not untouched.closed)

        # With close=True the file object must close the socket as well.
        owned = MockSocket()
        socket._fileobject(owned, close=True).close()
        self.assertTrue(owned.closed)
1325\r
class TCPTimeoutTest(SocketTCPTest):
    """Timeout behaviour of accept() on a listening TCP socket."""

    def testTCPTimeout(self):
        # Nobody connects, so accept() has to give up with socket.timeout
        # once the 1 second timeout expires.  The failure message is
        # reported through self.fail() so that it is actually displayed.
        self.serv.settimeout(1.0)
        try:
            self.serv.accept()
        except socket.timeout:
            pass
        else:
            self.fail("Error generating a timeout exception (TCP)")

    def testTimeoutZero(self):
        # A zero timeout means non-blocking mode: accept() must fail with
        # a plain socket.error, not with socket.timeout.
        self.serv.settimeout(0.0)
        try:
            self.serv.accept()
        except socket.timeout:
            self.fail("caught timeout instead of error (TCP)")
        except socket.error:
            pass
        except Exception:
            # KeyboardInterrupt and SystemExit are left to propagate.
            self.fail("caught unexpected exception (TCP)")
        else:
            self.fail("accept() returned success when we did not expect it")

    def testInterruptedTimeout(self):
        # XXX I don't know how to do this test on MSWindows or any other
        # platform that doesn't support signal.alarm() or os.kill(), though
        # the bug should have existed on all platforms.
        if not hasattr(signal, "alarm"):
            # Report a skip instead of silently passing.
            self.skipTest("signal.alarm() is required for this test")
        self.serv.settimeout(5.0)   # must be longer than alarm

        class Alarm(Exception):
            pass

        def alarm_handler(signum, frame):
            # Parameter named signum so the signal module is not shadowed.
            raise Alarm

        old_alarm = signal.signal(signal.SIGALRM, alarm_handler)
        try:
            signal.alarm(2)    # POSIX allows alarm to be up to 1 second early
            try:
                self.serv.accept()
            except socket.timeout:
                self.fail("caught timeout instead of Alarm")
            except Alarm:
                # Expected: the signal interrupted the blocking accept().
                pass
            except Exception:
                self.fail("caught other exception instead of Alarm:"
                          " %s(%s):\n%s" %
                          (sys.exc_info()[:2] + (traceback.format_exc(),)))
            else:
                self.fail("nothing caught")
            finally:
                signal.alarm(0)         # shut off alarm
        except Alarm:
            self.fail("got Alarm in wrong place")
        finally:
            # no alarm can be pending.  Safe to restore old handler.
            signal.signal(signal.SIGALRM, old_alarm)
1382\r
class UDPTimeoutTest(SocketUDPTest):
    """Timeout behaviour of recv() on a bound UDP socket.

    Derives from SocketUDPTest so that self.serv really is a datagram
    socket; with the TCP fixture these "UDP" tests ran against a listening
    stream socket instead.
    """

    def testUDPTimeout(self):
        # Nothing is ever sent to the socket, so recv() has to give up
        # with socket.timeout once the 1 second timeout expires.
        self.serv.settimeout(1.0)
        try:
            self.serv.recv(1024)
        except socket.timeout:
            pass
        else:
            self.fail("Error generating a timeout exception (UDP)")

    def testTimeoutZero(self):
        # A zero timeout means non-blocking mode: recv() must fail with a
        # plain socket.error, not with socket.timeout.
        self.serv.settimeout(0.0)
        try:
            self.serv.recv(1024)
        except socket.timeout:
            self.fail("caught timeout instead of error (UDP)")
        except socket.error:
            pass
        except Exception:
            # KeyboardInterrupt and SystemExit are left to propagate.
            self.fail("caught unexpected exception (UDP)")
        else:
            self.fail("recv() returned success when we did not expect it")
1405\r
class TestExceptions(unittest.TestCase):
    """Check the inheritance tree of the socket module's exceptions."""

    def testExceptionTree(self):
        # socket.error is the root and must itself be an Exception ...
        self.assertTrue(issubclass(socket.error, Exception))
        # ... and the more specific errors all derive from it.
        for exc in (socket.herror, socket.gaierror, socket.timeout):
            self.assertTrue(issubclass(exc, socket.error))
1413\r
class TestLinuxAbstractNamespace(unittest.TestCase):
    """AF_UNIX addresses that start with a NUL byte (Linux abstract
    namespace).  Every socket created here is closed via addCleanup()."""

    # Address length used by testMaxName (the longest name expected to be
    # accepted) and exceeded by one in testNameOverflow.
    UNIX_PATH_MAX = 108

    def _unixSocket(self):
        # Create an AF_UNIX stream socket that is closed after the test.
        sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        self.addCleanup(sock.close)
        return sock

    def testLinuxAbstractNamespace(self):
        address = "\x00python-test-hello\x00\xff"
        s1 = self._unixSocket()
        s1.bind(address)
        s1.listen(1)
        s2 = self._unixSocket()
        s2.connect(s1.getsockname())
        # Keep the accepted connection so that it can be closed too.
        conn, _peer = s1.accept()
        self.addCleanup(conn.close)
        self.assertEqual(s1.getsockname(), address)
        self.assertEqual(s2.getpeername(), address)

    def testMaxName(self):
        address = "\x00" + "h" * (self.UNIX_PATH_MAX - 1)
        s = self._unixSocket()
        s.bind(address)
        self.assertEqual(s.getsockname(), address)

    def testNameOverflow(self):
        # One character more than testMaxName uses: bind() must refuse it.
        address = "\x00" + "h" * self.UNIX_PATH_MAX
        s = self._unixSocket()
        self.assertRaises(socket.error, s.bind, address)
1439\r
1440\r
@unittest.skipUnless(thread, 'Threading required for this test.')
class BufferIOTest(SocketConnectedTest):
    """
    Test the buffer versions of socket.recv() and socket.send().
    """
    def __init__(self, methodName='runTest'):
        SocketConnectedTest.__init__(self, methodName=methodName)

    def _testRecvIntoArray(self):
        # Client half used by every test in this class: send MSG wrapped
        # in a buffer object.
        with test_support.check_py3k_warnings():
            payload = buffer(MSG)
        self.serv_conn.send(payload)

    # All of the other tests use the very same client half.
    _testRecvIntoBytearray = _testRecvIntoArray
    _testRecvIntoMemoryview = _testRecvIntoArray

    def _testRecvFromIntoArray(self):
        with test_support.check_py3k_warnings():
            payload = buffer(MSG)
        self.serv_conn.send(payload)

    _testRecvFromIntoBytearray = _testRecvFromIntoArray
    _testRecvFromIntoMemoryview = _testRecvFromIntoArray

    def testRecvIntoArray(self):
        target = array.array('c', ' '*1024)
        received = self.cli_conn.recv_into(target)
        self.assertEqual(received, len(MSG))
        self.assertEqual(target.tostring()[:len(MSG)], MSG)

    def testRecvIntoBytearray(self):
        target = bytearray(1024)
        received = self.cli_conn.recv_into(target)
        self.assertEqual(received, len(MSG))
        self.assertEqual(target[:len(MSG)], MSG)

    def testRecvIntoMemoryview(self):
        # Receive through a memoryview; the data lands in the bytearray
        # underneath it.
        target = bytearray(1024)
        received = self.cli_conn.recv_into(memoryview(target))
        self.assertEqual(received, len(MSG))
        self.assertEqual(target[:len(MSG)], MSG)

    def testRecvFromIntoArray(self):
        target = array.array('c', ' '*1024)
        received, addr = self.cli_conn.recvfrom_into(target)
        self.assertEqual(received, len(MSG))
        self.assertEqual(target.tostring()[:len(MSG)], MSG)

    def testRecvFromIntoBytearray(self):
        target = bytearray(1024)
        received, addr = self.cli_conn.recvfrom_into(target)
        self.assertEqual(received, len(MSG))
        self.assertEqual(target[:len(MSG)], MSG)

    def testRecvFromIntoMemoryview(self):
        target = bytearray(1024)
        received, addr = self.cli_conn.recvfrom_into(memoryview(target))
        self.assertEqual(received, len(MSG))
        self.assertEqual(target[:len(MSG)], MSG)
1508\r
1509\r
# TIPC name used by the TIPC tests below: a service type plus the lower
# and upper bounds of the instance range the servers bind to.
TIPC_STYPE = 2000
TIPC_LOWER = 200
TIPC_UPPER = 210
1513\r
def isTipcAvailable():
    """Check if the TIPC module is loaded

    The TIPC module is not loaded automatically on Ubuntu and probably
    other Linux distros.

    Returns True only when the socket module exposes AF_TIPC and
    /proc/modules has a line starting with "tipc ".  A /proc/modules
    that is missing, is a directory or cannot be read counts as "not
    available"; any other I/O error is re-raised.
    """
    if not hasattr(socket, "AF_TIPC"):
        return False
    try:
        f = open("/proc/modules")
    except IOError as e:
        # These cases merely mean there is no module list to inspect.
        if e.errno in (errno.ENOENT, errno.EISDIR, errno.EACCES):
            return False
        raise
    with f:
        for line in f:
            if line.startswith("tipc "):
                return True
    if test_support.verbose:
        # Parenthesised so the line is valid as a statement and as a call.
        print("TIPC module is not loaded, please 'sudo modprobe tipc'")
    return False
1531\r
class TIPCTest (unittest.TestCase):
    """Datagram-style (SOCK_RDM) round trip over TIPC."""

    def testRDM(self):
        srv = socket.socket(socket.AF_TIPC, socket.SOCK_RDM)
        self.addCleanup(srv.close)
        cli = socket.socket(socket.AF_TIPC, socket.SOCK_RDM)
        self.addCleanup(cli.close)

        # The server binds to the whole TIPC_LOWER..TIPC_UPPER range.
        srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        srvaddr = (socket.TIPC_ADDR_NAMESEQ, TIPC_STYPE,
                TIPC_LOWER, TIPC_UPPER)
        srv.bind(srvaddr)

        # The client addresses the instance in the middle of that range;
        # floor division keeps the instance number an integer.
        sendaddr = (socket.TIPC_ADDR_NAME, TIPC_STYPE,
                TIPC_LOWER + (TIPC_UPPER - TIPC_LOWER) // 2, 0)
        cli.sendto(MSG, sendaddr)

        msg, recvaddr = srv.recvfrom(1024)

        self.assertEqual(cli.getsockname(), recvaddr)
        self.assertEqual(msg, MSG)
1550\r
1551\r
class TIPCThreadableTest (unittest.TestCase, ThreadableTest):
    """Stream (SOCK_STREAM) round trip over TIPC between the server side
    (setUp/testStream) and the client side (clientSetUp/_testStream)."""

    def __init__(self, methodName = 'runTest'):
        unittest.TestCase.__init__(self, methodName = methodName)
        ThreadableTest.__init__(self)

    def setUp(self):
        self.srv = socket.socket(socket.AF_TIPC, socket.SOCK_STREAM)
        self.addCleanup(self.srv.close)
        self.srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        srvaddr = (socket.TIPC_ADDR_NAMESEQ, TIPC_STYPE,
                TIPC_LOWER, TIPC_UPPER)
        self.srv.bind(srvaddr)
        self.srv.listen(5)
        self.serverExplicitReady()
        self.conn, self.connaddr = self.srv.accept()
        # The accepted connection has to be closed as well.
        self.addCleanup(self.conn.close)

    def clientSetUp(self):
        # There is a hittable race between serverExplicitReady() and the
        # accept() call; sleep a little while to avoid it, otherwise
        # we could get an exception
        time.sleep(0.1)
        self.cli = socket.socket(socket.AF_TIPC, socket.SOCK_STREAM)
        # Guarantees the client socket is closed even if connect() fails.
        self.addCleanup(self.cli.close)
        # Address the instance in the middle of the bound range; floor
        # division keeps the instance number an integer.
        addr = (socket.TIPC_ADDR_NAME, TIPC_STYPE,
                TIPC_LOWER + (TIPC_UPPER - TIPC_LOWER) // 2, 0)
        self.cli.connect(addr)
        self.cliaddr = self.cli.getsockname()

    def testStream(self):
        msg = self.conn.recv(1024)
        self.assertEqual(msg, MSG)
        self.assertEqual(self.cliaddr, self.connaddr)

    def _testStream(self):
        self.cli.send(MSG)
        self.cli.close()
1586\r
1587\r
def test_main():
    """Collect the test classes that apply to this platform and run them.

    Optional groups are added only when supported: socketpair() tests,
    the Linux abstract-namespace tests and, if the TIPC kernel module is
    loaded, the TIPC tests.
    """
    tests = [GeneralModuleTests, BasicTCPTest, TCPCloserTest, TCPTimeoutTest,
             TestExceptions, BufferIOTest, BasicTCPTest2, BasicUDPTest,
             UDPTimeoutTest ]

    tests.extend([
        NonBlockingTCPTests,
        FileObjectClassTestCase,
        FileObjectInterruptedTestCase,
        UnbufferedFileObjectClassTestCase,
        LineBufferedFileObjectClassTestCase,
        SmallBufferedFileObjectClassTestCase,
        Urllib2FileobjectTest,
        NetworkConnectionNoServer,
        NetworkConnectionAttributesTest,
        NetworkConnectionBehaviourTest,
    ])
    if hasattr(socket, "socketpair"):
        tests.append(BasicSocketPairTest)
    # Match by prefix: the platform string is not always exactly 'linux2'
    # (e.g. 'linux3'), and an exact comparison would then silently drop
    # these tests.
    if sys.platform.startswith('linux'):
        tests.append(TestLinuxAbstractNamespace)
    if isTipcAvailable():
        tests.extend([TIPCTest, TIPCThreadableTest])

    thread_info = test_support.threading_setup()
    try:
        test_support.run_unittest(*tests)
    finally:
        # Run the thread cleanup even when run_unittest() raises.
        test_support.threading_cleanup(*thread_info)
1616\r
# Run the whole suite when this file is executed as a script.
if __name__ == "__main__":
    test_main()