]> git.proxmox.com Git - mirror_edk2.git/blobdiff - AppPkg/Applications/Python/Python-2.7.2/Lib/test/test_urllib2_localnet.py
edk2: Remove AppPkg, StdLib, StdLibPrivateInternalFiles
[mirror_edk2.git] / AppPkg / Applications / Python / Python-2.7.2 / Lib / test / test_urllib2_localnet.py
diff --git a/AppPkg/Applications/Python/Python-2.7.2/Lib/test/test_urllib2_localnet.py b/AppPkg/Applications/Python/Python-2.7.2/Lib/test/test_urllib2_localnet.py
deleted file mode 100644 (file)
index aef5111..0000000
+++ /dev/null
@@ -1,539 +0,0 @@
-#!/usr/bin/env python\r
-\r
-import urlparse\r
-import urllib2\r
-import BaseHTTPServer\r
-import unittest\r
-import hashlib\r
-from test import test_support\r
-mimetools = test_support.import_module('mimetools', deprecated=True)\r
-threading = test_support.import_module('threading')\r
-\r
-# Loopback http server infrastructure\r
-\r
-class LoopbackHttpServer(BaseHTTPServer.HTTPServer):\r
-    """HTTP server w/ a few modifications that make it useful for\r
-    loopback testing purposes.\r
-    """\r
-\r
-    def __init__(self, server_address, RequestHandlerClass):\r
-        BaseHTTPServer.HTTPServer.__init__(self,\r
-                                           server_address,\r
-                                           RequestHandlerClass)\r
-\r
-        # Set the timeout of our listening socket really low so\r
-        # that we can stop the server easily.\r
-        self.socket.settimeout(1.0)\r
-\r
-    def get_request(self):\r
-        """BaseHTTPServer method, overridden."""\r
-\r
-        request, client_address = self.socket.accept()\r
-\r
-        # It's a loopback connection, so setting the timeout\r
-        # really low shouldn't affect anything, but should make\r
-        # deadlocks less likely to occur.\r
-        request.settimeout(10.0)\r
-\r
-        return (request, client_address)\r
-\r
-class LoopbackHttpServerThread(threading.Thread):\r
-    """Stoppable thread that runs a loopback http server."""\r
-\r
-    def __init__(self, request_handler):\r
-        threading.Thread.__init__(self)\r
-        self._stop = False\r
-        self.ready = threading.Event()\r
-        request_handler.protocol_version = "HTTP/1.0"\r
-        self.httpd = LoopbackHttpServer(('127.0.0.1', 0),\r
-                                        request_handler)\r
-        #print "Serving HTTP on %s port %s" % (self.httpd.server_name,\r
-        #                                      self.httpd.server_port)\r
-        self.port = self.httpd.server_port\r
-\r
-    def stop(self):\r
-        """Stops the webserver if it's currently running."""\r
-\r
-        # Set the stop flag.\r
-        self._stop = True\r
-\r
-        self.join()\r
-\r
-    def run(self):\r
-        self.ready.set()\r
-        while not self._stop:\r
-            self.httpd.handle_request()\r
-\r
-# Authentication infrastructure\r
-\r
-class DigestAuthHandler:\r
-    """Handler for performing digest authentication."""\r
-\r
-    def __init__(self):\r
-        self._request_num = 0\r
-        self._nonces = []\r
-        self._users = {}\r
-        self._realm_name = "Test Realm"\r
-        self._qop = "auth"\r
-\r
-    def set_qop(self, qop):\r
-        self._qop = qop\r
-\r
-    def set_users(self, users):\r
-        assert isinstance(users, dict)\r
-        self._users = users\r
-\r
-    def set_realm(self, realm):\r
-        self._realm_name = realm\r
-\r
-    def _generate_nonce(self):\r
-        self._request_num += 1\r
-        nonce = hashlib.md5(str(self._request_num)).hexdigest()\r
-        self._nonces.append(nonce)\r
-        return nonce\r
-\r
-    def _create_auth_dict(self, auth_str):\r
-        first_space_index = auth_str.find(" ")\r
-        auth_str = auth_str[first_space_index+1:]\r
-\r
-        parts = auth_str.split(",")\r
-\r
-        auth_dict = {}\r
-        for part in parts:\r
-            name, value = part.split("=")\r
-            name = name.strip()\r
-            if value[0] == '"' and value[-1] == '"':\r
-                value = value[1:-1]\r
-            else:\r
-                value = value.strip()\r
-            auth_dict[name] = value\r
-        return auth_dict\r
-\r
-    def _validate_auth(self, auth_dict, password, method, uri):\r
-        final_dict = {}\r
-        final_dict.update(auth_dict)\r
-        final_dict["password"] = password\r
-        final_dict["method"] = method\r
-        final_dict["uri"] = uri\r
-        HA1_str = "%(username)s:%(realm)s:%(password)s" % final_dict\r
-        HA1 = hashlib.md5(HA1_str).hexdigest()\r
-        HA2_str = "%(method)s:%(uri)s" % final_dict\r
-        HA2 = hashlib.md5(HA2_str).hexdigest()\r
-        final_dict["HA1"] = HA1\r
-        final_dict["HA2"] = HA2\r
-        response_str = "%(HA1)s:%(nonce)s:%(nc)s:" \\r
-                       "%(cnonce)s:%(qop)s:%(HA2)s" % final_dict\r
-        response = hashlib.md5(response_str).hexdigest()\r
-\r
-        return response == auth_dict["response"]\r
-\r
-    def _return_auth_challenge(self, request_handler):\r
-        request_handler.send_response(407, "Proxy Authentication Required")\r
-        request_handler.send_header("Content-Type", "text/html")\r
-        request_handler.send_header(\r
-            'Proxy-Authenticate', 'Digest realm="%s", '\r
-            'qop="%s",'\r
-            'nonce="%s", ' % \\r
-            (self._realm_name, self._qop, self._generate_nonce()))\r
-        # XXX: Not sure if we're supposed to add this next header or\r
-        # not.\r
-        #request_handler.send_header('Connection', 'close')\r
-        request_handler.end_headers()\r
-        request_handler.wfile.write("Proxy Authentication Required.")\r
-        return False\r
-\r
-    def handle_request(self, request_handler):\r
-        """Performs digest authentication on the given HTTP request\r
-        handler.  Returns True if authentication was successful, False\r
-        otherwise.\r
-\r
-        If no users have been set, then digest auth is effectively\r
-        disabled and this method will always return True.\r
-        """\r
-\r
-        if len(self._users) == 0:\r
-            return True\r
-\r
-        if 'Proxy-Authorization' not in request_handler.headers:\r
-            return self._return_auth_challenge(request_handler)\r
-        else:\r
-            auth_dict = self._create_auth_dict(\r
-                request_handler.headers['Proxy-Authorization']\r
-                )\r
-            if auth_dict["username"] in self._users:\r
-                password = self._users[ auth_dict["username"] ]\r
-            else:\r
-                return self._return_auth_challenge(request_handler)\r
-            if not auth_dict.get("nonce") in self._nonces:\r
-                return self._return_auth_challenge(request_handler)\r
-            else:\r
-                self._nonces.remove(auth_dict["nonce"])\r
-\r
-            auth_validated = False\r
-\r
-            # MSIE uses short_path in its validation, but Python's\r
-            # urllib2 uses the full path, so we're going to see if\r
-            # either of them works here.\r
-\r
-            for path in [request_handler.path, request_handler.short_path]:\r
-                if self._validate_auth(auth_dict,\r
-                                       password,\r
-                                       request_handler.command,\r
-                                       path):\r
-                    auth_validated = True\r
-\r
-            if not auth_validated:\r
-                return self._return_auth_challenge(request_handler)\r
-            return True\r
-\r
-# Proxy test infrastructure\r
-\r
-class FakeProxyHandler(BaseHTTPServer.BaseHTTPRequestHandler):\r
-    """This is a 'fake proxy' that makes it look like the entire\r
-    internet has gone down due to a sudden zombie invasion.  It main\r
-    utility is in providing us with authentication support for\r
-    testing.\r
-    """\r
-\r
-    def __init__(self, digest_auth_handler, *args, **kwargs):\r
-        # This has to be set before calling our parent's __init__(), which will\r
-        # try to call do_GET().\r
-        self.digest_auth_handler = digest_auth_handler\r
-        BaseHTTPServer.BaseHTTPRequestHandler.__init__(self, *args, **kwargs)\r
-\r
-    def log_message(self, format, *args):\r
-        # Uncomment the next line for debugging.\r
-        #sys.stderr.write(format % args)\r
-        pass\r
-\r
-    def do_GET(self):\r
-        (scm, netloc, path, params, query, fragment) = urlparse.urlparse(\r
-            self.path, 'http')\r
-        self.short_path = path\r
-        if self.digest_auth_handler.handle_request(self):\r
-            self.send_response(200, "OK")\r
-            self.send_header("Content-Type", "text/html")\r
-            self.end_headers()\r
-            self.wfile.write("You've reached %s!<BR>" % self.path)\r
-            self.wfile.write("Our apologies, but our server is down due to "\r
-                              "a sudden zombie invasion.")\r
-\r
-# Test cases\r
-\r
-class BaseTestCase(unittest.TestCase):\r
-    def setUp(self):\r
-        self._threads = test_support.threading_setup()\r
-\r
-    def tearDown(self):\r
-        test_support.threading_cleanup(*self._threads)\r
-\r
-\r
-class ProxyAuthTests(BaseTestCase):\r
-    URL = "http://localhost"\r
-\r
-    USER = "tester"\r
-    PASSWD = "test123"\r
-    REALM = "TestRealm"\r
-\r
-    def setUp(self):\r
-        super(ProxyAuthTests, self).setUp()\r
-        self.digest_auth_handler = DigestAuthHandler()\r
-        self.digest_auth_handler.set_users({self.USER: self.PASSWD})\r
-        self.digest_auth_handler.set_realm(self.REALM)\r
-        def create_fake_proxy_handler(*args, **kwargs):\r
-            return FakeProxyHandler(self.digest_auth_handler, *args, **kwargs)\r
-\r
-        self.server = LoopbackHttpServerThread(create_fake_proxy_handler)\r
-        self.server.start()\r
-        self.server.ready.wait()\r
-        proxy_url = "http://127.0.0.1:%d" % self.server.port\r
-        handler = urllib2.ProxyHandler({"http" : proxy_url})\r
-        self.proxy_digest_handler = urllib2.ProxyDigestAuthHandler()\r
-        self.opener = urllib2.build_opener(handler, self.proxy_digest_handler)\r
-\r
-    def tearDown(self):\r
-        self.server.stop()\r
-        super(ProxyAuthTests, self).tearDown()\r
-\r
-    def test_proxy_with_bad_password_raises_httperror(self):\r
-        self.proxy_digest_handler.add_password(self.REALM, self.URL,\r
-                                               self.USER, self.PASSWD+"bad")\r
-        self.digest_auth_handler.set_qop("auth")\r
-        self.assertRaises(urllib2.HTTPError,\r
-                          self.opener.open,\r
-                          self.URL)\r
-\r
-    def test_proxy_with_no_password_raises_httperror(self):\r
-        self.digest_auth_handler.set_qop("auth")\r
-        self.assertRaises(urllib2.HTTPError,\r
-                          self.opener.open,\r
-                          self.URL)\r
-\r
-    def test_proxy_qop_auth_works(self):\r
-        self.proxy_digest_handler.add_password(self.REALM, self.URL,\r
-                                               self.USER, self.PASSWD)\r
-        self.digest_auth_handler.set_qop("auth")\r
-        result = self.opener.open(self.URL)\r
-        while result.read():\r
-            pass\r
-        result.close()\r
-\r
-    def test_proxy_qop_auth_int_works_or_throws_urlerror(self):\r
-        self.proxy_digest_handler.add_password(self.REALM, self.URL,\r
-                                               self.USER, self.PASSWD)\r
-        self.digest_auth_handler.set_qop("auth-int")\r
-        try:\r
-            result = self.opener.open(self.URL)\r
-        except urllib2.URLError:\r
-            # It's okay if we don't support auth-int, but we certainly\r
-            # shouldn't receive any kind of exception here other than\r
-            # a URLError.\r
-            result = None\r
-        if result:\r
-            while result.read():\r
-                pass\r
-            result.close()\r
-\r
-\r
-def GetRequestHandler(responses):\r
-\r
-    class FakeHTTPRequestHandler(BaseHTTPServer.BaseHTTPRequestHandler):\r
-\r
-        server_version = "TestHTTP/"\r
-        requests = []\r
-        headers_received = []\r
-        port = 80\r
-\r
-        def do_GET(self):\r
-            body = self.send_head()\r
-            if body:\r
-                self.wfile.write(body)\r
-\r
-        def do_POST(self):\r
-            content_length = self.headers['Content-Length']\r
-            post_data = self.rfile.read(int(content_length))\r
-            self.do_GET()\r
-            self.requests.append(post_data)\r
-\r
-        def send_head(self):\r
-            FakeHTTPRequestHandler.headers_received = self.headers\r
-            self.requests.append(self.path)\r
-            response_code, headers, body = responses.pop(0)\r
-\r
-            self.send_response(response_code)\r
-\r
-            for (header, value) in headers:\r
-                self.send_header(header, value % self.port)\r
-            if body:\r
-                self.send_header('Content-type', 'text/plain')\r
-                self.end_headers()\r
-                return body\r
-            self.end_headers()\r
-\r
-        def log_message(self, *args):\r
-            pass\r
-\r
-\r
-    return FakeHTTPRequestHandler\r
-\r
-\r
-class TestUrlopen(BaseTestCase):\r
-    """Tests urllib2.urlopen using the network.\r
-\r
-    These tests are not exhaustive.  Assuming that testing using files does a\r
-    good job overall of some of the basic interface features.  There are no\r
-    tests exercising the optional 'data' and 'proxies' arguments.  No tests\r
-    for transparent redirection have been written.\r
-    """\r
-\r
-    def start_server(self, responses):\r
-        handler = GetRequestHandler(responses)\r
-\r
-        self.server = LoopbackHttpServerThread(handler)\r
-        self.server.start()\r
-        self.server.ready.wait()\r
-        port = self.server.port\r
-        handler.port = port\r
-        return handler\r
-\r
-\r
-    def test_redirection(self):\r
-        expected_response = 'We got here...'\r
-        responses = [\r
-            (302, [('Location', 'http://localhost:%s/somewhere_else')], ''),\r
-            (200, [], expected_response)\r
-        ]\r
-\r
-        handler = self.start_server(responses)\r
-\r
-        try:\r
-            f = urllib2.urlopen('http://localhost:%s/' % handler.port)\r
-            data = f.read()\r
-            f.close()\r
-\r
-            self.assertEqual(data, expected_response)\r
-            self.assertEqual(handler.requests, ['/', '/somewhere_else'])\r
-        finally:\r
-            self.server.stop()\r
-\r
-\r
-    def test_404(self):\r
-        expected_response = 'Bad bad bad...'\r
-        handler = self.start_server([(404, [], expected_response)])\r
-\r
-        try:\r
-            try:\r
-                urllib2.urlopen('http://localhost:%s/weeble' % handler.port)\r
-            except urllib2.URLError, f:\r
-                pass\r
-            else:\r
-                self.fail('404 should raise URLError')\r
-\r
-            data = f.read()\r
-            f.close()\r
-\r
-            self.assertEqual(data, expected_response)\r
-            self.assertEqual(handler.requests, ['/weeble'])\r
-        finally:\r
-            self.server.stop()\r
-\r
-\r
-    def test_200(self):\r
-        expected_response = 'pycon 2008...'\r
-        handler = self.start_server([(200, [], expected_response)])\r
-\r
-        try:\r
-            f = urllib2.urlopen('http://localhost:%s/bizarre' % handler.port)\r
-            data = f.read()\r
-            f.close()\r
-\r
-            self.assertEqual(data, expected_response)\r
-            self.assertEqual(handler.requests, ['/bizarre'])\r
-        finally:\r
-            self.server.stop()\r
-\r
-    def test_200_with_parameters(self):\r
-        expected_response = 'pycon 2008...'\r
-        handler = self.start_server([(200, [], expected_response)])\r
-\r
-        try:\r
-            f = urllib2.urlopen('http://localhost:%s/bizarre' % handler.port, 'get=with_feeling')\r
-            data = f.read()\r
-            f.close()\r
-\r
-            self.assertEqual(data, expected_response)\r
-            self.assertEqual(handler.requests, ['/bizarre', 'get=with_feeling'])\r
-        finally:\r
-            self.server.stop()\r
-\r
-\r
-    def test_sending_headers(self):\r
-        handler = self.start_server([(200, [], "we don't care")])\r
-\r
-        try:\r
-            req = urllib2.Request("http://localhost:%s/" % handler.port,\r
-                                  headers={'Range': 'bytes=20-39'})\r
-            urllib2.urlopen(req)\r
-            self.assertEqual(handler.headers_received['Range'], 'bytes=20-39')\r
-        finally:\r
-            self.server.stop()\r
-\r
-    def test_basic(self):\r
-        handler = self.start_server([(200, [], "we don't care")])\r
-\r
-        try:\r
-            open_url = urllib2.urlopen("http://localhost:%s" % handler.port)\r
-            for attr in ("read", "close", "info", "geturl"):\r
-                self.assertTrue(hasattr(open_url, attr), "object returned from "\r
-                             "urlopen lacks the %s attribute" % attr)\r
-            try:\r
-                self.assertTrue(open_url.read(), "calling 'read' failed")\r
-            finally:\r
-                open_url.close()\r
-        finally:\r
-            self.server.stop()\r
-\r
-    def test_info(self):\r
-        handler = self.start_server([(200, [], "we don't care")])\r
-\r
-        try:\r
-            open_url = urllib2.urlopen("http://localhost:%s" % handler.port)\r
-            info_obj = open_url.info()\r
-            self.assertIsInstance(info_obj, mimetools.Message,\r
-                                  "object returned by 'info' is not an "\r
-                                  "instance of mimetools.Message")\r
-            self.assertEqual(info_obj.getsubtype(), "plain")\r
-        finally:\r
-            self.server.stop()\r
-\r
-    def test_geturl(self):\r
-        # Make sure same URL as opened is returned by geturl.\r
-        handler = self.start_server([(200, [], "we don't care")])\r
-\r
-        try:\r
-            open_url = urllib2.urlopen("http://localhost:%s" % handler.port)\r
-            url = open_url.geturl()\r
-            self.assertEqual(url, "http://localhost:%s" % handler.port)\r
-        finally:\r
-            self.server.stop()\r
-\r
-\r
-    def test_bad_address(self):\r
-        # Make sure proper exception is raised when connecting to a bogus\r
-        # address.\r
-        self.assertRaises(IOError,\r
-                          # Given that both VeriSign and various ISPs have in\r
-                          # the past or are presently hijacking various invalid\r
-                          # domain name requests in an attempt to boost traffic\r
-                          # to their own sites, finding a domain name to use\r
-                          # for this test is difficult.  RFC2606 leads one to\r
-                          # believe that '.invalid' should work, but experience\r
-                          # seemed to indicate otherwise.  Single character\r
-                          # TLDs are likely to remain invalid, so this seems to\r
-                          # be the best choice. The trailing '.' prevents a\r
-                          # related problem: The normal DNS resolver appends\r
-                          # the domain names from the search path if there is\r
-                          # no '.' the end and, and if one of those domains\r
-                          # implements a '*' rule a result is returned.\r
-                          # However, none of this will prevent the test from\r
-                          # failing if the ISP hijacks all invalid domain\r
-                          # requests.  The real solution would be to be able to\r
-                          # parameterize the framework with a mock resolver.\r
-                          urllib2.urlopen, "http://sadflkjsasf.i.nvali.d./")\r
-\r
-    def test_iteration(self):\r
-        expected_response = "pycon 2008..."\r
-        handler = self.start_server([(200, [], expected_response)])\r
-        try:\r
-            data = urllib2.urlopen("http://localhost:%s" % handler.port)\r
-            for line in data:\r
-                self.assertEqual(line, expected_response)\r
-        finally:\r
-            self.server.stop()\r
-\r
-    def ztest_line_iteration(self):\r
-        lines = ["We\n", "got\n", "here\n", "verylong " * 8192 + "\n"]\r
-        expected_response = "".join(lines)\r
-        handler = self.start_server([(200, [], expected_response)])\r
-        try:\r
-            data = urllib2.urlopen("http://localhost:%s" % handler.port)\r
-            for index, line in enumerate(data):\r
-                self.assertEqual(line, lines[index],\r
-                                 "Fetched line number %s doesn't match expected:\n"\r
-                                 "    Expected length was %s, got %s" %\r
-                                 (index, len(lines[index]), len(line)))\r
-        finally:\r
-            self.server.stop()\r
-        self.assertEqual(index + 1, len(lines))\r
-\r
-def test_main():\r
-    # We will NOT depend on the network resource flag\r
-    # (Lib/test/regrtest.py -u network) since all tests here are only\r
-    # localhost.  However, if this is a bad rationale, then uncomment\r
-    # the next line.\r
-    #test_support.requires("network")\r
-\r
-    test_support.run_unittest(ProxyAuthTests, TestUrlopen)\r
-\r
-if __name__ == "__main__":\r
-    test_main()\r