]> git.proxmox.com Git - mirror_edk2.git/blobdiff - AppPkg/Applications/Python/Python-2.7.2/Lib/test/test_socket.py
edk2: Remove AppPkg, StdLib, StdLibPrivateInternalFiles
[mirror_edk2.git] / AppPkg / Applications / Python / Python-2.7.2 / Lib / test / test_socket.py
diff --git a/AppPkg/Applications/Python/Python-2.7.2/Lib/test/test_socket.py b/AppPkg/Applications/Python/Python-2.7.2/Lib/test/test_socket.py
deleted file mode 100644 (file)
index 385dff1..0000000
+++ /dev/null
@@ -1,1618 +0,0 @@
-#!/usr/bin/env python\r
-\r
-import unittest\r
-from test import test_support\r
-\r
-import errno\r
-import socket\r
-import select\r
-import time\r
-import traceback\r
-import Queue\r
-import sys\r
-import os\r
-import array\r
-import contextlib\r
-from weakref import proxy\r
-import signal\r
-import math\r
-\r
-def try_address(host, port=0, family=socket.AF_INET):\r
-    """Try to bind a socket on the given host:port and return True\r
-    if that has been possible."""\r
-    try:\r
-        sock = socket.socket(family, socket.SOCK_STREAM)\r
-        sock.bind((host, port))\r
-    except (socket.error, socket.gaierror):\r
-        return False\r
-    else:\r
-        sock.close()\r
-        return True\r
-\r
-HOST = test_support.HOST\r
-MSG = b'Michael Gilfix was here\n'\r
-SUPPORTS_IPV6 = socket.has_ipv6 and try_address('::1', family=socket.AF_INET6)\r
-\r
-try:\r
-    import thread\r
-    import threading\r
-except ImportError:\r
-    thread = None\r
-    threading = None\r
-\r
-HOST = test_support.HOST\r
-MSG = 'Michael Gilfix was here\n'\r
-\r
-class SocketTCPTest(unittest.TestCase):\r
-\r
-    def setUp(self):\r
-        self.serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)\r
-        self.port = test_support.bind_port(self.serv)\r
-        self.serv.listen(1)\r
-\r
-    def tearDown(self):\r
-        self.serv.close()\r
-        self.serv = None\r
-\r
-class SocketUDPTest(unittest.TestCase):\r
-\r
-    def setUp(self):\r
-        self.serv = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)\r
-        self.port = test_support.bind_port(self.serv)\r
-\r
-    def tearDown(self):\r
-        self.serv.close()\r
-        self.serv = None\r
-\r
-class ThreadableTest:\r
-    """Threadable Test class\r
-\r
-    The ThreadableTest class makes it easy to create a threaded\r
-    client/server pair from an existing unit test. To create a\r
-    new threaded class from an existing unit test, use multiple\r
-    inheritance:\r
-\r
-        class NewClass (OldClass, ThreadableTest):\r
-            pass\r
-\r
-    This class defines two new fixture functions with obvious\r
-    purposes for overriding:\r
-\r
-        clientSetUp ()\r
-        clientTearDown ()\r
-\r
-    Any new test functions within the class must then define\r
-    tests in pairs, where the test name is preceeded with a\r
-    '_' to indicate the client portion of the test. Ex:\r
-\r
-        def testFoo(self):\r
-            # Server portion\r
-\r
-        def _testFoo(self):\r
-            # Client portion\r
-\r
-    Any exceptions raised by the clients during their tests\r
-    are caught and transferred to the main thread to alert\r
-    the testing framework.\r
-\r
-    Note, the server setup function cannot call any blocking\r
-    functions that rely on the client thread during setup,\r
-    unless serverExplicitReady() is called just before\r
-    the blocking call (such as in setting up a client/server\r
-    connection and performing the accept() in setUp().\r
-    """\r
-\r
-    def __init__(self):\r
-        # Swap the true setup function\r
-        self.__setUp = self.setUp\r
-        self.__tearDown = self.tearDown\r
-        self.setUp = self._setUp\r
-        self.tearDown = self._tearDown\r
-\r
-    def serverExplicitReady(self):\r
-        """This method allows the server to explicitly indicate that\r
-        it wants the client thread to proceed. This is useful if the\r
-        server is about to execute a blocking routine that is\r
-        dependent upon the client thread during its setup routine."""\r
-        self.server_ready.set()\r
-\r
-    def _setUp(self):\r
-        self.server_ready = threading.Event()\r
-        self.client_ready = threading.Event()\r
-        self.done = threading.Event()\r
-        self.queue = Queue.Queue(1)\r
-\r
-        # Do some munging to start the client test.\r
-        methodname = self.id()\r
-        i = methodname.rfind('.')\r
-        methodname = methodname[i+1:]\r
-        test_method = getattr(self, '_' + methodname)\r
-        self.client_thread = thread.start_new_thread(\r
-            self.clientRun, (test_method,))\r
-\r
-        self.__setUp()\r
-        if not self.server_ready.is_set():\r
-            self.server_ready.set()\r
-        self.client_ready.wait()\r
-\r
-    def _tearDown(self):\r
-        self.__tearDown()\r
-        self.done.wait()\r
-\r
-        if not self.queue.empty():\r
-            msg = self.queue.get()\r
-            self.fail(msg)\r
-\r
-    def clientRun(self, test_func):\r
-        self.server_ready.wait()\r
-        self.client_ready.set()\r
-        self.clientSetUp()\r
-        with test_support.check_py3k_warnings():\r
-            if not callable(test_func):\r
-                raise TypeError("test_func must be a callable function.")\r
-        try:\r
-            test_func()\r
-        except Exception, strerror:\r
-            self.queue.put(strerror)\r
-        self.clientTearDown()\r
-\r
-    def clientSetUp(self):\r
-        raise NotImplementedError("clientSetUp must be implemented.")\r
-\r
-    def clientTearDown(self):\r
-        self.done.set()\r
-        thread.exit()\r
-\r
-class ThreadedTCPSocketTest(SocketTCPTest, ThreadableTest):\r
-\r
-    def __init__(self, methodName='runTest'):\r
-        SocketTCPTest.__init__(self, methodName=methodName)\r
-        ThreadableTest.__init__(self)\r
-\r
-    def clientSetUp(self):\r
-        self.cli = socket.socket(socket.AF_INET, socket.SOCK_STREAM)\r
-\r
-    def clientTearDown(self):\r
-        self.cli.close()\r
-        self.cli = None\r
-        ThreadableTest.clientTearDown(self)\r
-\r
-class ThreadedUDPSocketTest(SocketUDPTest, ThreadableTest):\r
-\r
-    def __init__(self, methodName='runTest'):\r
-        SocketUDPTest.__init__(self, methodName=methodName)\r
-        ThreadableTest.__init__(self)\r
-\r
-    def clientSetUp(self):\r
-        self.cli = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)\r
-\r
-    def clientTearDown(self):\r
-        self.cli.close()\r
-        self.cli = None\r
-        ThreadableTest.clientTearDown(self)\r
-\r
-class SocketConnectedTest(ThreadedTCPSocketTest):\r
-\r
-    def __init__(self, methodName='runTest'):\r
-        ThreadedTCPSocketTest.__init__(self, methodName=methodName)\r
-\r
-    def setUp(self):\r
-        ThreadedTCPSocketTest.setUp(self)\r
-        # Indicate explicitly we're ready for the client thread to\r
-        # proceed and then perform the blocking call to accept\r
-        self.serverExplicitReady()\r
-        conn, addr = self.serv.accept()\r
-        self.cli_conn = conn\r
-\r
-    def tearDown(self):\r
-        self.cli_conn.close()\r
-        self.cli_conn = None\r
-        ThreadedTCPSocketTest.tearDown(self)\r
-\r
-    def clientSetUp(self):\r
-        ThreadedTCPSocketTest.clientSetUp(self)\r
-        self.cli.connect((HOST, self.port))\r
-        self.serv_conn = self.cli\r
-\r
-    def clientTearDown(self):\r
-        self.serv_conn.close()\r
-        self.serv_conn = None\r
-        ThreadedTCPSocketTest.clientTearDown(self)\r
-\r
-class SocketPairTest(unittest.TestCase, ThreadableTest):\r
-\r
-    def __init__(self, methodName='runTest'):\r
-        unittest.TestCase.__init__(self, methodName=methodName)\r
-        ThreadableTest.__init__(self)\r
-\r
-    def setUp(self):\r
-        self.serv, self.cli = socket.socketpair()\r
-\r
-    def tearDown(self):\r
-        self.serv.close()\r
-        self.serv = None\r
-\r
-    def clientSetUp(self):\r
-        pass\r
-\r
-    def clientTearDown(self):\r
-        self.cli.close()\r
-        self.cli = None\r
-        ThreadableTest.clientTearDown(self)\r
-\r
-\r
-#######################################################################\r
-## Begin Tests\r
-\r
-class GeneralModuleTests(unittest.TestCase):\r
-\r
-    def test_weakref(self):\r
-        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)\r
-        p = proxy(s)\r
-        self.assertEqual(p.fileno(), s.fileno())\r
-        s.close()\r
-        s = None\r
-        try:\r
-            p.fileno()\r
-        except ReferenceError:\r
-            pass\r
-        else:\r
-            self.fail('Socket proxy still exists')\r
-\r
-    def testSocketError(self):\r
-        # Testing socket module exceptions\r
-        def raise_error(*args, **kwargs):\r
-            raise socket.error\r
-        def raise_herror(*args, **kwargs):\r
-            raise socket.herror\r
-        def raise_gaierror(*args, **kwargs):\r
-            raise socket.gaierror\r
-        self.assertRaises(socket.error, raise_error,\r
-                              "Error raising socket exception.")\r
-        self.assertRaises(socket.error, raise_herror,\r
-                              "Error raising socket exception.")\r
-        self.assertRaises(socket.error, raise_gaierror,\r
-                              "Error raising socket exception.")\r
-\r
-    def testSendtoErrors(self):\r
-        # Testing that sendto doens't masks failures. See #10169.\r
-        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)\r
-        self.addCleanup(s.close)\r
-        s.bind(('', 0))\r
-        sockname = s.getsockname()\r
-        # 2 args\r
-        with self.assertRaises(UnicodeEncodeError):\r
-            s.sendto(u'\u2620', sockname)\r
-        with self.assertRaises(TypeError) as cm:\r
-            s.sendto(5j, sockname)\r
-        self.assertIn('not complex', str(cm.exception))\r
-        with self.assertRaises(TypeError) as cm:\r
-            s.sendto('foo', None)\r
-        self.assertIn('not NoneType', str(cm.exception))\r
-        # 3 args\r
-        with self.assertRaises(UnicodeEncodeError):\r
-            s.sendto(u'\u2620', 0, sockname)\r
-        with self.assertRaises(TypeError) as cm:\r
-            s.sendto(5j, 0, sockname)\r
-        self.assertIn('not complex', str(cm.exception))\r
-        with self.assertRaises(TypeError) as cm:\r
-            s.sendto('foo', 0, None)\r
-        self.assertIn('not NoneType', str(cm.exception))\r
-        with self.assertRaises(TypeError) as cm:\r
-            s.sendto('foo', 'bar', sockname)\r
-        self.assertIn('an integer is required', str(cm.exception))\r
-        with self.assertRaises(TypeError) as cm:\r
-            s.sendto('foo', None, None)\r
-        self.assertIn('an integer is required', str(cm.exception))\r
-        # wrong number of args\r
-        with self.assertRaises(TypeError) as cm:\r
-            s.sendto('foo')\r
-        self.assertIn('(1 given)', str(cm.exception))\r
-        with self.assertRaises(TypeError) as cm:\r
-            s.sendto('foo', 0, sockname, 4)\r
-        self.assertIn('(4 given)', str(cm.exception))\r
-\r
-\r
-    def testCrucialConstants(self):\r
-        # Testing for mission critical constants\r
-        socket.AF_INET\r
-        socket.SOCK_STREAM\r
-        socket.SOCK_DGRAM\r
-        socket.SOCK_RAW\r
-        socket.SOCK_RDM\r
-        socket.SOCK_SEQPACKET\r
-        socket.SOL_SOCKET\r
-        socket.SO_REUSEADDR\r
-\r
-    def testHostnameRes(self):\r
-        # Testing hostname resolution mechanisms\r
-        hostname = socket.gethostname()\r
-        try:\r
-            ip = socket.gethostbyname(hostname)\r
-        except socket.error:\r
-            # Probably name lookup wasn't set up right; skip this test\r
-            return\r
-        self.assertTrue(ip.find('.') >= 0, "Error resolving host to ip.")\r
-        try:\r
-            hname, aliases, ipaddrs = socket.gethostbyaddr(ip)\r
-        except socket.error:\r
-            # Probably a similar problem as above; skip this test\r
-            return\r
-        all_host_names = [hostname, hname] + aliases\r
-        fqhn = socket.getfqdn(ip)\r
-        if not fqhn in all_host_names:\r
-            self.fail("Error testing host resolution mechanisms. (fqdn: %s, all: %s)" % (fqhn, repr(all_host_names)))\r
-\r
-    def testRefCountGetNameInfo(self):\r
-        # Testing reference count for getnameinfo\r
-        if hasattr(sys, "getrefcount"):\r
-            try:\r
-                # On some versions, this loses a reference\r
-                orig = sys.getrefcount(__name__)\r
-                socket.getnameinfo(__name__,0)\r
-            except TypeError:\r
-                self.assertEqual(sys.getrefcount(__name__), orig,\r
-                                 "socket.getnameinfo loses a reference")\r
-\r
-    def testInterpreterCrash(self):\r
-        # Making sure getnameinfo doesn't crash the interpreter\r
-        try:\r
-            # On some versions, this crashes the interpreter.\r
-            socket.getnameinfo(('x', 0, 0, 0), 0)\r
-        except socket.error:\r
-            pass\r
-\r
-    def testNtoH(self):\r
-        # This just checks that htons etc. are their own inverse,\r
-        # when looking at the lower 16 or 32 bits.\r
-        sizes = {socket.htonl: 32, socket.ntohl: 32,\r
-                 socket.htons: 16, socket.ntohs: 16}\r
-        for func, size in sizes.items():\r
-            mask = (1L<<size) - 1\r
-            for i in (0, 1, 0xffff, ~0xffff, 2, 0x01234567, 0x76543210):\r
-                self.assertEqual(i & mask, func(func(i&mask)) & mask)\r
-\r
-            swapped = func(mask)\r
-            self.assertEqual(swapped & mask, mask)\r
-            self.assertRaises(OverflowError, func, 1L<<34)\r
-\r
-    def testNtoHErrors(self):\r
-        good_values = [ 1, 2, 3, 1L, 2L, 3L ]\r
-        bad_values = [ -1, -2, -3, -1L, -2L, -3L ]\r
-        for k in good_values:\r
-            socket.ntohl(k)\r
-            socket.ntohs(k)\r
-            socket.htonl(k)\r
-            socket.htons(k)\r
-        for k in bad_values:\r
-            self.assertRaises(OverflowError, socket.ntohl, k)\r
-            self.assertRaises(OverflowError, socket.ntohs, k)\r
-            self.assertRaises(OverflowError, socket.htonl, k)\r
-            self.assertRaises(OverflowError, socket.htons, k)\r
-\r
-    def testGetServBy(self):\r
-        eq = self.assertEqual\r
-        # Find one service that exists, then check all the related interfaces.\r
-        # I've ordered this by protocols that have both a tcp and udp\r
-        # protocol, at least for modern Linuxes.\r
-        if (sys.platform.startswith('linux') or\r
-            sys.platform.startswith('freebsd') or\r
-            sys.platform.startswith('netbsd') or\r
-            sys.platform == 'darwin'):\r
-            # avoid the 'echo' service on this platform, as there is an\r
-            # assumption breaking non-standard port/protocol entry\r
-            services = ('daytime', 'qotd', 'domain')\r
-        else:\r
-            services = ('echo', 'daytime', 'domain')\r
-        for service in services:\r
-            try:\r
-                port = socket.getservbyname(service, 'tcp')\r
-                break\r
-            except socket.error:\r
-                pass\r
-        else:\r
-            raise socket.error\r
-        # Try same call with optional protocol omitted\r
-        port2 = socket.getservbyname(service)\r
-        eq(port, port2)\r
-        # Try udp, but don't barf it it doesn't exist\r
-        try:\r
-            udpport = socket.getservbyname(service, 'udp')\r
-        except socket.error:\r
-            udpport = None\r
-        else:\r
-            eq(udpport, port)\r
-        # Now make sure the lookup by port returns the same service name\r
-        eq(socket.getservbyport(port2), service)\r
-        eq(socket.getservbyport(port, 'tcp'), service)\r
-        if udpport is not None:\r
-            eq(socket.getservbyport(udpport, 'udp'), service)\r
-        # Make sure getservbyport does not accept out of range ports.\r
-        self.assertRaises(OverflowError, socket.getservbyport, -1)\r
-        self.assertRaises(OverflowError, socket.getservbyport, 65536)\r
-\r
-    def testDefaultTimeout(self):\r
-        # Testing default timeout\r
-        # The default timeout should initially be None\r
-        self.assertEqual(socket.getdefaulttimeout(), None)\r
-        s = socket.socket()\r
-        self.assertEqual(s.gettimeout(), None)\r
-        s.close()\r
-\r
-        # Set the default timeout to 10, and see if it propagates\r
-        socket.setdefaulttimeout(10)\r
-        self.assertEqual(socket.getdefaulttimeout(), 10)\r
-        s = socket.socket()\r
-        self.assertEqual(s.gettimeout(), 10)\r
-        s.close()\r
-\r
-        # Reset the default timeout to None, and see if it propagates\r
-        socket.setdefaulttimeout(None)\r
-        self.assertEqual(socket.getdefaulttimeout(), None)\r
-        s = socket.socket()\r
-        self.assertEqual(s.gettimeout(), None)\r
-        s.close()\r
-\r
-        # Check that setting it to an invalid value raises ValueError\r
-        self.assertRaises(ValueError, socket.setdefaulttimeout, -1)\r
-\r
-        # Check that setting it to an invalid type raises TypeError\r
-        self.assertRaises(TypeError, socket.setdefaulttimeout, "spam")\r
-\r
-    def testIPv4_inet_aton_fourbytes(self):\r
-        if not hasattr(socket, 'inet_aton'):\r
-            return  # No inet_aton, nothing to check\r
-        # Test that issue1008086 and issue767150 are fixed.\r
-        # It must return 4 bytes.\r
-        self.assertEqual('\x00'*4, socket.inet_aton('0.0.0.0'))\r
-        self.assertEqual('\xff'*4, socket.inet_aton('255.255.255.255'))\r
-\r
-    def testIPv4toString(self):\r
-        if not hasattr(socket, 'inet_pton'):\r
-            return # No inet_pton() on this platform\r
-        from socket import inet_aton as f, inet_pton, AF_INET\r
-        g = lambda a: inet_pton(AF_INET, a)\r
-\r
-        self.assertEqual('\x00\x00\x00\x00', f('0.0.0.0'))\r
-        self.assertEqual('\xff\x00\xff\x00', f('255.0.255.0'))\r
-        self.assertEqual('\xaa\xaa\xaa\xaa', f('170.170.170.170'))\r
-        self.assertEqual('\x01\x02\x03\x04', f('1.2.3.4'))\r
-        self.assertEqual('\xff\xff\xff\xff', f('255.255.255.255'))\r
-\r
-        self.assertEqual('\x00\x00\x00\x00', g('0.0.0.0'))\r
-        self.assertEqual('\xff\x00\xff\x00', g('255.0.255.0'))\r
-        self.assertEqual('\xaa\xaa\xaa\xaa', g('170.170.170.170'))\r
-        self.assertEqual('\xff\xff\xff\xff', g('255.255.255.255'))\r
-\r
-    def testIPv6toString(self):\r
-        if not hasattr(socket, 'inet_pton'):\r
-            return # No inet_pton() on this platform\r
-        try:\r
-            from socket import inet_pton, AF_INET6, has_ipv6\r
-            if not has_ipv6:\r
-                return\r
-        except ImportError:\r
-            return\r
-        f = lambda a: inet_pton(AF_INET6, a)\r
-\r
-        self.assertEqual('\x00' * 16, f('::'))\r
-        self.assertEqual('\x00' * 16, f('0::0'))\r
-        self.assertEqual('\x00\x01' + '\x00' * 14, f('1::'))\r
-        self.assertEqual(\r
-            '\x45\xef\x76\xcb\x00\x1a\x56\xef\xaf\xeb\x0b\xac\x19\x24\xae\xae',\r
-            f('45ef:76cb:1a:56ef:afeb:bac:1924:aeae')\r
-        )\r
-\r
-    def testStringToIPv4(self):\r
-        if not hasattr(socket, 'inet_ntop'):\r
-            return # No inet_ntop() on this platform\r
-        from socket import inet_ntoa as f, inet_ntop, AF_INET\r
-        g = lambda a: inet_ntop(AF_INET, a)\r
-\r
-        self.assertEqual('1.0.1.0', f('\x01\x00\x01\x00'))\r
-        self.assertEqual('170.85.170.85', f('\xaa\x55\xaa\x55'))\r
-        self.assertEqual('255.255.255.255', f('\xff\xff\xff\xff'))\r
-        self.assertEqual('1.2.3.4', f('\x01\x02\x03\x04'))\r
-\r
-        self.assertEqual('1.0.1.0', g('\x01\x00\x01\x00'))\r
-        self.assertEqual('170.85.170.85', g('\xaa\x55\xaa\x55'))\r
-        self.assertEqual('255.255.255.255', g('\xff\xff\xff\xff'))\r
-\r
-    def testStringToIPv6(self):\r
-        if not hasattr(socket, 'inet_ntop'):\r
-            return # No inet_ntop() on this platform\r
-        try:\r
-            from socket import inet_ntop, AF_INET6, has_ipv6\r
-            if not has_ipv6:\r
-                return\r
-        except ImportError:\r
-            return\r
-        f = lambda a: inet_ntop(AF_INET6, a)\r
-\r
-        self.assertEqual('::', f('\x00' * 16))\r
-        self.assertEqual('::1', f('\x00' * 15 + '\x01'))\r
-        self.assertEqual(\r
-            'aef:b01:506:1001:ffff:9997:55:170',\r
-            f('\x0a\xef\x0b\x01\x05\x06\x10\x01\xff\xff\x99\x97\x00\x55\x01\x70')\r
-        )\r
-\r
-    # XXX The following don't test module-level functionality...\r
-\r
-    def _get_unused_port(self, bind_address='0.0.0.0'):\r
-        """Use a temporary socket to elicit an unused ephemeral port.\r
-\r
-        Args:\r
-            bind_address: Hostname or IP address to search for a port on.\r
-\r
-        Returns: A most likely to be unused port.\r
-        """\r
-        tempsock = socket.socket()\r
-        tempsock.bind((bind_address, 0))\r
-        host, port = tempsock.getsockname()\r
-        tempsock.close()\r
-        return port\r
-\r
-    def testSockName(self):\r
-        # Testing getsockname()\r
-        port = self._get_unused_port()\r
-        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)\r
-        self.addCleanup(sock.close)\r
-        sock.bind(("0.0.0.0", port))\r
-        name = sock.getsockname()\r
-        # XXX(nnorwitz): http://tinyurl.com/os5jz seems to indicate\r
-        # it reasonable to get the host's addr in addition to 0.0.0.0.\r
-        # At least for eCos.  This is required for the S/390 to pass.\r
-        try:\r
-            my_ip_addr = socket.gethostbyname(socket.gethostname())\r
-        except socket.error:\r
-            # Probably name lookup wasn't set up right; skip this test\r
-            return\r
-        self.assertIn(name[0], ("0.0.0.0", my_ip_addr), '%s invalid' % name[0])\r
-        self.assertEqual(name[1], port)\r
-\r
-    def testGetSockOpt(self):\r
-        # Testing getsockopt()\r
-        # We know a socket should start without reuse==0\r
-        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)\r
-        self.addCleanup(sock.close)\r
-        reuse = sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR)\r
-        self.assertFalse(reuse != 0, "initial mode is reuse")\r
-\r
-    def testSetSockOpt(self):\r
-        # Testing setsockopt()\r
-        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)\r
-        self.addCleanup(sock.close)\r
-        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)\r
-        reuse = sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR)\r
-        self.assertFalse(reuse == 0, "failed to set reuse mode")\r
-\r
-    def testSendAfterClose(self):\r
-        # testing send() after close() with timeout\r
-        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)\r
-        sock.settimeout(1)\r
-        sock.close()\r
-        self.assertRaises(socket.error, sock.send, "spam")\r
-\r
-    def testNewAttributes(self):\r
-        # testing .family, .type and .protocol\r
-        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)\r
-        self.assertEqual(sock.family, socket.AF_INET)\r
-        self.assertEqual(sock.type, socket.SOCK_STREAM)\r
-        self.assertEqual(sock.proto, 0)\r
-        sock.close()\r
-\r
-    def test_getsockaddrarg(self):\r
-        host = '0.0.0.0'\r
-        port = self._get_unused_port(bind_address=host)\r
-        big_port = port + 65536\r
-        neg_port = port - 65536\r
-        sock = socket.socket()\r
-        try:\r
-            self.assertRaises(OverflowError, sock.bind, (host, big_port))\r
-            self.assertRaises(OverflowError, sock.bind, (host, neg_port))\r
-            sock.bind((host, port))\r
-        finally:\r
-            sock.close()\r
-\r
-    @unittest.skipUnless(os.name == "nt", "Windows specific")\r
-    def test_sock_ioctl(self):\r
-        self.assertTrue(hasattr(socket.socket, 'ioctl'))\r
-        self.assertTrue(hasattr(socket, 'SIO_RCVALL'))\r
-        self.assertTrue(hasattr(socket, 'RCVALL_ON'))\r
-        self.assertTrue(hasattr(socket, 'RCVALL_OFF'))\r
-        self.assertTrue(hasattr(socket, 'SIO_KEEPALIVE_VALS'))\r
-        s = socket.socket()\r
-        self.addCleanup(s.close)\r
-        self.assertRaises(ValueError, s.ioctl, -1, None)\r
-        s.ioctl(socket.SIO_KEEPALIVE_VALS, (1, 100, 100))\r
-\r
-    def testGetaddrinfo(self):\r
-        try:\r
-            socket.getaddrinfo('localhost', 80)\r
-        except socket.gaierror as err:\r
-            if err.errno == socket.EAI_SERVICE:\r
-                # see http://bugs.python.org/issue1282647\r
-                self.skipTest("buggy libc version")\r
-            raise\r
-        # len of every sequence is supposed to be == 5\r
-        for info in socket.getaddrinfo(HOST, None):\r
-            self.assertEqual(len(info), 5)\r
-        # host can be a domain name, a string representation of an\r
-        # IPv4/v6 address or None\r
-        socket.getaddrinfo('localhost', 80)\r
-        socket.getaddrinfo('127.0.0.1', 80)\r
-        socket.getaddrinfo(None, 80)\r
-        if SUPPORTS_IPV6:\r
-            socket.getaddrinfo('::1', 80)\r
-        # port can be a string service name such as "http", a numeric\r
-        # port number or None\r
-        socket.getaddrinfo(HOST, "http")\r
-        socket.getaddrinfo(HOST, 80)\r
-        socket.getaddrinfo(HOST, None)\r
-        # test family and socktype filters\r
-        infos = socket.getaddrinfo(HOST, None, socket.AF_INET)\r
-        for family, _, _, _, _ in infos:\r
-            self.assertEqual(family, socket.AF_INET)\r
-        infos = socket.getaddrinfo(HOST, None, 0, socket.SOCK_STREAM)\r
-        for _, socktype, _, _, _ in infos:\r
-            self.assertEqual(socktype, socket.SOCK_STREAM)\r
-        # test proto and flags arguments\r
-        socket.getaddrinfo(HOST, None, 0, 0, socket.SOL_TCP)\r
-        socket.getaddrinfo(HOST, None, 0, 0, 0, socket.AI_PASSIVE)\r
-        # a server willing to support both IPv4 and IPv6 will\r
-        # usually do this\r
-        socket.getaddrinfo(None, 0, socket.AF_UNSPEC, socket.SOCK_STREAM, 0,\r
-                           socket.AI_PASSIVE)\r
-\r
-\r
-    def check_sendall_interrupted(self, with_timeout):\r
-        # socketpair() is not stricly required, but it makes things easier.\r
-        if not hasattr(signal, 'alarm') or not hasattr(socket, 'socketpair'):\r
-            self.skipTest("signal.alarm and socket.socketpair required for this test")\r
-        # Our signal handlers clobber the C errno by calling a math function\r
-        # with an invalid domain value.\r
-        def ok_handler(*args):\r
-            self.assertRaises(ValueError, math.acosh, 0)\r
-        def raising_handler(*args):\r
-            self.assertRaises(ValueError, math.acosh, 0)\r
-            1 // 0\r
-        c, s = socket.socketpair()\r
-        old_alarm = signal.signal(signal.SIGALRM, raising_handler)\r
-        try:\r
-            if with_timeout:\r
-                # Just above the one second minimum for signal.alarm\r
-                c.settimeout(1.5)\r
-            with self.assertRaises(ZeroDivisionError):\r
-                signal.alarm(1)\r
-                c.sendall(b"x" * (1024**2))\r
-            if with_timeout:\r
-                signal.signal(signal.SIGALRM, ok_handler)\r
-                signal.alarm(1)\r
-                self.assertRaises(socket.timeout, c.sendall, b"x" * (1024**2))\r
-        finally:\r
-            signal.signal(signal.SIGALRM, old_alarm)\r
-            c.close()\r
-            s.close()\r
-\r
-    def test_sendall_interrupted(self):\r
-        self.check_sendall_interrupted(False)\r
-\r
-    def test_sendall_interrupted_with_timeout(self):\r
-        self.check_sendall_interrupted(True)\r
-\r
-    def testListenBacklog0(self):\r
-        srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)\r
-        srv.bind((HOST, 0))\r
-        # backlog = 0\r
-        srv.listen(0)\r
-        srv.close()\r
-\r
-\r
-@unittest.skipUnless(thread, 'Threading required for this test.')\r
-class BasicTCPTest(SocketConnectedTest):\r
-\r
-    def __init__(self, methodName='runTest'):\r
-        SocketConnectedTest.__init__(self, methodName=methodName)\r
-\r
-    def testRecv(self):\r
-        # Testing large receive over TCP\r
-        msg = self.cli_conn.recv(1024)\r
-        self.assertEqual(msg, MSG)\r
-\r
-    def _testRecv(self):\r
-        self.serv_conn.send(MSG)\r
-\r
-    def testOverFlowRecv(self):\r
-        # Testing receive in chunks over TCP\r
-        seg1 = self.cli_conn.recv(len(MSG) - 3)\r
-        seg2 = self.cli_conn.recv(1024)\r
-        msg = seg1 + seg2\r
-        self.assertEqual(msg, MSG)\r
-\r
-    def _testOverFlowRecv(self):\r
-        self.serv_conn.send(MSG)\r
-\r
-    def testRecvFrom(self):\r
-        # Testing large recvfrom() over TCP\r
-        msg, addr = self.cli_conn.recvfrom(1024)\r
-        self.assertEqual(msg, MSG)\r
-\r
-    def _testRecvFrom(self):\r
-        self.serv_conn.send(MSG)\r
-\r
-    def testOverFlowRecvFrom(self):\r
-        # Testing recvfrom() in chunks over TCP\r
-        seg1, addr = self.cli_conn.recvfrom(len(MSG)-3)\r
-        seg2, addr = self.cli_conn.recvfrom(1024)\r
-        msg = seg1 + seg2\r
-        self.assertEqual(msg, MSG)\r
-\r
-    def _testOverFlowRecvFrom(self):\r
-        self.serv_conn.send(MSG)\r
-\r
-    def testSendAll(self):\r
-        # Testing sendall() with a 2048 byte string over TCP\r
-        msg = ''\r
-        while 1:\r
-            read = self.cli_conn.recv(1024)\r
-            if not read:\r
-                break\r
-            msg += read\r
-        self.assertEqual(msg, 'f' * 2048)\r
-\r
-    def _testSendAll(self):\r
-        big_chunk = 'f' * 2048\r
-        self.serv_conn.sendall(big_chunk)\r
-\r
-    def testFromFd(self):\r
-        # Testing fromfd()\r
-        if not hasattr(socket, "fromfd"):\r
-            return # On Windows, this doesn't exist\r
-        fd = self.cli_conn.fileno()\r
-        sock = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM)\r
-        self.addCleanup(sock.close)\r
-        msg = sock.recv(1024)\r
-        self.assertEqual(msg, MSG)\r
-\r
-    def _testFromFd(self):\r
-        self.serv_conn.send(MSG)\r
-\r
-    def testDup(self):\r
-        # Testing dup()\r
-        sock = self.cli_conn.dup()\r
-        self.addCleanup(sock.close)\r
-        msg = sock.recv(1024)\r
-        self.assertEqual(msg, MSG)\r
-\r
-    def _testDup(self):\r
-        self.serv_conn.send(MSG)\r
-\r
-    def testShutdown(self):\r
-        # Testing shutdown()\r
-        msg = self.cli_conn.recv(1024)\r
-        self.assertEqual(msg, MSG)\r
-        # wait for _testShutdown to finish: on OS X, when the server\r
-        # closes the connection the client also becomes disconnected,\r
-        # and the client's shutdown call will fail. (Issue #4397.)\r
-        self.done.wait()\r
-\r
-    def _testShutdown(self):\r
-        self.serv_conn.send(MSG)\r
-        self.serv_conn.shutdown(2)\r
-\r
-@unittest.skipUnless(thread, 'Threading required for this test.')\r
-class BasicUDPTest(ThreadedUDPSocketTest):\r
-\r
-    def __init__(self, methodName='runTest'):\r
-        ThreadedUDPSocketTest.__init__(self, methodName=methodName)\r
-\r
-    def testSendtoAndRecv(self):\r
-        # Testing sendto() and Recv() over UDP\r
-        msg = self.serv.recv(len(MSG))\r
-        self.assertEqual(msg, MSG)\r
-\r
-    def _testSendtoAndRecv(self):\r
-        self.cli.sendto(MSG, 0, (HOST, self.port))\r
-\r
-    def testRecvFrom(self):\r
-        # Testing recvfrom() over UDP\r
-        msg, addr = self.serv.recvfrom(len(MSG))\r
-        self.assertEqual(msg, MSG)\r
-\r
-    def _testRecvFrom(self):\r
-        self.cli.sendto(MSG, 0, (HOST, self.port))\r
-\r
-    def testRecvFromNegative(self):\r
-        # Negative lengths passed to recvfrom should give ValueError.\r
-        self.assertRaises(ValueError, self.serv.recvfrom, -1)\r
-\r
-    def _testRecvFromNegative(self):\r
-        self.cli.sendto(MSG, 0, (HOST, self.port))\r
-\r
-@unittest.skipUnless(thread, 'Threading required for this test.')\r
-class TCPCloserTest(ThreadedTCPSocketTest):\r
-\r
-    def testClose(self):\r
-        conn, addr = self.serv.accept()\r
-        conn.close()\r
-\r
-        sd = self.cli\r
-        read, write, err = select.select([sd], [], [], 1.0)\r
-        self.assertEqual(read, [sd])\r
-        self.assertEqual(sd.recv(1), '')\r
-\r
-    def _testClose(self):\r
-        self.cli.connect((HOST, self.port))\r
-        time.sleep(1.0)\r
-\r
-@unittest.skipUnless(thread, 'Threading required for this test.')\r
-class BasicSocketPairTest(SocketPairTest):\r
-\r
-    def __init__(self, methodName='runTest'):\r
-        SocketPairTest.__init__(self, methodName=methodName)\r
-\r
-    def testRecv(self):\r
-        msg = self.serv.recv(1024)\r
-        self.assertEqual(msg, MSG)\r
-\r
-    def _testRecv(self):\r
-        self.cli.send(MSG)\r
-\r
-    def testSend(self):\r
-        self.serv.send(MSG)\r
-\r
-    def _testSend(self):\r
-        msg = self.cli.recv(1024)\r
-        self.assertEqual(msg, MSG)\r
-\r
-@unittest.skipUnless(thread, 'Threading required for this test.')\r
-class NonBlockingTCPTests(ThreadedTCPSocketTest):\r
-\r
-    def __init__(self, methodName='runTest'):\r
-        ThreadedTCPSocketTest.__init__(self, methodName=methodName)\r
-\r
-    def testSetBlocking(self):\r
-        # Testing whether set blocking works\r
-        self.serv.setblocking(0)\r
-        start = time.time()\r
-        try:\r
-            self.serv.accept()\r
-        except socket.error:\r
-            pass\r
-        end = time.time()\r
-        self.assertTrue((end - start) < 1.0, "Error setting non-blocking mode.")\r
-\r
-    def _testSetBlocking(self):\r
-        pass\r
-\r
-    def testAccept(self):\r
-        # Testing non-blocking accept\r
-        self.serv.setblocking(0)\r
-        try:\r
-            conn, addr = self.serv.accept()\r
-        except socket.error:\r
-            pass\r
-        else:\r
-            self.fail("Error trying to do non-blocking accept.")\r
-        read, write, err = select.select([self.serv], [], [])\r
-        if self.serv in read:\r
-            conn, addr = self.serv.accept()\r
-            conn.close()\r
-        else:\r
-            self.fail("Error trying to do accept after select.")\r
-\r
-    def _testAccept(self):\r
-        time.sleep(0.1)\r
-        self.cli.connect((HOST, self.port))\r
-\r
-    def testConnect(self):\r
-        # Testing non-blocking connect\r
-        conn, addr = self.serv.accept()\r
-        conn.close()\r
-\r
-    def _testConnect(self):\r
-        self.cli.settimeout(10)\r
-        self.cli.connect((HOST, self.port))\r
-\r
-    def testRecv(self):\r
-        # Testing non-blocking recv\r
-        conn, addr = self.serv.accept()\r
-        conn.setblocking(0)\r
-        try:\r
-            msg = conn.recv(len(MSG))\r
-        except socket.error:\r
-            pass\r
-        else:\r
-            self.fail("Error trying to do non-blocking recv.")\r
-        read, write, err = select.select([conn], [], [])\r
-        if conn in read:\r
-            msg = conn.recv(len(MSG))\r
-            conn.close()\r
-            self.assertEqual(msg, MSG)\r
-        else:\r
-            self.fail("Error during select call to non-blocking socket.")\r
-\r
-    def _testRecv(self):\r
-        self.cli.connect((HOST, self.port))\r
-        time.sleep(0.1)\r
-        self.cli.send(MSG)\r
-\r
-@unittest.skipUnless(thread, 'Threading required for this test.')\r
-class FileObjectClassTestCase(SocketConnectedTest):\r
-\r
-    bufsize = -1 # Use default buffer size\r
-\r
-    def __init__(self, methodName='runTest'):\r
-        SocketConnectedTest.__init__(self, methodName=methodName)\r
-\r
-    def setUp(self):\r
-        SocketConnectedTest.setUp(self)\r
-        self.serv_file = self.cli_conn.makefile('rb', self.bufsize)\r
-\r
-    def tearDown(self):\r
-        self.serv_file.close()\r
-        self.assertTrue(self.serv_file.closed)\r
-        self.serv_file = None\r
-        SocketConnectedTest.tearDown(self)\r
-\r
-    def clientSetUp(self):\r
-        SocketConnectedTest.clientSetUp(self)\r
-        self.cli_file = self.serv_conn.makefile('wb')\r
-\r
-    def clientTearDown(self):\r
-        self.cli_file.close()\r
-        self.assertTrue(self.cli_file.closed)\r
-        self.cli_file = None\r
-        SocketConnectedTest.clientTearDown(self)\r
-\r
-    def testSmallRead(self):\r
-        # Performing small file read test\r
-        first_seg = self.serv_file.read(len(MSG)-3)\r
-        second_seg = self.serv_file.read(3)\r
-        msg = first_seg + second_seg\r
-        self.assertEqual(msg, MSG)\r
-\r
-    def _testSmallRead(self):\r
-        self.cli_file.write(MSG)\r
-        self.cli_file.flush()\r
-\r
-    def testFullRead(self):\r
-        # read until EOF\r
-        msg = self.serv_file.read()\r
-        self.assertEqual(msg, MSG)\r
-\r
-    def _testFullRead(self):\r
-        self.cli_file.write(MSG)\r
-        self.cli_file.close()\r
-\r
-    def testUnbufferedRead(self):\r
-        # Performing unbuffered file read test\r
-        buf = ''\r
-        while 1:\r
-            char = self.serv_file.read(1)\r
-            if not char:\r
-                break\r
-            buf += char\r
-        self.assertEqual(buf, MSG)\r
-\r
-    def _testUnbufferedRead(self):\r
-        self.cli_file.write(MSG)\r
-        self.cli_file.flush()\r
-\r
-    def testReadline(self):\r
-        # Performing file readline test\r
-        line = self.serv_file.readline()\r
-        self.assertEqual(line, MSG)\r
-\r
-    def _testReadline(self):\r
-        self.cli_file.write(MSG)\r
-        self.cli_file.flush()\r
-\r
-    def testReadlineAfterRead(self):\r
-        a_baloo_is = self.serv_file.read(len("A baloo is"))\r
-        self.assertEqual("A baloo is", a_baloo_is)\r
-        _a_bear = self.serv_file.read(len(" a bear"))\r
-        self.assertEqual(" a bear", _a_bear)\r
-        line = self.serv_file.readline()\r
-        self.assertEqual("\n", line)\r
-        line = self.serv_file.readline()\r
-        self.assertEqual("A BALOO IS A BEAR.\n", line)\r
-        line = self.serv_file.readline()\r
-        self.assertEqual(MSG, line)\r
-\r
-    def _testReadlineAfterRead(self):\r
-        self.cli_file.write("A baloo is a bear\n")\r
-        self.cli_file.write("A BALOO IS A BEAR.\n")\r
-        self.cli_file.write(MSG)\r
-        self.cli_file.flush()\r
-\r
-    def testReadlineAfterReadNoNewline(self):\r
-        end_of_ = self.serv_file.read(len("End Of "))\r
-        self.assertEqual("End Of ", end_of_)\r
-        line = self.serv_file.readline()\r
-        self.assertEqual("Line", line)\r
-\r
-    def _testReadlineAfterReadNoNewline(self):\r
-        self.cli_file.write("End Of Line")\r
-\r
-    def testClosedAttr(self):\r
-        self.assertTrue(not self.serv_file.closed)\r
-\r
-    def _testClosedAttr(self):\r
-        self.assertTrue(not self.cli_file.closed)\r
-\r
-\r
-class FileObjectInterruptedTestCase(unittest.TestCase):\r
-    """Test that the file object correctly handles EINTR internally."""\r
-\r
-    class MockSocket(object):\r
-        def __init__(self, recv_funcs=()):\r
-            # A generator that returns callables that we'll call for each\r
-            # call to recv().\r
-            self._recv_step = iter(recv_funcs)\r
-\r
-        def recv(self, size):\r
-            return self._recv_step.next()()\r
-\r
-    @staticmethod\r
-    def _raise_eintr():\r
-        raise socket.error(errno.EINTR)\r
-\r
-    def _test_readline(self, size=-1, **kwargs):\r
-        mock_sock = self.MockSocket(recv_funcs=[\r
-                lambda : "This is the first line\nAnd the sec",\r
-                self._raise_eintr,\r
-                lambda : "ond line is here\n",\r
-                lambda : "",\r
-            ])\r
-        fo = socket._fileobject(mock_sock, **kwargs)\r
-        self.assertEqual(fo.readline(size), "This is the first line\n")\r
-        self.assertEqual(fo.readline(size), "And the second line is here\n")\r
-\r
-    def _test_read(self, size=-1, **kwargs):\r
-        mock_sock = self.MockSocket(recv_funcs=[\r
-                lambda : "This is the first line\nAnd the sec",\r
-                self._raise_eintr,\r
-                lambda : "ond line is here\n",\r
-                lambda : "",\r
-            ])\r
-        fo = socket._fileobject(mock_sock, **kwargs)\r
-        self.assertEqual(fo.read(size), "This is the first line\n"\r
-                          "And the second line is here\n")\r
-\r
-    def test_default(self):\r
-        self._test_readline()\r
-        self._test_readline(size=100)\r
-        self._test_read()\r
-        self._test_read(size=100)\r
-\r
-    def test_with_1k_buffer(self):\r
-        self._test_readline(bufsize=1024)\r
-        self._test_readline(size=100, bufsize=1024)\r
-        self._test_read(bufsize=1024)\r
-        self._test_read(size=100, bufsize=1024)\r
-\r
-    def _test_readline_no_buffer(self, size=-1):\r
-        mock_sock = self.MockSocket(recv_funcs=[\r
-                lambda : "aa",\r
-                lambda : "\n",\r
-                lambda : "BB",\r
-                self._raise_eintr,\r
-                lambda : "bb",\r
-                lambda : "",\r
-            ])\r
-        fo = socket._fileobject(mock_sock, bufsize=0)\r
-        self.assertEqual(fo.readline(size), "aa\n")\r
-        self.assertEqual(fo.readline(size), "BBbb")\r
-\r
-    def test_no_buffer(self):\r
-        self._test_readline_no_buffer()\r
-        self._test_readline_no_buffer(size=4)\r
-        self._test_read(bufsize=0)\r
-        self._test_read(size=100, bufsize=0)\r
-\r
-\r
-class UnbufferedFileObjectClassTestCase(FileObjectClassTestCase):\r
-\r
-    """Repeat the tests from FileObjectClassTestCase with bufsize==0.\r
-\r
-    In this case (and in this case only), it should be possible to\r
-    create a file object, read a line from it, create another file\r
-    object, read another line from it, without loss of data in the\r
-    first file object's buffer.  Note that httplib relies on this\r
-    when reading multiple requests from the same socket."""\r
-\r
-    bufsize = 0 # Use unbuffered mode\r
-\r
-    def testUnbufferedReadline(self):\r
-        # Read a line, create a new file object, read another line with it\r
-        line = self.serv_file.readline() # first line\r
-        self.assertEqual(line, "A. " + MSG) # first line\r
-        self.serv_file = self.cli_conn.makefile('rb', 0)\r
-        line = self.serv_file.readline() # second line\r
-        self.assertEqual(line, "B. " + MSG) # second line\r
-\r
-    def _testUnbufferedReadline(self):\r
-        self.cli_file.write("A. " + MSG)\r
-        self.cli_file.write("B. " + MSG)\r
-        self.cli_file.flush()\r
-\r
-class LineBufferedFileObjectClassTestCase(FileObjectClassTestCase):\r
-\r
-    bufsize = 1 # Default-buffered for reading; line-buffered for writing\r
-\r
-\r
-class SmallBufferedFileObjectClassTestCase(FileObjectClassTestCase):\r
-\r
-    bufsize = 2 # Exercise the buffering code\r
-\r
-\r
-class NetworkConnectionTest(object):\r
-    """Prove network connection."""\r
-    def clientSetUp(self):\r
-        # We're inherited below by BasicTCPTest2, which also inherits\r
-        # BasicTCPTest, which defines self.port referenced below.\r
-        self.cli = socket.create_connection((HOST, self.port))\r
-        self.serv_conn = self.cli\r
-\r
-class BasicTCPTest2(NetworkConnectionTest, BasicTCPTest):\r
-    """Tests that NetworkConnection does not break existing TCP functionality.\r
-    """\r
-\r
-class NetworkConnectionNoServer(unittest.TestCase):\r
-    class MockSocket(socket.socket):\r
-        def connect(self, *args):\r
-            raise socket.timeout('timed out')\r
-\r
-    @contextlib.contextmanager\r
-    def mocked_socket_module(self):\r
-        """Return a socket which times out on connect"""\r
-        old_socket = socket.socket\r
-        socket.socket = self.MockSocket\r
-        try:\r
-            yield\r
-        finally:\r
-            socket.socket = old_socket\r
-\r
-    def test_connect(self):\r
-        port = test_support.find_unused_port()\r
-        cli = socket.socket(socket.AF_INET, socket.SOCK_STREAM)\r
-        self.addCleanup(cli.close)\r
-        with self.assertRaises(socket.error) as cm:\r
-            cli.connect((HOST, port))\r
-        self.assertEqual(cm.exception.errno, errno.ECONNREFUSED)\r
-\r
-    def test_create_connection(self):\r
-        # Issue #9792: errors raised by create_connection() should have\r
-        # a proper errno attribute.\r
-        port = test_support.find_unused_port()\r
-        with self.assertRaises(socket.error) as cm:\r
-            socket.create_connection((HOST, port))\r
-        self.assertEqual(cm.exception.errno, errno.ECONNREFUSED)\r
-\r
-    def test_create_connection_timeout(self):\r
-        # Issue #9792: create_connection() should not recast timeout errors\r
-        # as generic socket errors.\r
-        with self.mocked_socket_module():\r
-            with self.assertRaises(socket.timeout):\r
-                socket.create_connection((HOST, 1234))\r
-\r
-\r
-@unittest.skipUnless(thread, 'Threading required for this test.')\r
-class NetworkConnectionAttributesTest(SocketTCPTest, ThreadableTest):\r
-\r
-    def __init__(self, methodName='runTest'):\r
-        SocketTCPTest.__init__(self, methodName=methodName)\r
-        ThreadableTest.__init__(self)\r
-\r
-    def clientSetUp(self):\r
-        self.source_port = test_support.find_unused_port()\r
-\r
-    def clientTearDown(self):\r
-        self.cli.close()\r
-        self.cli = None\r
-        ThreadableTest.clientTearDown(self)\r
-\r
-    def _justAccept(self):\r
-        conn, addr = self.serv.accept()\r
-        conn.close()\r
-\r
-    testFamily = _justAccept\r
-    def _testFamily(self):\r
-        self.cli = socket.create_connection((HOST, self.port), timeout=30)\r
-        self.addCleanup(self.cli.close)\r
-        self.assertEqual(self.cli.family, 2)\r
-\r
-    testSourceAddress = _justAccept\r
-    def _testSourceAddress(self):\r
-        self.cli = socket.create_connection((HOST, self.port), timeout=30,\r
-                source_address=('', self.source_port))\r
-        self.addCleanup(self.cli.close)\r
-        self.assertEqual(self.cli.getsockname()[1], self.source_port)\r
-        # The port number being used is sufficient to show that the bind()\r
-        # call happened.\r
-\r
-    testTimeoutDefault = _justAccept\r
-    def _testTimeoutDefault(self):\r
-        # passing no explicit timeout uses socket's global default\r
-        self.assertTrue(socket.getdefaulttimeout() is None)\r
-        socket.setdefaulttimeout(42)\r
-        try:\r
-            self.cli = socket.create_connection((HOST, self.port))\r
-            self.addCleanup(self.cli.close)\r
-        finally:\r
-            socket.setdefaulttimeout(None)\r
-        self.assertEqual(self.cli.gettimeout(), 42)\r
-\r
-    testTimeoutNone = _justAccept\r
-    def _testTimeoutNone(self):\r
-        # None timeout means the same as sock.settimeout(None)\r
-        self.assertTrue(socket.getdefaulttimeout() is None)\r
-        socket.setdefaulttimeout(30)\r
-        try:\r
-            self.cli = socket.create_connection((HOST, self.port), timeout=None)\r
-            self.addCleanup(self.cli.close)\r
-        finally:\r
-            socket.setdefaulttimeout(None)\r
-        self.assertEqual(self.cli.gettimeout(), None)\r
-\r
-    testTimeoutValueNamed = _justAccept\r
-    def _testTimeoutValueNamed(self):\r
-        self.cli = socket.create_connection((HOST, self.port), timeout=30)\r
-        self.assertEqual(self.cli.gettimeout(), 30)\r
-\r
-    testTimeoutValueNonamed = _justAccept\r
-    def _testTimeoutValueNonamed(self):\r
-        self.cli = socket.create_connection((HOST, self.port), 30)\r
-        self.addCleanup(self.cli.close)\r
-        self.assertEqual(self.cli.gettimeout(), 30)\r
-\r
-@unittest.skipUnless(thread, 'Threading required for this test.')\r
-class NetworkConnectionBehaviourTest(SocketTCPTest, ThreadableTest):\r
-\r
-    def __init__(self, methodName='runTest'):\r
-        SocketTCPTest.__init__(self, methodName=methodName)\r
-        ThreadableTest.__init__(self)\r
-\r
-    def clientSetUp(self):\r
-        pass\r
-\r
-    def clientTearDown(self):\r
-        self.cli.close()\r
-        self.cli = None\r
-        ThreadableTest.clientTearDown(self)\r
-\r
-    def testInsideTimeout(self):\r
-        conn, addr = self.serv.accept()\r
-        self.addCleanup(conn.close)\r
-        time.sleep(3)\r
-        conn.send("done!")\r
-    testOutsideTimeout = testInsideTimeout\r
-\r
-    def _testInsideTimeout(self):\r
-        self.cli = sock = socket.create_connection((HOST, self.port))\r
-        data = sock.recv(5)\r
-        self.assertEqual(data, "done!")\r
-\r
-    def _testOutsideTimeout(self):\r
-        self.cli = sock = socket.create_connection((HOST, self.port), timeout=1)\r
-        self.assertRaises(socket.timeout, lambda: sock.recv(5))\r
-\r
-\r
-class Urllib2FileobjectTest(unittest.TestCase):\r
-\r
-    # urllib2.HTTPHandler has "borrowed" socket._fileobject, and requires that\r
-    # it close the socket if the close c'tor argument is true\r
-\r
-    def testClose(self):\r
-        class MockSocket:\r
-            closed = False\r
-            def flush(self): pass\r
-            def close(self): self.closed = True\r
-\r
-        # must not close unless we request it: the original use of _fileobject\r
-        # by module socket requires that the underlying socket not be closed until\r
-        # the _socketobject that created the _fileobject is closed\r
-        s = MockSocket()\r
-        f = socket._fileobject(s)\r
-        f.close()\r
-        self.assertTrue(not s.closed)\r
-\r
-        s = MockSocket()\r
-        f = socket._fileobject(s, close=True)\r
-        f.close()\r
-        self.assertTrue(s.closed)\r
-\r
-class TCPTimeoutTest(SocketTCPTest):\r
-\r
-    def testTCPTimeout(self):\r
-        def raise_timeout(*args, **kwargs):\r
-            self.serv.settimeout(1.0)\r
-            self.serv.accept()\r
-        self.assertRaises(socket.timeout, raise_timeout,\r
-                              "Error generating a timeout exception (TCP)")\r
-\r
-    def testTimeoutZero(self):\r
-        ok = False\r
-        try:\r
-            self.serv.settimeout(0.0)\r
-            foo = self.serv.accept()\r
-        except socket.timeout:\r
-            self.fail("caught timeout instead of error (TCP)")\r
-        except socket.error:\r
-            ok = True\r
-        except:\r
-            self.fail("caught unexpected exception (TCP)")\r
-        if not ok:\r
-            self.fail("accept() returned success when we did not expect it")\r
-\r
-    def testInterruptedTimeout(self):\r
-        # XXX I don't know how to do this test on MSWindows or any other\r
-        # plaform that doesn't support signal.alarm() or os.kill(), though\r
-        # the bug should have existed on all platforms.\r
-        if not hasattr(signal, "alarm"):\r
-            return                  # can only test on *nix\r
-        self.serv.settimeout(5.0)   # must be longer than alarm\r
-        class Alarm(Exception):\r
-            pass\r
-        def alarm_handler(signal, frame):\r
-            raise Alarm\r
-        old_alarm = signal.signal(signal.SIGALRM, alarm_handler)\r
-        try:\r
-            signal.alarm(2)    # POSIX allows alarm to be up to 1 second early\r
-            try:\r
-                foo = self.serv.accept()\r
-            except socket.timeout:\r
-                self.fail("caught timeout instead of Alarm")\r
-            except Alarm:\r
-                pass\r
-            except:\r
-                self.fail("caught other exception instead of Alarm:"\r
-                          " %s(%s):\n%s" %\r
-                          (sys.exc_info()[:2] + (traceback.format_exc(),)))\r
-            else:\r
-                self.fail("nothing caught")\r
-            finally:\r
-                signal.alarm(0)         # shut off alarm\r
-        except Alarm:\r
-            self.fail("got Alarm in wrong place")\r
-        finally:\r
-            # no alarm can be pending.  Safe to restore old handler.\r
-            signal.signal(signal.SIGALRM, old_alarm)\r
-\r
-class UDPTimeoutTest(SocketTCPTest):\r
-\r
-    def testUDPTimeout(self):\r
-        def raise_timeout(*args, **kwargs):\r
-            self.serv.settimeout(1.0)\r
-            self.serv.recv(1024)\r
-        self.assertRaises(socket.timeout, raise_timeout,\r
-                              "Error generating a timeout exception (UDP)")\r
-\r
-    def testTimeoutZero(self):\r
-        ok = False\r
-        try:\r
-            self.serv.settimeout(0.0)\r
-            foo = self.serv.recv(1024)\r
-        except socket.timeout:\r
-            self.fail("caught timeout instead of error (UDP)")\r
-        except socket.error:\r
-            ok = True\r
-        except:\r
-            self.fail("caught unexpected exception (UDP)")\r
-        if not ok:\r
-            self.fail("recv() returned success when we did not expect it")\r
-\r
-class TestExceptions(unittest.TestCase):\r
-\r
-    def testExceptionTree(self):\r
-        self.assertTrue(issubclass(socket.error, Exception))\r
-        self.assertTrue(issubclass(socket.herror, socket.error))\r
-        self.assertTrue(issubclass(socket.gaierror, socket.error))\r
-        self.assertTrue(issubclass(socket.timeout, socket.error))\r
-\r
-class TestLinuxAbstractNamespace(unittest.TestCase):\r
-\r
-    UNIX_PATH_MAX = 108\r
-\r
-    def testLinuxAbstractNamespace(self):\r
-        address = "\x00python-test-hello\x00\xff"\r
-        s1 = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)\r
-        s1.bind(address)\r
-        s1.listen(1)\r
-        s2 = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)\r
-        s2.connect(s1.getsockname())\r
-        s1.accept()\r
-        self.assertEqual(s1.getsockname(), address)\r
-        self.assertEqual(s2.getpeername(), address)\r
-\r
-    def testMaxName(self):\r
-        address = "\x00" + "h" * (self.UNIX_PATH_MAX - 1)\r
-        s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)\r
-        s.bind(address)\r
-        self.assertEqual(s.getsockname(), address)\r
-\r
-    def testNameOverflow(self):\r
-        address = "\x00" + "h" * self.UNIX_PATH_MAX\r
-        s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)\r
-        self.assertRaises(socket.error, s.bind, address)\r
-\r
-\r
-@unittest.skipUnless(thread, 'Threading required for this test.')\r
-class BufferIOTest(SocketConnectedTest):\r
-    """\r
-    Test the buffer versions of socket.recv() and socket.send().\r
-    """\r
-    def __init__(self, methodName='runTest'):\r
-        SocketConnectedTest.__init__(self, methodName=methodName)\r
-\r
-    def testRecvIntoArray(self):\r
-        buf = array.array('c', ' '*1024)\r
-        nbytes = self.cli_conn.recv_into(buf)\r
-        self.assertEqual(nbytes, len(MSG))\r
-        msg = buf.tostring()[:len(MSG)]\r
-        self.assertEqual(msg, MSG)\r
-\r
-    def _testRecvIntoArray(self):\r
-        with test_support.check_py3k_warnings():\r
-            buf = buffer(MSG)\r
-        self.serv_conn.send(buf)\r
-\r
-    def testRecvIntoBytearray(self):\r
-        buf = bytearray(1024)\r
-        nbytes = self.cli_conn.recv_into(buf)\r
-        self.assertEqual(nbytes, len(MSG))\r
-        msg = buf[:len(MSG)]\r
-        self.assertEqual(msg, MSG)\r
-\r
-    _testRecvIntoBytearray = _testRecvIntoArray\r
-\r
-    def testRecvIntoMemoryview(self):\r
-        buf = bytearray(1024)\r
-        nbytes = self.cli_conn.recv_into(memoryview(buf))\r
-        self.assertEqual(nbytes, len(MSG))\r
-        msg = buf[:len(MSG)]\r
-        self.assertEqual(msg, MSG)\r
-\r
-    _testRecvIntoMemoryview = _testRecvIntoArray\r
-\r
-    def testRecvFromIntoArray(self):\r
-        buf = array.array('c', ' '*1024)\r
-        nbytes, addr = self.cli_conn.recvfrom_into(buf)\r
-        self.assertEqual(nbytes, len(MSG))\r
-        msg = buf.tostring()[:len(MSG)]\r
-        self.assertEqual(msg, MSG)\r
-\r
-    def _testRecvFromIntoArray(self):\r
-        with test_support.check_py3k_warnings():\r
-            buf = buffer(MSG)\r
-        self.serv_conn.send(buf)\r
-\r
-    def testRecvFromIntoBytearray(self):\r
-        buf = bytearray(1024)\r
-        nbytes, addr = self.cli_conn.recvfrom_into(buf)\r
-        self.assertEqual(nbytes, len(MSG))\r
-        msg = buf[:len(MSG)]\r
-        self.assertEqual(msg, MSG)\r
-\r
-    _testRecvFromIntoBytearray = _testRecvFromIntoArray\r
-\r
-    def testRecvFromIntoMemoryview(self):\r
-        buf = bytearray(1024)\r
-        nbytes, addr = self.cli_conn.recvfrom_into(memoryview(buf))\r
-        self.assertEqual(nbytes, len(MSG))\r
-        msg = buf[:len(MSG)]\r
-        self.assertEqual(msg, MSG)\r
-\r
-    _testRecvFromIntoMemoryview = _testRecvFromIntoArray\r
-\r
-\r
-TIPC_STYPE = 2000\r
-TIPC_LOWER = 200\r
-TIPC_UPPER = 210\r
-\r
-def isTipcAvailable():\r
-    """Check if the TIPC module is loaded\r
-\r
-    The TIPC module is not loaded automatically on Ubuntu and probably\r
-    other Linux distros.\r
-    """\r
-    if not hasattr(socket, "AF_TIPC"):\r
-        return False\r
-    if not os.path.isfile("/proc/modules"):\r
-        return False\r
-    with open("/proc/modules") as f:\r
-        for line in f:\r
-            if line.startswith("tipc "):\r
-                return True\r
-    if test_support.verbose:\r
-        print "TIPC module is not loaded, please 'sudo modprobe tipc'"\r
-    return False\r
-\r
-class TIPCTest (unittest.TestCase):\r
-    def testRDM(self):\r
-        srv = socket.socket(socket.AF_TIPC, socket.SOCK_RDM)\r
-        cli = socket.socket(socket.AF_TIPC, socket.SOCK_RDM)\r
-\r
-        srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)\r
-        srvaddr = (socket.TIPC_ADDR_NAMESEQ, TIPC_STYPE,\r
-                TIPC_LOWER, TIPC_UPPER)\r
-        srv.bind(srvaddr)\r
-\r
-        sendaddr = (socket.TIPC_ADDR_NAME, TIPC_STYPE,\r
-                TIPC_LOWER + (TIPC_UPPER - TIPC_LOWER) / 2, 0)\r
-        cli.sendto(MSG, sendaddr)\r
-\r
-        msg, recvaddr = srv.recvfrom(1024)\r
-\r
-        self.assertEqual(cli.getsockname(), recvaddr)\r
-        self.assertEqual(msg, MSG)\r
-\r
-\r
-class TIPCThreadableTest (unittest.TestCase, ThreadableTest):\r
-    def __init__(self, methodName = 'runTest'):\r
-        unittest.TestCase.__init__(self, methodName = methodName)\r
-        ThreadableTest.__init__(self)\r
-\r
-    def setUp(self):\r
-        self.srv = socket.socket(socket.AF_TIPC, socket.SOCK_STREAM)\r
-        self.srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)\r
-        srvaddr = (socket.TIPC_ADDR_NAMESEQ, TIPC_STYPE,\r
-                TIPC_LOWER, TIPC_UPPER)\r
-        self.srv.bind(srvaddr)\r
-        self.srv.listen(5)\r
-        self.serverExplicitReady()\r
-        self.conn, self.connaddr = self.srv.accept()\r
-\r
-    def clientSetUp(self):\r
-        # The is a hittable race between serverExplicitReady() and the\r
-        # accept() call; sleep a little while to avoid it, otherwise\r
-        # we could get an exception\r
-        time.sleep(0.1)\r
-        self.cli = socket.socket(socket.AF_TIPC, socket.SOCK_STREAM)\r
-        addr = (socket.TIPC_ADDR_NAME, TIPC_STYPE,\r
-                TIPC_LOWER + (TIPC_UPPER - TIPC_LOWER) / 2, 0)\r
-        self.cli.connect(addr)\r
-        self.cliaddr = self.cli.getsockname()\r
-\r
-    def testStream(self):\r
-        msg = self.conn.recv(1024)\r
-        self.assertEqual(msg, MSG)\r
-        self.assertEqual(self.cliaddr, self.connaddr)\r
-\r
-    def _testStream(self):\r
-        self.cli.send(MSG)\r
-        self.cli.close()\r
-\r
-\r
-def test_main():\r
-    tests = [GeneralModuleTests, BasicTCPTest, TCPCloserTest, TCPTimeoutTest,\r
-             TestExceptions, BufferIOTest, BasicTCPTest2, BasicUDPTest,\r
-             UDPTimeoutTest ]\r
-\r
-    tests.extend([\r
-        NonBlockingTCPTests,\r
-        FileObjectClassTestCase,\r
-        FileObjectInterruptedTestCase,\r
-        UnbufferedFileObjectClassTestCase,\r
-        LineBufferedFileObjectClassTestCase,\r
-        SmallBufferedFileObjectClassTestCase,\r
-        Urllib2FileobjectTest,\r
-        NetworkConnectionNoServer,\r
-        NetworkConnectionAttributesTest,\r
-        NetworkConnectionBehaviourTest,\r
-    ])\r
-    if hasattr(socket, "socketpair"):\r
-        tests.append(BasicSocketPairTest)\r
-    if sys.platform == 'linux2':\r
-        tests.append(TestLinuxAbstractNamespace)\r
-    if isTipcAvailable():\r
-        tests.append(TIPCTest)\r
-        tests.append(TIPCThreadableTest)\r
-\r
-    thread_info = test_support.threading_setup()\r
-    test_support.run_unittest(*tests)\r
-    test_support.threading_cleanup(*thread_info)\r
-\r
-if __name__ == "__main__":\r
-    test_main()\r