+++ /dev/null
-#!/usr/bin/env python\r
-\r
-import urlparse\r
-import urllib2\r
-import BaseHTTPServer\r
-import unittest\r
-import hashlib\r
-from test import test_support\r
-mimetools = test_support.import_module('mimetools', deprecated=True)\r
-threading = test_support.import_module('threading')\r
-\r
-# Loopback http server infrastructure\r
-\r
-class LoopbackHttpServer(BaseHTTPServer.HTTPServer):\r
- """HTTP server w/ a few modifications that make it useful for\r
- loopback testing purposes.\r
- """\r
-\r
- def __init__(self, server_address, RequestHandlerClass):\r
- BaseHTTPServer.HTTPServer.__init__(self,\r
- server_address,\r
- RequestHandlerClass)\r
-\r
- # Set the timeout of our listening socket really low so\r
- # that we can stop the server easily.\r
- self.socket.settimeout(1.0)\r
-\r
- def get_request(self):\r
- """BaseHTTPServer method, overridden."""\r
-\r
- request, client_address = self.socket.accept()\r
-\r
- # It's a loopback connection, so setting the timeout\r
- # really low shouldn't affect anything, but should make\r
- # deadlocks less likely to occur.\r
- request.settimeout(10.0)\r
-\r
- return (request, client_address)\r
-\r
-class LoopbackHttpServerThread(threading.Thread):\r
- """Stoppable thread that runs a loopback http server."""\r
-\r
- def __init__(self, request_handler):\r
- threading.Thread.__init__(self)\r
- self._stop = False\r
- self.ready = threading.Event()\r
- request_handler.protocol_version = "HTTP/1.0"\r
- self.httpd = LoopbackHttpServer(('127.0.0.1', 0),\r
- request_handler)\r
- #print "Serving HTTP on %s port %s" % (self.httpd.server_name,\r
- # self.httpd.server_port)\r
- self.port = self.httpd.server_port\r
-\r
- def stop(self):\r
- """Stops the webserver if it's currently running."""\r
-\r
- # Set the stop flag.\r
- self._stop = True\r
-\r
- self.join()\r
-\r
- def run(self):\r
- self.ready.set()\r
- while not self._stop:\r
- self.httpd.handle_request()\r
-\r
-# Authentication infrastructure\r
-\r
-class DigestAuthHandler:\r
- """Handler for performing digest authentication."""\r
-\r
- def __init__(self):\r
- self._request_num = 0\r
- self._nonces = []\r
- self._users = {}\r
- self._realm_name = "Test Realm"\r
- self._qop = "auth"\r
-\r
- def set_qop(self, qop):\r
- self._qop = qop\r
-\r
- def set_users(self, users):\r
- assert isinstance(users, dict)\r
- self._users = users\r
-\r
- def set_realm(self, realm):\r
- self._realm_name = realm\r
-\r
- def _generate_nonce(self):\r
- self._request_num += 1\r
- nonce = hashlib.md5(str(self._request_num)).hexdigest()\r
- self._nonces.append(nonce)\r
- return nonce\r
-\r
- def _create_auth_dict(self, auth_str):\r
- first_space_index = auth_str.find(" ")\r
- auth_str = auth_str[first_space_index+1:]\r
-\r
- parts = auth_str.split(",")\r
-\r
- auth_dict = {}\r
- for part in parts:\r
- name, value = part.split("=")\r
- name = name.strip()\r
- if value[0] == '"' and value[-1] == '"':\r
- value = value[1:-1]\r
- else:\r
- value = value.strip()\r
- auth_dict[name] = value\r
- return auth_dict\r
-\r
- def _validate_auth(self, auth_dict, password, method, uri):\r
- final_dict = {}\r
- final_dict.update(auth_dict)\r
- final_dict["password"] = password\r
- final_dict["method"] = method\r
- final_dict["uri"] = uri\r
- HA1_str = "%(username)s:%(realm)s:%(password)s" % final_dict\r
- HA1 = hashlib.md5(HA1_str).hexdigest()\r
- HA2_str = "%(method)s:%(uri)s" % final_dict\r
- HA2 = hashlib.md5(HA2_str).hexdigest()\r
- final_dict["HA1"] = HA1\r
- final_dict["HA2"] = HA2\r
- response_str = "%(HA1)s:%(nonce)s:%(nc)s:" \\r
- "%(cnonce)s:%(qop)s:%(HA2)s" % final_dict\r
- response = hashlib.md5(response_str).hexdigest()\r
-\r
- return response == auth_dict["response"]\r
-\r
- def _return_auth_challenge(self, request_handler):\r
- request_handler.send_response(407, "Proxy Authentication Required")\r
- request_handler.send_header("Content-Type", "text/html")\r
- request_handler.send_header(\r
- 'Proxy-Authenticate', 'Digest realm="%s", '\r
- 'qop="%s",'\r
- 'nonce="%s", ' % \\r
- (self._realm_name, self._qop, self._generate_nonce()))\r
- # XXX: Not sure if we're supposed to add this next header or\r
- # not.\r
- #request_handler.send_header('Connection', 'close')\r
- request_handler.end_headers()\r
- request_handler.wfile.write("Proxy Authentication Required.")\r
- return False\r
-\r
- def handle_request(self, request_handler):\r
- """Performs digest authentication on the given HTTP request\r
- handler. Returns True if authentication was successful, False\r
- otherwise.\r
-\r
- If no users have been set, then digest auth is effectively\r
- disabled and this method will always return True.\r
- """\r
-\r
- if len(self._users) == 0:\r
- return True\r
-\r
- if 'Proxy-Authorization' not in request_handler.headers:\r
- return self._return_auth_challenge(request_handler)\r
- else:\r
- auth_dict = self._create_auth_dict(\r
- request_handler.headers['Proxy-Authorization']\r
- )\r
- if auth_dict["username"] in self._users:\r
- password = self._users[ auth_dict["username"] ]\r
- else:\r
- return self._return_auth_challenge(request_handler)\r
- if not auth_dict.get("nonce") in self._nonces:\r
- return self._return_auth_challenge(request_handler)\r
- else:\r
- self._nonces.remove(auth_dict["nonce"])\r
-\r
- auth_validated = False\r
-\r
- # MSIE uses short_path in its validation, but Python's\r
- # urllib2 uses the full path, so we're going to see if\r
- # either of them works here.\r
-\r
- for path in [request_handler.path, request_handler.short_path]:\r
- if self._validate_auth(auth_dict,\r
- password,\r
- request_handler.command,\r
- path):\r
- auth_validated = True\r
-\r
- if not auth_validated:\r
- return self._return_auth_challenge(request_handler)\r
- return True\r
-\r
-# Proxy test infrastructure\r
-\r
-class FakeProxyHandler(BaseHTTPServer.BaseHTTPRequestHandler):\r
- """This is a 'fake proxy' that makes it look like the entire\r
- internet has gone down due to a sudden zombie invasion. It main\r
- utility is in providing us with authentication support for\r
- testing.\r
- """\r
-\r
- def __init__(self, digest_auth_handler, *args, **kwargs):\r
- # This has to be set before calling our parent's __init__(), which will\r
- # try to call do_GET().\r
- self.digest_auth_handler = digest_auth_handler\r
- BaseHTTPServer.BaseHTTPRequestHandler.__init__(self, *args, **kwargs)\r
-\r
- def log_message(self, format, *args):\r
- # Uncomment the next line for debugging.\r
- #sys.stderr.write(format % args)\r
- pass\r
-\r
- def do_GET(self):\r
- (scm, netloc, path, params, query, fragment) = urlparse.urlparse(\r
- self.path, 'http')\r
- self.short_path = path\r
- if self.digest_auth_handler.handle_request(self):\r
- self.send_response(200, "OK")\r
- self.send_header("Content-Type", "text/html")\r
- self.end_headers()\r
- self.wfile.write("You've reached %s!<BR>" % self.path)\r
- self.wfile.write("Our apologies, but our server is down due to "\r
- "a sudden zombie invasion.")\r
-\r
-# Test cases\r
-\r
-class BaseTestCase(unittest.TestCase):\r
- def setUp(self):\r
- self._threads = test_support.threading_setup()\r
-\r
- def tearDown(self):\r
- test_support.threading_cleanup(*self._threads)\r
-\r
-\r
-class ProxyAuthTests(BaseTestCase):\r
- URL = "http://localhost"\r
-\r
- USER = "tester"\r
- PASSWD = "test123"\r
- REALM = "TestRealm"\r
-\r
- def setUp(self):\r
- super(ProxyAuthTests, self).setUp()\r
- self.digest_auth_handler = DigestAuthHandler()\r
- self.digest_auth_handler.set_users({self.USER: self.PASSWD})\r
- self.digest_auth_handler.set_realm(self.REALM)\r
- def create_fake_proxy_handler(*args, **kwargs):\r
- return FakeProxyHandler(self.digest_auth_handler, *args, **kwargs)\r
-\r
- self.server = LoopbackHttpServerThread(create_fake_proxy_handler)\r
- self.server.start()\r
- self.server.ready.wait()\r
- proxy_url = "http://127.0.0.1:%d" % self.server.port\r
- handler = urllib2.ProxyHandler({"http" : proxy_url})\r
- self.proxy_digest_handler = urllib2.ProxyDigestAuthHandler()\r
- self.opener = urllib2.build_opener(handler, self.proxy_digest_handler)\r
-\r
- def tearDown(self):\r
- self.server.stop()\r
- super(ProxyAuthTests, self).tearDown()\r
-\r
- def test_proxy_with_bad_password_raises_httperror(self):\r
- self.proxy_digest_handler.add_password(self.REALM, self.URL,\r
- self.USER, self.PASSWD+"bad")\r
- self.digest_auth_handler.set_qop("auth")\r
- self.assertRaises(urllib2.HTTPError,\r
- self.opener.open,\r
- self.URL)\r
-\r
- def test_proxy_with_no_password_raises_httperror(self):\r
- self.digest_auth_handler.set_qop("auth")\r
- self.assertRaises(urllib2.HTTPError,\r
- self.opener.open,\r
- self.URL)\r
-\r
- def test_proxy_qop_auth_works(self):\r
- self.proxy_digest_handler.add_password(self.REALM, self.URL,\r
- self.USER, self.PASSWD)\r
- self.digest_auth_handler.set_qop("auth")\r
- result = self.opener.open(self.URL)\r
- while result.read():\r
- pass\r
- result.close()\r
-\r
- def test_proxy_qop_auth_int_works_or_throws_urlerror(self):\r
- self.proxy_digest_handler.add_password(self.REALM, self.URL,\r
- self.USER, self.PASSWD)\r
- self.digest_auth_handler.set_qop("auth-int")\r
- try:\r
- result = self.opener.open(self.URL)\r
- except urllib2.URLError:\r
- # It's okay if we don't support auth-int, but we certainly\r
- # shouldn't receive any kind of exception here other than\r
- # a URLError.\r
- result = None\r
- if result:\r
- while result.read():\r
- pass\r
- result.close()\r
-\r
-\r
-def GetRequestHandler(responses):\r
-\r
- class FakeHTTPRequestHandler(BaseHTTPServer.BaseHTTPRequestHandler):\r
-\r
- server_version = "TestHTTP/"\r
- requests = []\r
- headers_received = []\r
- port = 80\r
-\r
- def do_GET(self):\r
- body = self.send_head()\r
- if body:\r
- self.wfile.write(body)\r
-\r
- def do_POST(self):\r
- content_length = self.headers['Content-Length']\r
- post_data = self.rfile.read(int(content_length))\r
- self.do_GET()\r
- self.requests.append(post_data)\r
-\r
- def send_head(self):\r
- FakeHTTPRequestHandler.headers_received = self.headers\r
- self.requests.append(self.path)\r
- response_code, headers, body = responses.pop(0)\r
-\r
- self.send_response(response_code)\r
-\r
- for (header, value) in headers:\r
- self.send_header(header, value % self.port)\r
- if body:\r
- self.send_header('Content-type', 'text/plain')\r
- self.end_headers()\r
- return body\r
- self.end_headers()\r
-\r
- def log_message(self, *args):\r
- pass\r
-\r
-\r
- return FakeHTTPRequestHandler\r
-\r
-\r
-class TestUrlopen(BaseTestCase):\r
- """Tests urllib2.urlopen using the network.\r
-\r
- These tests are not exhaustive. Assuming that testing using files does a\r
- good job overall of some of the basic interface features. There are no\r
- tests exercising the optional 'data' and 'proxies' arguments. No tests\r
- for transparent redirection have been written.\r
- """\r
-\r
- def start_server(self, responses):\r
- handler = GetRequestHandler(responses)\r
-\r
- self.server = LoopbackHttpServerThread(handler)\r
- self.server.start()\r
- self.server.ready.wait()\r
- port = self.server.port\r
- handler.port = port\r
- return handler\r
-\r
-\r
- def test_redirection(self):\r
- expected_response = 'We got here...'\r
- responses = [\r
- (302, [('Location', 'http://localhost:%s/somewhere_else')], ''),\r
- (200, [], expected_response)\r
- ]\r
-\r
- handler = self.start_server(responses)\r
-\r
- try:\r
- f = urllib2.urlopen('http://localhost:%s/' % handler.port)\r
- data = f.read()\r
- f.close()\r
-\r
- self.assertEqual(data, expected_response)\r
- self.assertEqual(handler.requests, ['/', '/somewhere_else'])\r
- finally:\r
- self.server.stop()\r
-\r
-\r
- def test_404(self):\r
- expected_response = 'Bad bad bad...'\r
- handler = self.start_server([(404, [], expected_response)])\r
-\r
- try:\r
- try:\r
- urllib2.urlopen('http://localhost:%s/weeble' % handler.port)\r
- except urllib2.URLError, f:\r
- pass\r
- else:\r
- self.fail('404 should raise URLError')\r
-\r
- data = f.read()\r
- f.close()\r
-\r
- self.assertEqual(data, expected_response)\r
- self.assertEqual(handler.requests, ['/weeble'])\r
- finally:\r
- self.server.stop()\r
-\r
-\r
- def test_200(self):\r
- expected_response = 'pycon 2008...'\r
- handler = self.start_server([(200, [], expected_response)])\r
-\r
- try:\r
- f = urllib2.urlopen('http://localhost:%s/bizarre' % handler.port)\r
- data = f.read()\r
- f.close()\r
-\r
- self.assertEqual(data, expected_response)\r
- self.assertEqual(handler.requests, ['/bizarre'])\r
- finally:\r
- self.server.stop()\r
-\r
- def test_200_with_parameters(self):\r
- expected_response = 'pycon 2008...'\r
- handler = self.start_server([(200, [], expected_response)])\r
-\r
- try:\r
- f = urllib2.urlopen('http://localhost:%s/bizarre' % handler.port, 'get=with_feeling')\r
- data = f.read()\r
- f.close()\r
-\r
- self.assertEqual(data, expected_response)\r
- self.assertEqual(handler.requests, ['/bizarre', 'get=with_feeling'])\r
- finally:\r
- self.server.stop()\r
-\r
-\r
- def test_sending_headers(self):\r
- handler = self.start_server([(200, [], "we don't care")])\r
-\r
- try:\r
- req = urllib2.Request("http://localhost:%s/" % handler.port,\r
- headers={'Range': 'bytes=20-39'})\r
- urllib2.urlopen(req)\r
- self.assertEqual(handler.headers_received['Range'], 'bytes=20-39')\r
- finally:\r
- self.server.stop()\r
-\r
- def test_basic(self):\r
- handler = self.start_server([(200, [], "we don't care")])\r
-\r
- try:\r
- open_url = urllib2.urlopen("http://localhost:%s" % handler.port)\r
- for attr in ("read", "close", "info", "geturl"):\r
- self.assertTrue(hasattr(open_url, attr), "object returned from "\r
- "urlopen lacks the %s attribute" % attr)\r
- try:\r
- self.assertTrue(open_url.read(), "calling 'read' failed")\r
- finally:\r
- open_url.close()\r
- finally:\r
- self.server.stop()\r
-\r
- def test_info(self):\r
- handler = self.start_server([(200, [], "we don't care")])\r
-\r
- try:\r
- open_url = urllib2.urlopen("http://localhost:%s" % handler.port)\r
- info_obj = open_url.info()\r
- self.assertIsInstance(info_obj, mimetools.Message,\r
- "object returned by 'info' is not an "\r
- "instance of mimetools.Message")\r
- self.assertEqual(info_obj.getsubtype(), "plain")\r
- finally:\r
- self.server.stop()\r
-\r
- def test_geturl(self):\r
- # Make sure same URL as opened is returned by geturl.\r
- handler = self.start_server([(200, [], "we don't care")])\r
-\r
- try:\r
- open_url = urllib2.urlopen("http://localhost:%s" % handler.port)\r
- url = open_url.geturl()\r
- self.assertEqual(url, "http://localhost:%s" % handler.port)\r
- finally:\r
- self.server.stop()\r
-\r
-\r
- def test_bad_address(self):\r
- # Make sure proper exception is raised when connecting to a bogus\r
- # address.\r
- self.assertRaises(IOError,\r
- # Given that both VeriSign and various ISPs have in\r
- # the past or are presently hijacking various invalid\r
- # domain name requests in an attempt to boost traffic\r
- # to their own sites, finding a domain name to use\r
- # for this test is difficult. RFC2606 leads one to\r
- # believe that '.invalid' should work, but experience\r
- # seemed to indicate otherwise. Single character\r
- # TLDs are likely to remain invalid, so this seems to\r
- # be the best choice. The trailing '.' prevents a\r
- # related problem: The normal DNS resolver appends\r
- # the domain names from the search path if there is\r
- # no '.' the end and, and if one of those domains\r
- # implements a '*' rule a result is returned.\r
- # However, none of this will prevent the test from\r
- # failing if the ISP hijacks all invalid domain\r
- # requests. The real solution would be to be able to\r
- # parameterize the framework with a mock resolver.\r
- urllib2.urlopen, "http://sadflkjsasf.i.nvali.d./")\r
-\r
- def test_iteration(self):\r
- expected_response = "pycon 2008..."\r
- handler = self.start_server([(200, [], expected_response)])\r
- try:\r
- data = urllib2.urlopen("http://localhost:%s" % handler.port)\r
- for line in data:\r
- self.assertEqual(line, expected_response)\r
- finally:\r
- self.server.stop()\r
-\r
- def ztest_line_iteration(self):\r
- lines = ["We\n", "got\n", "here\n", "verylong " * 8192 + "\n"]\r
- expected_response = "".join(lines)\r
- handler = self.start_server([(200, [], expected_response)])\r
- try:\r
- data = urllib2.urlopen("http://localhost:%s" % handler.port)\r
- for index, line in enumerate(data):\r
- self.assertEqual(line, lines[index],\r
- "Fetched line number %s doesn't match expected:\n"\r
- " Expected length was %s, got %s" %\r
- (index, len(lines[index]), len(line)))\r
- finally:\r
- self.server.stop()\r
- self.assertEqual(index + 1, len(lines))\r
-\r
-def test_main():\r
- # We will NOT depend on the network resource flag\r
- # (Lib/test/regrtest.py -u network) since all tests here are only\r
- # localhost. However, if this is a bad rationale, then uncomment\r
- # the next line.\r
- #test_support.requires("network")\r
-\r
- test_support.run_unittest(ProxyAuthTests, TestUrlopen)\r
-\r
-if __name__ == "__main__":\r
- test_main()\r