]> git.proxmox.com Git - mirror_edk2.git/blob - AppPkg/Applications/Python/Python-2.7.2/Lib/test/test_urllib2_localnet.py
EmbeddedPkg: Extend NvVarStoreFormattedLib LIBRARY_CLASS
[mirror_edk2.git] / AppPkg / Applications / Python / Python-2.7.2 / Lib / test / test_urllib2_localnet.py
1 #!/usr/bin/env python
2
3 import urlparse
4 import urllib2
5 import BaseHTTPServer
6 import unittest
7 import hashlib
8 from test import test_support
9 mimetools = test_support.import_module('mimetools', deprecated=True)
10 threading = test_support.import_module('threading')
11
12 # Loopback http server infrastructure
13
14 class LoopbackHttpServer(BaseHTTPServer.HTTPServer):
15 """HTTP server w/ a few modifications that make it useful for
16 loopback testing purposes.
17 """
18
19 def __init__(self, server_address, RequestHandlerClass):
20 BaseHTTPServer.HTTPServer.__init__(self,
21 server_address,
22 RequestHandlerClass)
23
24 # Set the timeout of our listening socket really low so
25 # that we can stop the server easily.
26 self.socket.settimeout(1.0)
27
28 def get_request(self):
29 """BaseHTTPServer method, overridden."""
30
31 request, client_address = self.socket.accept()
32
33 # It's a loopback connection, so setting the timeout
34 # really low shouldn't affect anything, but should make
35 # deadlocks less likely to occur.
36 request.settimeout(10.0)
37
38 return (request, client_address)
39
40 class LoopbackHttpServerThread(threading.Thread):
41 """Stoppable thread that runs a loopback http server."""
42
43 def __init__(self, request_handler):
44 threading.Thread.__init__(self)
45 self._stop = False
46 self.ready = threading.Event()
47 request_handler.protocol_version = "HTTP/1.0"
48 self.httpd = LoopbackHttpServer(('127.0.0.1', 0),
49 request_handler)
50 #print "Serving HTTP on %s port %s" % (self.httpd.server_name,
51 # self.httpd.server_port)
52 self.port = self.httpd.server_port
53
54 def stop(self):
55 """Stops the webserver if it's currently running."""
56
57 # Set the stop flag.
58 self._stop = True
59
60 self.join()
61
62 def run(self):
63 self.ready.set()
64 while not self._stop:
65 self.httpd.handle_request()
66
67 # Authentication infrastructure
68
69 class DigestAuthHandler:
70 """Handler for performing digest authentication."""
71
72 def __init__(self):
73 self._request_num = 0
74 self._nonces = []
75 self._users = {}
76 self._realm_name = "Test Realm"
77 self._qop = "auth"
78
79 def set_qop(self, qop):
80 self._qop = qop
81
82 def set_users(self, users):
83 assert isinstance(users, dict)
84 self._users = users
85
86 def set_realm(self, realm):
87 self._realm_name = realm
88
89 def _generate_nonce(self):
90 self._request_num += 1
91 nonce = hashlib.md5(str(self._request_num)).hexdigest()
92 self._nonces.append(nonce)
93 return nonce
94
95 def _create_auth_dict(self, auth_str):
96 first_space_index = auth_str.find(" ")
97 auth_str = auth_str[first_space_index+1:]
98
99 parts = auth_str.split(",")
100
101 auth_dict = {}
102 for part in parts:
103 name, value = part.split("=")
104 name = name.strip()
105 if value[0] == '"' and value[-1] == '"':
106 value = value[1:-1]
107 else:
108 value = value.strip()
109 auth_dict[name] = value
110 return auth_dict
111
112 def _validate_auth(self, auth_dict, password, method, uri):
113 final_dict = {}
114 final_dict.update(auth_dict)
115 final_dict["password"] = password
116 final_dict["method"] = method
117 final_dict["uri"] = uri
118 HA1_str = "%(username)s:%(realm)s:%(password)s" % final_dict
119 HA1 = hashlib.md5(HA1_str).hexdigest()
120 HA2_str = "%(method)s:%(uri)s" % final_dict
121 HA2 = hashlib.md5(HA2_str).hexdigest()
122 final_dict["HA1"] = HA1
123 final_dict["HA2"] = HA2
124 response_str = "%(HA1)s:%(nonce)s:%(nc)s:" \
125 "%(cnonce)s:%(qop)s:%(HA2)s" % final_dict
126 response = hashlib.md5(response_str).hexdigest()
127
128 return response == auth_dict["response"]
129
130 def _return_auth_challenge(self, request_handler):
131 request_handler.send_response(407, "Proxy Authentication Required")
132 request_handler.send_header("Content-Type", "text/html")
133 request_handler.send_header(
134 'Proxy-Authenticate', 'Digest realm="%s", '
135 'qop="%s",'
136 'nonce="%s", ' % \
137 (self._realm_name, self._qop, self._generate_nonce()))
138 # XXX: Not sure if we're supposed to add this next header or
139 # not.
140 #request_handler.send_header('Connection', 'close')
141 request_handler.end_headers()
142 request_handler.wfile.write("Proxy Authentication Required.")
143 return False
144
145 def handle_request(self, request_handler):
146 """Performs digest authentication on the given HTTP request
147 handler. Returns True if authentication was successful, False
148 otherwise.
149
150 If no users have been set, then digest auth is effectively
151 disabled and this method will always return True.
152 """
153
154 if len(self._users) == 0:
155 return True
156
157 if 'Proxy-Authorization' not in request_handler.headers:
158 return self._return_auth_challenge(request_handler)
159 else:
160 auth_dict = self._create_auth_dict(
161 request_handler.headers['Proxy-Authorization']
162 )
163 if auth_dict["username"] in self._users:
164 password = self._users[ auth_dict["username"] ]
165 else:
166 return self._return_auth_challenge(request_handler)
167 if not auth_dict.get("nonce") in self._nonces:
168 return self._return_auth_challenge(request_handler)
169 else:
170 self._nonces.remove(auth_dict["nonce"])
171
172 auth_validated = False
173
174 # MSIE uses short_path in its validation, but Python's
175 # urllib2 uses the full path, so we're going to see if
176 # either of them works here.
177
178 for path in [request_handler.path, request_handler.short_path]:
179 if self._validate_auth(auth_dict,
180 password,
181 request_handler.command,
182 path):
183 auth_validated = True
184
185 if not auth_validated:
186 return self._return_auth_challenge(request_handler)
187 return True
188
189 # Proxy test infrastructure
190
191 class FakeProxyHandler(BaseHTTPServer.BaseHTTPRequestHandler):
192 """This is a 'fake proxy' that makes it look like the entire
193 internet has gone down due to a sudden zombie invasion. It main
194 utility is in providing us with authentication support for
195 testing.
196 """
197
198 def __init__(self, digest_auth_handler, *args, **kwargs):
199 # This has to be set before calling our parent's __init__(), which will
200 # try to call do_GET().
201 self.digest_auth_handler = digest_auth_handler
202 BaseHTTPServer.BaseHTTPRequestHandler.__init__(self, *args, **kwargs)
203
204 def log_message(self, format, *args):
205 # Uncomment the next line for debugging.
206 #sys.stderr.write(format % args)
207 pass
208
209 def do_GET(self):
210 (scm, netloc, path, params, query, fragment) = urlparse.urlparse(
211 self.path, 'http')
212 self.short_path = path
213 if self.digest_auth_handler.handle_request(self):
214 self.send_response(200, "OK")
215 self.send_header("Content-Type", "text/html")
216 self.end_headers()
217 self.wfile.write("You've reached %s!<BR>" % self.path)
218 self.wfile.write("Our apologies, but our server is down due to "
219 "a sudden zombie invasion.")
220
221 # Test cases
222
223 class BaseTestCase(unittest.TestCase):
224 def setUp(self):
225 self._threads = test_support.threading_setup()
226
227 def tearDown(self):
228 test_support.threading_cleanup(*self._threads)
229
230
231 class ProxyAuthTests(BaseTestCase):
232 URL = "http://localhost"
233
234 USER = "tester"
235 PASSWD = "test123"
236 REALM = "TestRealm"
237
238 def setUp(self):
239 super(ProxyAuthTests, self).setUp()
240 self.digest_auth_handler = DigestAuthHandler()
241 self.digest_auth_handler.set_users({self.USER: self.PASSWD})
242 self.digest_auth_handler.set_realm(self.REALM)
243 def create_fake_proxy_handler(*args, **kwargs):
244 return FakeProxyHandler(self.digest_auth_handler, *args, **kwargs)
245
246 self.server = LoopbackHttpServerThread(create_fake_proxy_handler)
247 self.server.start()
248 self.server.ready.wait()
249 proxy_url = "http://127.0.0.1:%d" % self.server.port
250 handler = urllib2.ProxyHandler({"http" : proxy_url})
251 self.proxy_digest_handler = urllib2.ProxyDigestAuthHandler()
252 self.opener = urllib2.build_opener(handler, self.proxy_digest_handler)
253
254 def tearDown(self):
255 self.server.stop()
256 super(ProxyAuthTests, self).tearDown()
257
258 def test_proxy_with_bad_password_raises_httperror(self):
259 self.proxy_digest_handler.add_password(self.REALM, self.URL,
260 self.USER, self.PASSWD+"bad")
261 self.digest_auth_handler.set_qop("auth")
262 self.assertRaises(urllib2.HTTPError,
263 self.opener.open,
264 self.URL)
265
266 def test_proxy_with_no_password_raises_httperror(self):
267 self.digest_auth_handler.set_qop("auth")
268 self.assertRaises(urllib2.HTTPError,
269 self.opener.open,
270 self.URL)
271
272 def test_proxy_qop_auth_works(self):
273 self.proxy_digest_handler.add_password(self.REALM, self.URL,
274 self.USER, self.PASSWD)
275 self.digest_auth_handler.set_qop("auth")
276 result = self.opener.open(self.URL)
277 while result.read():
278 pass
279 result.close()
280
281 def test_proxy_qop_auth_int_works_or_throws_urlerror(self):
282 self.proxy_digest_handler.add_password(self.REALM, self.URL,
283 self.USER, self.PASSWD)
284 self.digest_auth_handler.set_qop("auth-int")
285 try:
286 result = self.opener.open(self.URL)
287 except urllib2.URLError:
288 # It's okay if we don't support auth-int, but we certainly
289 # shouldn't receive any kind of exception here other than
290 # a URLError.
291 result = None
292 if result:
293 while result.read():
294 pass
295 result.close()
296
297
298 def GetRequestHandler(responses):
299
300 class FakeHTTPRequestHandler(BaseHTTPServer.BaseHTTPRequestHandler):
301
302 server_version = "TestHTTP/"
303 requests = []
304 headers_received = []
305 port = 80
306
307 def do_GET(self):
308 body = self.send_head()
309 if body:
310 self.wfile.write(body)
311
312 def do_POST(self):
313 content_length = self.headers['Content-Length']
314 post_data = self.rfile.read(int(content_length))
315 self.do_GET()
316 self.requests.append(post_data)
317
318 def send_head(self):
319 FakeHTTPRequestHandler.headers_received = self.headers
320 self.requests.append(self.path)
321 response_code, headers, body = responses.pop(0)
322
323 self.send_response(response_code)
324
325 for (header, value) in headers:
326 self.send_header(header, value % self.port)
327 if body:
328 self.send_header('Content-type', 'text/plain')
329 self.end_headers()
330 return body
331 self.end_headers()
332
333 def log_message(self, *args):
334 pass
335
336
337 return FakeHTTPRequestHandler
338
339
340 class TestUrlopen(BaseTestCase):
341 """Tests urllib2.urlopen using the network.
342
343 These tests are not exhaustive. Assuming that testing using files does a
344 good job overall of some of the basic interface features. There are no
345 tests exercising the optional 'data' and 'proxies' arguments. No tests
346 for transparent redirection have been written.
347 """
348
349 def start_server(self, responses):
350 handler = GetRequestHandler(responses)
351
352 self.server = LoopbackHttpServerThread(handler)
353 self.server.start()
354 self.server.ready.wait()
355 port = self.server.port
356 handler.port = port
357 return handler
358
359
360 def test_redirection(self):
361 expected_response = 'We got here...'
362 responses = [
363 (302, [('Location', 'http://localhost:%s/somewhere_else')], ''),
364 (200, [], expected_response)
365 ]
366
367 handler = self.start_server(responses)
368
369 try:
370 f = urllib2.urlopen('http://localhost:%s/' % handler.port)
371 data = f.read()
372 f.close()
373
374 self.assertEqual(data, expected_response)
375 self.assertEqual(handler.requests, ['/', '/somewhere_else'])
376 finally:
377 self.server.stop()
378
379
380 def test_404(self):
381 expected_response = 'Bad bad bad...'
382 handler = self.start_server([(404, [], expected_response)])
383
384 try:
385 try:
386 urllib2.urlopen('http://localhost:%s/weeble' % handler.port)
387 except urllib2.URLError, f:
388 pass
389 else:
390 self.fail('404 should raise URLError')
391
392 data = f.read()
393 f.close()
394
395 self.assertEqual(data, expected_response)
396 self.assertEqual(handler.requests, ['/weeble'])
397 finally:
398 self.server.stop()
399
400
401 def test_200(self):
402 expected_response = 'pycon 2008...'
403 handler = self.start_server([(200, [], expected_response)])
404
405 try:
406 f = urllib2.urlopen('http://localhost:%s/bizarre' % handler.port)
407 data = f.read()
408 f.close()
409
410 self.assertEqual(data, expected_response)
411 self.assertEqual(handler.requests, ['/bizarre'])
412 finally:
413 self.server.stop()
414
415 def test_200_with_parameters(self):
416 expected_response = 'pycon 2008...'
417 handler = self.start_server([(200, [], expected_response)])
418
419 try:
420 f = urllib2.urlopen('http://localhost:%s/bizarre' % handler.port, 'get=with_feeling')
421 data = f.read()
422 f.close()
423
424 self.assertEqual(data, expected_response)
425 self.assertEqual(handler.requests, ['/bizarre', 'get=with_feeling'])
426 finally:
427 self.server.stop()
428
429
430 def test_sending_headers(self):
431 handler = self.start_server([(200, [], "we don't care")])
432
433 try:
434 req = urllib2.Request("http://localhost:%s/" % handler.port,
435 headers={'Range': 'bytes=20-39'})
436 urllib2.urlopen(req)
437 self.assertEqual(handler.headers_received['Range'], 'bytes=20-39')
438 finally:
439 self.server.stop()
440
441 def test_basic(self):
442 handler = self.start_server([(200, [], "we don't care")])
443
444 try:
445 open_url = urllib2.urlopen("http://localhost:%s" % handler.port)
446 for attr in ("read", "close", "info", "geturl"):
447 self.assertTrue(hasattr(open_url, attr), "object returned from "
448 "urlopen lacks the %s attribute" % attr)
449 try:
450 self.assertTrue(open_url.read(), "calling 'read' failed")
451 finally:
452 open_url.close()
453 finally:
454 self.server.stop()
455
456 def test_info(self):
457 handler = self.start_server([(200, [], "we don't care")])
458
459 try:
460 open_url = urllib2.urlopen("http://localhost:%s" % handler.port)
461 info_obj = open_url.info()
462 self.assertIsInstance(info_obj, mimetools.Message,
463 "object returned by 'info' is not an "
464 "instance of mimetools.Message")
465 self.assertEqual(info_obj.getsubtype(), "plain")
466 finally:
467 self.server.stop()
468
469 def test_geturl(self):
470 # Make sure same URL as opened is returned by geturl.
471 handler = self.start_server([(200, [], "we don't care")])
472
473 try:
474 open_url = urllib2.urlopen("http://localhost:%s" % handler.port)
475 url = open_url.geturl()
476 self.assertEqual(url, "http://localhost:%s" % handler.port)
477 finally:
478 self.server.stop()
479
480
481 def test_bad_address(self):
482 # Make sure proper exception is raised when connecting to a bogus
483 # address.
484 self.assertRaises(IOError,
485 # Given that both VeriSign and various ISPs have in
486 # the past or are presently hijacking various invalid
487 # domain name requests in an attempt to boost traffic
488 # to their own sites, finding a domain name to use
489 # for this test is difficult. RFC2606 leads one to
490 # believe that '.invalid' should work, but experience
491 # seemed to indicate otherwise. Single character
492 # TLDs are likely to remain invalid, so this seems to
493 # be the best choice. The trailing '.' prevents a
494 # related problem: The normal DNS resolver appends
495 # the domain names from the search path if there is
496 # no '.' the end and, and if one of those domains
497 # implements a '*' rule a result is returned.
498 # However, none of this will prevent the test from
499 # failing if the ISP hijacks all invalid domain
500 # requests. The real solution would be to be able to
501 # parameterize the framework with a mock resolver.
502 urllib2.urlopen, "http://sadflkjsasf.i.nvali.d./")
503
504 def test_iteration(self):
505 expected_response = "pycon 2008..."
506 handler = self.start_server([(200, [], expected_response)])
507 try:
508 data = urllib2.urlopen("http://localhost:%s" % handler.port)
509 for line in data:
510 self.assertEqual(line, expected_response)
511 finally:
512 self.server.stop()
513
514 def ztest_line_iteration(self):
515 lines = ["We\n", "got\n", "here\n", "verylong " * 8192 + "\n"]
516 expected_response = "".join(lines)
517 handler = self.start_server([(200, [], expected_response)])
518 try:
519 data = urllib2.urlopen("http://localhost:%s" % handler.port)
520 for index, line in enumerate(data):
521 self.assertEqual(line, lines[index],
522 "Fetched line number %s doesn't match expected:\n"
523 " Expected length was %s, got %s" %
524 (index, len(lines[index]), len(line)))
525 finally:
526 self.server.stop()
527 self.assertEqual(index + 1, len(lines))
528
529 def test_main():
530 # We will NOT depend on the network resource flag
531 # (Lib/test/regrtest.py -u network) since all tests here are only
532 # localhost. However, if this is a bad rationale, then uncomment
533 # the next line.
534 #test_support.requires("network")
535
536 test_support.run_unittest(ProxyAuthTests, TestUrlopen)
537
538 if __name__ == "__main__":
539 test_main()