]> git.proxmox.com Git - mirror_edk2.git/blame - AppPkg/Applications/Python/Python-2.7.2/Lib/test/test_urllib2.py
EmbeddedPkg: Extend NvVarStoreFormattedLib LIBRARY_CLASS
[mirror_edk2.git] / AppPkg / Applications / Python / Python-2.7.2 / Lib / test / test_urllib2.py
CommitLineData
4710c53d 1import unittest\r
2from test import test_support\r
3\r
4import os\r
5import socket\r
6import StringIO\r
7\r
8import urllib2\r
9from urllib2 import Request, OpenerDirector\r
10\r
11# XXX\r
12# Request\r
13# CacheFTPHandler (hard to write)\r
14# parse_keqv_list, parse_http_list, HTTPDigestAuthHandler\r
15\r
class TrivialTests(unittest.TestCase):
    """Smoke tests for urllib2.urlopen() and urllib2.parse_http_list()."""

    def test_trivial(self):
        """A malformed URL raises ValueError; a local file: URL can be read."""
        # A couple trivial tests

        self.assertRaises(ValueError, urllib2.urlopen, 'bogus url')

        # XXX Name hacking to get this to work on Windows.
        fname = os.path.abspath(urllib2.__file__).replace('\\', '/')

        # And more hacking to get it to work on MacOS. This assumes
        # urllib.pathname2url works, unfortunately...
        if os.name == 'riscos':
            import string
            fname = os.expand(fname)
            fname = fname.translate(string.maketrans("/.", "./"))

        if os.name == 'nt':
            file_url = "file:///%s" % fname
        else:
            file_url = "file://%s" % fname

        f = urllib2.urlopen(file_url)
        # Close the handle even when read() fails, so a failing test does
        # not leak an open file.
        try:
            f.read()
        finally:
            f.close()

    def test_parse_http_list(self):
        """parse_http_list() splits on commas outside quoted strings."""
        # (header value, expected list of elements); names chosen so that
        # neither the builtin ``list`` nor the ``string`` module is shadowed.
        cases = [
            ('a,b,c', ['a', 'b', 'c']),
            ('path"o,l"og"i"cal, example', ['path"o,l"og"i"cal', 'example']),
            ('a, b, "c", "d", "e,f", g, h',
             ['a', 'b', '"c"', '"d"', '"e,f"', 'g', 'h']),
            ('a="b\\"c", d="e\\,f", g="h\\\\i"',
             ['a="b"c"', 'd="e,f"', 'g="h\\i"']),
        ]
        for header, expected in cases:
            self.assertEqual(urllib2.parse_http_list(header), expected)
49\r
50\r
def test_request_headers_dict():
    """Doctest carrier: behaviour of the undocumented Request.headers dict.

    The Request.headers dictionary is not a documented interface.  It should
    stay that way, because the complete set of headers is only accessible
    through the .get_header(), .has_header(), .header_items() interface.
    However, .headers pre-dates those methods, and so real code will be using
    the dictionary.

    The introduction in 2.4 of those methods was a mistake for the same reason:
    code that previously saw all (urllib2 user)-provided headers in .headers
    now sees only a subset (and the function interface is ugly and incomplete).
    A better change would have been to replace .headers dict with a dict
    subclass (or UserDict.DictMixin instance?) that preserved the .headers
    interface and also provided access to the "unredirected" headers.  It's
    probably too late to fix that, though.


    Check .capitalize() case normalization:

    >>> url = "http://example.com"
    >>> Request(url, headers={"Spam-eggs": "blah"}).headers["Spam-eggs"]
    'blah'
    >>> Request(url, headers={"spam-EggS": "blah"}).headers["Spam-eggs"]
    'blah'

    Currently, Request(url, "Spam-eggs").headers["Spam-Eggs"] raises KeyError,
    but that could be changed in future.

    """
80\r
def test_request_headers_methods():
    """Doctest carrier: Request.has_header / get_header / header_items.

    Note the case normalization of header names here, to .capitalize()-case.
    This should be preserved for backwards-compatibility.  (In the HTTP case,
    normalization to .title()-case is done by urllib2 before sending headers to
    httplib).

    >>> url = "http://example.com"
    >>> r = Request(url, headers={"Spam-eggs": "blah"})
    >>> r.has_header("Spam-eggs")
    True
    >>> r.header_items()
    [('Spam-eggs', 'blah')]
    >>> r.add_header("Foo-Bar", "baz")
    >>> items = r.header_items()
    >>> items.sort()
    >>> items
    [('Foo-bar', 'baz'), ('Spam-eggs', 'blah')]

    Note that e.g. r.has_header("spam-EggS") is currently False, and
    r.get_header("spam-EggS") returns None, but that could be changed in
    future.

    >>> r.has_header("Not-there")
    False
    >>> print r.get_header("Not-there")
    None
    >>> r.get_header("Not-there", "default")
    'default'

    """
112\r
113\r
# NOTE(review): ``self`` is never used and the body is only a docstring;
# presumably this function is collected as a doctest by a runner that is not
# visible in this part of the file -- confirm before changing the signature.
def test_password_manager(self):
    """Doctest carrier: HTTPPasswordMgr lookups by realm and URI.

    >>> mgr = urllib2.HTTPPasswordMgr()
    >>> add = mgr.add_password
    >>> add("Some Realm", "http://example.com/", "joe", "password")
    >>> add("Some Realm", "http://example.com/ni", "ni", "ni")
    >>> add("c", "http://example.com/foo", "foo", "ni")
    >>> add("c", "http://example.com/bar", "bar", "nini")
    >>> add("b", "http://example.com/", "first", "blah")
    >>> add("b", "http://example.com/", "second", "spam")
    >>> add("a", "http://example.com", "1", "a")
    >>> add("Some Realm", "http://c.example.com:3128", "3", "c")
    >>> add("Some Realm", "d.example.com", "4", "d")
    >>> add("Some Realm", "e.example.com:3128", "5", "e")

    >>> mgr.find_user_password("Some Realm", "example.com")
    ('joe', 'password')
    >>> mgr.find_user_password("Some Realm", "http://example.com")
    ('joe', 'password')
    >>> mgr.find_user_password("Some Realm", "http://example.com/")
    ('joe', 'password')
    >>> mgr.find_user_password("Some Realm", "http://example.com/spam")
    ('joe', 'password')
    >>> mgr.find_user_password("Some Realm", "http://example.com/spam/spam")
    ('joe', 'password')
    >>> mgr.find_user_password("c", "http://example.com/foo")
    ('foo', 'ni')
    >>> mgr.find_user_password("c", "http://example.com/bar")
    ('bar', 'nini')

    Actually, this is really undefined ATM
##     Currently, we use the highest-level path where more than one match:

##     >>> mgr.find_user_password("Some Realm", "http://example.com/ni")
##     ('joe', 'password')

    Use latest add_password() in case of conflict:

    >>> mgr.find_user_password("b", "http://example.com/")
    ('second', 'spam')

    No special relationship between a.example.com and example.com:

    >>> mgr.find_user_password("a", "http://example.com/")
    ('1', 'a')
    >>> mgr.find_user_password("a", "http://a.example.com/")
    (None, None)

    Ports:

    >>> mgr.find_user_password("Some Realm", "c.example.com")
    (None, None)
    >>> mgr.find_user_password("Some Realm", "c.example.com:3128")
    ('3', 'c')
    >>> mgr.find_user_password("Some Realm", "http://c.example.com:3128")
    ('3', 'c')
    >>> mgr.find_user_password("Some Realm", "d.example.com")
    ('4', 'd')
    >>> mgr.find_user_password("Some Realm", "e.example.com:3128")
    ('5', 'e')

    """
    pass
177\r
178\r
# NOTE(review): ``self`` is never used and the body is only a docstring;
# presumably this function is collected as a doctest by a runner that is not
# visible in this part of the file -- confirm before changing the signature.
def test_password_manager_default_port(self):
    """Doctest carrier: HTTPPasswordMgr handling of default ports.

    >>> mgr = urllib2.HTTPPasswordMgr()
    >>> add = mgr.add_password

    The point to note here is that we can't guess the default port if there's
    no scheme.  This applies to both add_password and find_user_password.

    >>> add("f", "http://g.example.com:80", "10", "j")
    >>> add("g", "http://h.example.com", "11", "k")
    >>> add("h", "i.example.com:80", "12", "l")
    >>> add("i", "j.example.com", "13", "m")
    >>> mgr.find_user_password("f", "g.example.com:100")
    (None, None)
    >>> mgr.find_user_password("f", "g.example.com:80")
    ('10', 'j')
    >>> mgr.find_user_password("f", "g.example.com")
    (None, None)
    >>> mgr.find_user_password("f", "http://g.example.com:100")
    (None, None)
    >>> mgr.find_user_password("f", "http://g.example.com:80")
    ('10', 'j')
    >>> mgr.find_user_password("f", "http://g.example.com")
    ('10', 'j')
    >>> mgr.find_user_password("g", "h.example.com")
    ('11', 'k')
    >>> mgr.find_user_password("g", "h.example.com:80")
    ('11', 'k')
    >>> mgr.find_user_password("g", "http://h.example.com:80")
    ('11', 'k')
    >>> mgr.find_user_password("h", "i.example.com")
    (None, None)
    >>> mgr.find_user_password("h", "i.example.com:80")
    ('12', 'l')
    >>> mgr.find_user_password("h", "http://i.example.com:80")
    ('12', 'l')
    >>> mgr.find_user_password("i", "j.example.com")
    ('13', 'm')
    >>> mgr.find_user_password("i", "j.example.com:80")
    (None, None)
    >>> mgr.find_user_password("i", "http://j.example.com")
    ('13', 'm')
    >>> mgr.find_user_password("i", "http://j.example.com:80")
    (None, None)

    """
225\r
class MockOpener:
    """Opener stand-in that only records how open() and error() were called."""

    addheaders = []

    def open(self, req, data=None, timeout=socket._GLOBAL_DEFAULT_TIMEOUT):
        # Store the arguments for later inspection; nothing is returned.
        self.req = req
        self.data = data
        self.timeout = timeout

    def error(self, proto, *args):
        # Store the protocol and the remaining positional arguments.
        self.proto = proto
        self.args = args
232\r
class MockFile:
    """Inert file-like object: each method accepts the call and returns None."""

    def read(self, count=None):
        return None

    def readline(self, count=None):
        return None

    def close(self):
        return None
237\r
class MockHeaders(dict):
    """Dict of headers whose getheaders() ignores the requested name."""

    def getheaders(self, name):
        # Every stored value is returned, whatever ``name`` is.
        return dict.values(self)
241\r
242class MockResponse(StringIO.StringIO):\r
243 def __init__(self, code, msg, headers, data, url=None):\r
244 StringIO.StringIO.__init__(self, data)\r
245 self.code, self.msg, self.headers, self.url = code, msg, headers, url\r
246 def info(self):\r
247 return self.headers\r
248 def geturl(self):\r
249 return self.url\r
250\r
class MockCookieJar:
    """Cookie-jar stand-in that remembers the objects it was handed."""

    def add_cookie_header(self, request):
        self.ach_req = request

    def extract_cookies(self, response, request):
        self.ec_req = request
        self.ec_r = response
256\r
class FakeMethod:
    """Callable that forwards each call to ``handle``.

    The stored method name and action are passed ahead of the positional
    arguments of the call.
    """

    def __init__(self, meth_name, action, handle):
        self.meth_name, self.action, self.handle = meth_name, action, handle

    def __call__(self, *args):
        forward = self.handle
        return forward(self.meth_name, self.action, *args)
264\r
class MockHTTPResponse:
    """Minimal response object: four stored attributes and an empty body."""

    def __init__(self, fp, msg, status, reason):
        self.fp, self.msg = fp, msg
        self.status, self.reason = status, reason

    def read(self):
        # The body is always the empty string.
        return ''
273\r
class MockHTTPClass:
    """Connection stand-in that records what a handler asks of it.

    Calling the instance (as a handler would call a connection class)
    stores the host and timeout and returns the same instance.
    """

    def __init__(self):
        self.req_headers = []
        self.data = None
        self.raise_on_endheaders = False
        self._tunnel_headers = {}

    def __call__(self, host, timeout=socket._GLOBAL_DEFAULT_TIMEOUT):
        self.host, self.timeout = host, timeout
        return self

    def set_debuglevel(self, level):
        self.level = level

    def set_tunnel(self, host, port=None, headers=None):
        self._tunnel_host, self._tunnel_port = host, port
        if not headers:
            # No (or empty) headers: empty the current dict in place.
            self._tunnel_headers.clear()
        else:
            self._tunnel_headers = headers

    def request(self, method, url, body=None, headers=None):
        self.method, self.selector = method, url
        if headers is not None:
            # Accumulate across calls and keep the list sorted so tests can
            # compare it against a literal.
            self.req_headers.extend(headers.items())
            self.req_headers.sort()
        if body:
            self.data = body
        if self.raise_on_endheaders:
            import socket
            raise socket.error()

    def getresponse(self):
        return MockHTTPResponse(MockFile(), {}, 200, "OK")
309\r
class MockHandler:
    """Handler whose methods are generated from a spec and log their calls.

    Useful for testing handler machinery; see the
    add_ordered_mock_handlers() docstring for the spec format.  Generated
    methods are installed on the *class* of the instance, so callers that
    need independent handlers must give each one its own subclass.
    """

    handler_order = 500

    def __init__(self, methods):
        self._define_methods(methods)

    def _define_methods(self, methods):
        """Install one FakeMethod per entry of ``methods``.

        An entry is either a method name, or a ``(name, action)`` pair.
        """
        for spec in methods:
            # Only unpack real pairs.  Testing ``len(spec) == 2`` alone would
            # also split a bare two-character method name into two letters.
            if isinstance(spec, (tuple, list)) and len(spec) == 2:
                name, action = spec
            else:
                name, action = spec, None
            meth = FakeMethod(name, action, self.handle)
            setattr(self.__class__, name, meth)

    def handle(self, fn_name, action, *args, **kwds):
        """Record the call on the parent, then perform ``action``.

        Supported actions: None (return None), "return self",
        "return response", "return request", "error <code>" (delegate to
        the parent's error()) and "raise" (raise urllib2.URLError).
        """
        self.parent.calls.append((self, fn_name, args, kwds))
        if action is None:
            return None
        elif action == "return self":
            return self
        elif action == "return response":
            res = MockResponse(200, "OK", {}, "")
            return res
        elif action == "return request":
            return Request("http://blah/")
        elif action.startswith("error"):
            # The code is whatever follows the last space; it is passed on
            # as an int when it parses as one, otherwise as the raw string.
            code = action[action.rfind(" ")+1:]
            try:
                code = int(code)
            except ValueError:
                pass
            res = MockResponse(200, "OK", {}, "")
            return self.parent.error("http", args[0], res, code, "", {})
        elif action == "raise":
            raise urllib2.URLError("blah")
        # Any other action string is a mistake in the test's spec.
        assert False

    def close(self):
        pass

    def add_parent(self, parent):
        # Attaching a handler also resets the parent's call log.
        self.parent = parent
        self.parent.calls = []

    def __lt__(self, other):
        if not hasattr(other, "handler_order"):
            # No handler_order, leave in original order.  Yuck.
            return True
        return self.handler_order < other.handler_order
353\r
def add_ordered_mock_handlers(opener, meth_spec):
    """Build one MockHandler per entry of meth_spec and add it to opener.

    meth_spec is a list of lists; each inner list describes the methods of
    one handler.  A plain string names a method that records its arguments
    and returns None, eg:

    [["http_error", "ftp_open"], ["http_open"]]

    gives one handler with .http_error() and .ftp_open() and another with
    .http_open().  A (name, action) tuple makes the method perform an action
    as well (see MockHandler.handle()), eg:

    [["http_error"], [("http_open", "return request")]]

    gives .http_error() on one handler (returning None) and, on another,
    an .http_open() that returns a Request object.

    The handlers are returned in the order of meth_spec; each handler's
    handler_order is raised by its position in that list.
    """
    created = []
    for position, meths in enumerate(meth_spec):
        # A fresh subclass per handler, because MockHandler installs its
        # generated methods on the class rather than on the instance.
        class MockHandlerSubclass(MockHandler):
            pass
        handler = MockHandlerSubclass(meths)
        handler.handler_order += position
        handler.add_parent(opener)
        created.append(handler)
        opener.add_handler(handler)
    return created
384\r
def build_test_opener(*handler_instances):
    """Return a new OpenerDirector holding exactly the given handlers."""
    director = OpenerDirector()
    for handler in handler_instances:
        director.add_handler(handler)
    return director
390\r
class MockHTTPHandler(urllib2.BaseHandler):
    """HTTP handler for redirect and auth tests.

    The first request is answered through the parent's error() using the
    configured code and headers; every later request gets a 200 OK.
    A deep copy of each request is kept in ``requests``.
    """

    def __init__(self, code, headers):
        self.code = code
        self.headers = headers
        self.reset()

    def reset(self):
        """Forget earlier requests; the next one counts as the first again."""
        self._count = 0
        self.requests = []

    def http_open(self, req):
        import mimetools
        import httplib
        import copy
        from StringIO import StringIO

        self.requests.append(copy.deepcopy(req))
        if self._count != 0:
            # Second and later requests succeed with empty headers.
            self.req = req
            msg = mimetools.Message(StringIO("\r\n\r\n"))
            return MockResponse(200, "OK", msg, "", req.get_full_url())

        # First request: report the configured status via the parent.
        self._count += 1
        name = httplib.responses[self.code]
        msg = mimetools.Message(StringIO(self.headers))
        return self.parent.error(
            "http", req, MockFile(), self.code, name, msg)
416\r
class MockHTTPSHandler(urllib2.AbstractHTTPHandler):
    """HTTPS handler that sends every request through one MockHTTPClass.

    Tests of the Proxy-Authorization request can then inspect the
    properties recorded on ``httpconn``.
    """

    def __init__(self):
        urllib2.AbstractHTTPHandler.__init__(self)
        self.httpconn = MockHTTPClass()

    def https_open(self, req):
        connection = self.httpconn
        return self.do_open(connection, req)
427\r
class MockPasswordManager:
    """Password manager that stores one credential and logs each lookup."""

    def add_password(self, realm, uri, user, password):
        self.realm, self.url = realm, uri
        self.user, self.password = user, password

    def find_user_password(self, realm, authuri):
        # Remember what was asked for; the answer is always the stored pair.
        self.target_realm, self.target_url = realm, authuri
        return self.user, self.password
438\r
439\r
class OpenerDirectorTests(unittest.TestCase):
    """Tests of how OpenerDirector dispatches calls to its handlers."""

    def test_add_non_handler(self):
        # An object that defines no handler methods is rejected.
        class NonHandler(object):
            pass
        self.assertRaises(TypeError,
                          OpenerDirector().add_handler, NonHandler())

    def test_badly_named_methods(self):
        # test work-around for three methods that accidentally follow the
        # naming conventions for handler methods
        # (*_open() / *_request() / *_response())

        # These used to call the accidentally-named methods, causing a
        # TypeError in real code; here, returning self from these mock
        # methods would either cause no exception, or AttributeError.

        from urllib2 import URLError

        o = OpenerDirector()
        meth_spec = [
            [("do_open", "return self"), ("proxy_open", "return self")],
            [("redirect_request", "return self")],
            ]
        add_ordered_mock_handlers(o, meth_spec)
        o.add_handler(urllib2.UnknownHandler())
        for scheme in "do", "proxy", "redirect":
            self.assertRaises(URLError, o.open, scheme+"://example.com/")

    def test_handled(self):
        # handler returning non-None means no more handlers will be called
        o = OpenerDirector()
        meth_spec = [
            ["http_open", "ftp_open", "http_error_302"],
            ["ftp_open"],
            [("http_open", "return self")],
            [("http_open", "return self")],
            ]
        handlers = add_ordered_mock_handlers(o, meth_spec)

        req = Request("http://example.com/")
        r = o.open(req)
        # Second .http_open() gets called, third doesn't, since second returned
        # non-None.  Handlers without .http_open() never get any methods called
        # on them.
        # In fact, second mock handler defining .http_open() returns self
        # (instead of response), which becomes the OpenerDirector's return
        # value.
        self.assertEqual(r, handlers[2])
        calls = [(handlers[0], "http_open"), (handlers[2], "http_open")]
        for expected, got in zip(calls, o.calls):
            handler, name, args, kwds = got
            self.assertEqual((handler, name), expected)
            self.assertEqual(args, (req,))

    def test_handler_order(self):
        o = OpenerDirector()
        handlers = []
        for meths, handler_order in [
            ([("http_open", "return self")], 500),
            (["http_open"], 0),
            ]:
            # One subclass per handler: MockHandler installs its generated
            # methods on the class.
            class MockHandlerSubclass(MockHandler):
                pass
            h = MockHandlerSubclass(meths)
            h.handler_order = handler_order
            handlers.append(h)
            o.add_handler(h)

        o.open("http://example.com/")
        # handlers called in reverse order, thanks to their sort order
        self.assertEqual(o.calls[0][0], handlers[1])
        self.assertEqual(o.calls[1][0], handlers[0])

    def test_raise(self):
        # raising URLError stops processing of request
        o = OpenerDirector()
        meth_spec = [
            [("http_open", "raise")],
            [("http_open", "return self")],
            ]
        handlers = add_ordered_mock_handlers(o, meth_spec)

        req = Request("http://example.com/")
        self.assertRaises(urllib2.URLError, o.open, req)
        self.assertEqual(o.calls, [(handlers[0], "http_open", (req,), {})])

##     def test_error(self):
##         # XXX this doesn't actually seem to be used in standard library,
##         #  but should really be tested anyway...

    def test_http_error(self):
        # XXX http_error_default
        # http errors are a special case
        o = OpenerDirector()
        meth_spec = [
            [("http_open", "error 302")],
            [("http_error_400", "raise"), "http_open"],
            [("http_error_302", "return response"), "http_error_303",
             "http_error"],
            [("http_error_302")],
            ]
        handlers = add_ordered_mock_handlers(o, meth_spec)

        class Unknown:
            # Compares equal to anything, so it can stand in for the
            # response object inside the expected argument tuple.
            def __eq__(self, other): return True

        req = Request("http://example.com/")
        o.open(req)
        # A unittest assertion rather than a bare ``assert``: it still runs
        # under ``python -O`` and reports both values when it fails.
        self.assertEqual(len(o.calls), 2)
        calls = [(handlers[0], "http_open", (req,)),
                 (handlers[2], "http_error_302",
                  (req, Unknown(), 302, "", {}))]
        for expected, got in zip(calls, o.calls):
            handler, method_name, args = expected
            self.assertEqual((handler, method_name), got[:2])
            # ``args`` stays on the left so that Unknown.__eq__ is the
            # comparison that gets used.
            self.assertEqual(args, got[2])

    def test_processors(self):
        # *_request / *_response methods get called appropriately
        o = OpenerDirector()
        meth_spec = [
            [("http_request", "return request"),
             ("http_response", "return response")],
            [("http_request", "return request"),
             ("http_response", "return response")],
            ]
        handlers = add_ordered_mock_handlers(o, meth_spec)

        req = Request("http://example.com/")
        o.open(req)
        # processor methods are called on *all* handlers that define them,
        # not just the first handler that handles the request
        calls = [
            (handlers[0], "http_request"), (handlers[1], "http_request"),
            (handlers[0], "http_response"), (handlers[1], "http_response")]

        for i, (handler, name, args, kwds) in enumerate(o.calls):
            if i < 2:
                # *_request
                self.assertEqual((handler, name), calls[i])
                self.assertEqual(len(args), 1)
                self.assertIsInstance(args[0], Request)
            else:
                # *_response
                self.assertEqual((handler, name), calls[i])
                self.assertEqual(len(args), 2)
                self.assertIsInstance(args[0], Request)
                # response from opener.open is None, because there's no
                # handler that defines http_open to handle it
                self.assertTrue(args[1] is None or
                                isinstance(args[1], MockResponse))
591\r
592\r
593def sanepathname2url(path):\r
594 import urllib\r
595 urlpath = urllib.pathname2url(path)\r
596 if os.name == "nt" and urlpath.startswith("///"):\r
597 urlpath = urlpath[2:]\r
598 # XXX don't ask me about the mac...\r
599 return urlpath\r
600\r
601class HandlerTests(unittest.TestCase):\r
602\r
603 def test_ftp(self):\r
604 class MockFTPWrapper:\r
605 def __init__(self, data): self.data = data\r
606 def retrfile(self, filename, filetype):\r
607 self.filename, self.filetype = filename, filetype\r
608 return StringIO.StringIO(self.data), len(self.data)\r
609\r
610 class NullFTPHandler(urllib2.FTPHandler):\r
611 def __init__(self, data): self.data = data\r
612 def connect_ftp(self, user, passwd, host, port, dirs,\r
613 timeout=socket._GLOBAL_DEFAULT_TIMEOUT):\r
614 self.user, self.passwd = user, passwd\r
615 self.host, self.port = host, port\r
616 self.dirs = dirs\r
617 self.ftpwrapper = MockFTPWrapper(self.data)\r
618 return self.ftpwrapper\r
619\r
620 import ftplib\r
621 data = "rheum rhaponicum"\r
622 h = NullFTPHandler(data)\r
623 o = h.parent = MockOpener()\r
624\r
625 for url, host, port, user, passwd, type_, dirs, filename, mimetype in [\r
626 ("ftp://localhost/foo/bar/baz.html",\r
627 "localhost", ftplib.FTP_PORT, "", "", "I",\r
628 ["foo", "bar"], "baz.html", "text/html"),\r
629 ("ftp://parrot@localhost/foo/bar/baz.html",\r
630 "localhost", ftplib.FTP_PORT, "parrot", "", "I",\r
631 ["foo", "bar"], "baz.html", "text/html"),\r
632 ("ftp://%25parrot@localhost/foo/bar/baz.html",\r
633 "localhost", ftplib.FTP_PORT, "%parrot", "", "I",\r
634 ["foo", "bar"], "baz.html", "text/html"),\r
635 ("ftp://%2542parrot@localhost/foo/bar/baz.html",\r
636 "localhost", ftplib.FTP_PORT, "%42parrot", "", "I",\r
637 ["foo", "bar"], "baz.html", "text/html"),\r
638 ("ftp://localhost:80/foo/bar/",\r
639 "localhost", 80, "", "", "D",\r
640 ["foo", "bar"], "", None),\r
641 ("ftp://localhost/baz.gif;type=a",\r
642 "localhost", ftplib.FTP_PORT, "", "", "A",\r
643 [], "baz.gif", None), # XXX really this should guess image/gif\r
644 ]:\r
645 req = Request(url)\r
646 req.timeout = None\r
647 r = h.ftp_open(req)\r
648 # ftp authentication not yet implemented by FTPHandler\r
649 self.assertEqual(h.user, user)\r
650 self.assertEqual(h.passwd, passwd)\r
651 self.assertEqual(h.host, socket.gethostbyname(host))\r
652 self.assertEqual(h.port, port)\r
653 self.assertEqual(h.dirs, dirs)\r
654 self.assertEqual(h.ftpwrapper.filename, filename)\r
655 self.assertEqual(h.ftpwrapper.filetype, type_)\r
656 headers = r.info()\r
657 self.assertEqual(headers.get("Content-type"), mimetype)\r
658 self.assertEqual(int(headers["Content-length"]), len(data))\r
659\r
660 def test_file(self):\r
661 import rfc822, socket\r
662 h = urllib2.FileHandler()\r
663 o = h.parent = MockOpener()\r
664\r
665 TESTFN = test_support.TESTFN\r
666 urlpath = sanepathname2url(os.path.abspath(TESTFN))\r
667 towrite = "hello, world\n"\r
668 urls = [\r
669 "file://localhost%s" % urlpath,\r
670 "file://%s" % urlpath,\r
671 "file://%s%s" % (socket.gethostbyname('localhost'), urlpath),\r
672 ]\r
673 try:\r
674 localaddr = socket.gethostbyname(socket.gethostname())\r
675 except socket.gaierror:\r
676 localaddr = ''\r
677 if localaddr:\r
678 urls.append("file://%s%s" % (localaddr, urlpath))\r
679\r
680 for url in urls:\r
681 f = open(TESTFN, "wb")\r
682 try:\r
683 try:\r
684 f.write(towrite)\r
685 finally:\r
686 f.close()\r
687\r
688 r = h.file_open(Request(url))\r
689 try:\r
690 data = r.read()\r
691 headers = r.info()\r
692 respurl = r.geturl()\r
693 finally:\r
694 r.close()\r
695 stats = os.stat(TESTFN)\r
696 modified = rfc822.formatdate(stats.st_mtime)\r
697 finally:\r
698 os.remove(TESTFN)\r
699 self.assertEqual(data, towrite)\r
700 self.assertEqual(headers["Content-type"], "text/plain")\r
701 self.assertEqual(headers["Content-length"], "13")\r
702 self.assertEqual(headers["Last-modified"], modified)\r
703 self.assertEqual(respurl, url)\r
704\r
705 for url in [\r
706 "file://localhost:80%s" % urlpath,\r
707 "file:///file_does_not_exist.txt",\r
708 "file://%s:80%s/%s" % (socket.gethostbyname('localhost'),\r
709 os.getcwd(), TESTFN),\r
710 "file://somerandomhost.ontheinternet.com%s/%s" %\r
711 (os.getcwd(), TESTFN),\r
712 ]:\r
713 try:\r
714 f = open(TESTFN, "wb")\r
715 try:\r
716 f.write(towrite)\r
717 finally:\r
718 f.close()\r
719\r
720 self.assertRaises(urllib2.URLError,\r
721 h.file_open, Request(url))\r
722 finally:\r
723 os.remove(TESTFN)\r
724\r
725 h = urllib2.FileHandler()\r
726 o = h.parent = MockOpener()\r
727 # XXXX why does // mean ftp (and /// mean not ftp!), and where\r
728 # is file: scheme specified? I think this is really a bug, and\r
729 # what was intended was to distinguish between URLs like:\r
730 # file:/blah.txt (a file)\r
731 # file://localhost/blah.txt (a file)\r
732 # file:///blah.txt (a file)\r
733 # file://ftp.example.com/blah.txt (an ftp URL)\r
734 for url, ftp in [\r
735 ("file://ftp.example.com//foo.txt", True),\r
736 ("file://ftp.example.com///foo.txt", False),\r
737# XXXX bug: fails with OSError, should be URLError\r
738 ("file://ftp.example.com/foo.txt", False),\r
739 ("file://somehost//foo/something.txt", True),\r
740 ("file://localhost//foo/something.txt", False),\r
741 ]:\r
742 req = Request(url)\r
743 try:\r
744 h.file_open(req)\r
745 # XXXX remove OSError when bug fixed\r
746 except (urllib2.URLError, OSError):\r
747 self.assertTrue(not ftp)\r
748 else:\r
749 self.assertTrue(o.req is req)\r
750 self.assertEqual(req.type, "ftp")\r
751 self.assertEqual(req.type == "ftp", ftp)\r
752\r
753 def test_http(self):\r
754\r
755 h = urllib2.AbstractHTTPHandler()\r
756 o = h.parent = MockOpener()\r
757\r
758 url = "http://example.com/"\r
759 for method, data in [("GET", None), ("POST", "blah")]:\r
760 req = Request(url, data, {"Foo": "bar"})\r
761 req.timeout = None\r
762 req.add_unredirected_header("Spam", "eggs")\r
763 http = MockHTTPClass()\r
764 r = h.do_open(http, req)\r
765\r
766 # result attributes\r
767 r.read; r.readline # wrapped MockFile methods\r
768 r.info; r.geturl # addinfourl methods\r
769 r.code, r.msg == 200, "OK" # added from MockHTTPClass.getreply()\r
770 hdrs = r.info()\r
771 hdrs.get; hdrs.has_key # r.info() gives dict from .getreply()\r
772 self.assertEqual(r.geturl(), url)\r
773\r
774 self.assertEqual(http.host, "example.com")\r
775 self.assertEqual(http.level, 0)\r
776 self.assertEqual(http.method, method)\r
777 self.assertEqual(http.selector, "/")\r
778 self.assertEqual(http.req_headers,\r
779 [("Connection", "close"),\r
780 ("Foo", "bar"), ("Spam", "eggs")])\r
781 self.assertEqual(http.data, data)\r
782\r
783 # check socket.error converted to URLError\r
784 http.raise_on_endheaders = True\r
785 self.assertRaises(urllib2.URLError, h.do_open, http, req)\r
786\r
787 # check adding of standard headers\r
788 o.addheaders = [("Spam", "eggs")]\r
789 for data in "", None: # POST, GET\r
790 req = Request("http://example.com/", data)\r
791 r = MockResponse(200, "OK", {}, "")\r
792 newreq = h.do_request_(req)\r
793 if data is None: # GET\r
794 self.assertNotIn("Content-length", req.unredirected_hdrs)\r
795 self.assertNotIn("Content-type", req.unredirected_hdrs)\r
796 else: # POST\r
797 self.assertEqual(req.unredirected_hdrs["Content-length"], "0")\r
798 self.assertEqual(req.unredirected_hdrs["Content-type"],\r
799 "application/x-www-form-urlencoded")\r
800 # XXX the details of Host could be better tested\r
801 self.assertEqual(req.unredirected_hdrs["Host"], "example.com")\r
802 self.assertEqual(req.unredirected_hdrs["Spam"], "eggs")\r
803\r
804 # don't clobber existing headers\r
805 req.add_unredirected_header("Content-length", "foo")\r
806 req.add_unredirected_header("Content-type", "bar")\r
807 req.add_unredirected_header("Host", "baz")\r
808 req.add_unredirected_header("Spam", "foo")\r
809 newreq = h.do_request_(req)\r
810 self.assertEqual(req.unredirected_hdrs["Content-length"], "foo")\r
811 self.assertEqual(req.unredirected_hdrs["Content-type"], "bar")\r
812 self.assertEqual(req.unredirected_hdrs["Host"], "baz")\r
813 self.assertEqual(req.unredirected_hdrs["Spam"], "foo")\r
814\r
815 def test_http_doubleslash(self):\r
816 # Checks that the presence of an unnecessary double slash in a url doesn't break anything\r
817 # Previously, a double slash directly after the host could cause incorrect parsing of the url\r
818 h = urllib2.AbstractHTTPHandler()\r
819 o = h.parent = MockOpener()\r
820\r
821 data = ""\r
822 ds_urls = [\r
823 "http://example.com/foo/bar/baz.html",\r
824 "http://example.com//foo/bar/baz.html",\r
825 "http://example.com/foo//bar/baz.html",\r
826 "http://example.com/foo/bar//baz.html",\r
827 ]\r
828\r
829 for ds_url in ds_urls:\r
830 ds_req = Request(ds_url, data)\r
831\r
832 # Check whether host is determined correctly if there is no proxy\r
833 np_ds_req = h.do_request_(ds_req)\r
834 self.assertEqual(np_ds_req.unredirected_hdrs["Host"],"example.com")\r
835\r
836 # Check whether host is determined correctly if there is a proxy\r
837 ds_req.set_proxy("someproxy:3128",None)\r
838 p_ds_req = h.do_request_(ds_req)\r
839 self.assertEqual(p_ds_req.unredirected_hdrs["Host"],"example.com")\r
840\r
841 def test_fixpath_in_weirdurls(self):\r
842 # Issue4493: urllib2 to supply '/' when to urls where path does not\r
843 # start with'/'\r
844\r
845 h = urllib2.AbstractHTTPHandler()\r
846 o = h.parent = MockOpener()\r
847\r
848 weird_url = 'http://www.python.org?getspam'\r
849 req = Request(weird_url)\r
850 newreq = h.do_request_(req)\r
851 self.assertEqual(newreq.get_host(),'www.python.org')\r
852 self.assertEqual(newreq.get_selector(),'/?getspam')\r
853\r
854 url_without_path = 'http://www.python.org'\r
855 req = Request(url_without_path)\r
856 newreq = h.do_request_(req)\r
857 self.assertEqual(newreq.get_host(),'www.python.org')\r
858 self.assertEqual(newreq.get_selector(),'')\r
859\r
860 def test_errors(self):\r
861 h = urllib2.HTTPErrorProcessor()\r
862 o = h.parent = MockOpener()\r
863\r
864 url = "http://example.com/"\r
865 req = Request(url)\r
866 # all 2xx are passed through\r
867 r = MockResponse(200, "OK", {}, "", url)\r
868 newr = h.http_response(req, r)\r
869 self.assertTrue(r is newr)\r
870 self.assertTrue(not hasattr(o, "proto")) # o.error not called\r
871 r = MockResponse(202, "Accepted", {}, "", url)\r
872 newr = h.http_response(req, r)\r
873 self.assertTrue(r is newr)\r
874 self.assertTrue(not hasattr(o, "proto")) # o.error not called\r
875 r = MockResponse(206, "Partial content", {}, "", url)\r
876 newr = h.http_response(req, r)\r
877 self.assertTrue(r is newr)\r
878 self.assertTrue(not hasattr(o, "proto")) # o.error not called\r
879 # anything else calls o.error (and MockOpener returns None, here)\r
880 r = MockResponse(502, "Bad gateway", {}, "", url)\r
881 self.assertTrue(h.http_response(req, r) is None)\r
882 self.assertEqual(o.proto, "http") # o.error called\r
883 self.assertEqual(o.args, (req, r, 502, "Bad gateway", {}))\r
884\r
885 def test_cookies(self):\r
886 cj = MockCookieJar()\r
887 h = urllib2.HTTPCookieProcessor(cj)\r
888 o = h.parent = MockOpener()\r
889\r
890 req = Request("http://example.com/")\r
891 r = MockResponse(200, "OK", {}, "")\r
892 newreq = h.http_request(req)\r
893 self.assertTrue(cj.ach_req is req is newreq)\r
894 self.assertEqual(req.get_origin_req_host(), "example.com")\r
895 self.assertTrue(not req.is_unverifiable())\r
896 newr = h.http_response(req, r)\r
897 self.assertTrue(cj.ec_req is req)\r
898 self.assertTrue(cj.ec_r is r is newr)\r
899\r
900 def test_redirect(self):\r
901 from_url = "http://example.com/a.html"\r
902 to_url = "http://example.com/b.html"\r
903 h = urllib2.HTTPRedirectHandler()\r
904 o = h.parent = MockOpener()\r
905\r
906 # ordinary redirect behaviour\r
907 for code in 301, 302, 303, 307:\r
908 for data in None, "blah\nblah\n":\r
909 method = getattr(h, "http_error_%s" % code)\r
910 req = Request(from_url, data)\r
911 req.add_header("Nonsense", "viking=withhold")\r
912 req.timeout = socket._GLOBAL_DEFAULT_TIMEOUT\r
913 if data is not None:\r
914 req.add_header("Content-Length", str(len(data)))\r
915 req.add_unredirected_header("Spam", "spam")\r
916 try:\r
917 method(req, MockFile(), code, "Blah",\r
918 MockHeaders({"location": to_url}))\r
919 except urllib2.HTTPError:\r
920 # 307 in response to POST requires user OK\r
921 self.assertTrue(code == 307 and data is not None)\r
922 self.assertEqual(o.req.get_full_url(), to_url)\r
923 try:\r
924 self.assertEqual(o.req.get_method(), "GET")\r
925 except AttributeError:\r
926 self.assertTrue(not o.req.has_data())\r
927\r
928 # now it's a GET, there should not be headers regarding content\r
929 # (possibly dragged from before being a POST)\r
930 headers = [x.lower() for x in o.req.headers]\r
931 self.assertNotIn("content-length", headers)\r
932 self.assertNotIn("content-type", headers)\r
933\r
934 self.assertEqual(o.req.headers["Nonsense"],\r
935 "viking=withhold")\r
936 self.assertNotIn("Spam", o.req.headers)\r
937 self.assertNotIn("Spam", o.req.unredirected_hdrs)\r
938\r
939 # loop detection\r
940 req = Request(from_url)\r
941 req.timeout = socket._GLOBAL_DEFAULT_TIMEOUT\r
942 def redirect(h, req, url=to_url):\r
943 h.http_error_302(req, MockFile(), 302, "Blah",\r
944 MockHeaders({"location": url}))\r
945 # Note that the *original* request shares the same record of\r
946 # redirections with the sub-requests caused by the redirections.\r
947\r
948 # detect infinite loop redirect of a URL to itself\r
949 req = Request(from_url, origin_req_host="example.com")\r
950 count = 0\r
951 req.timeout = socket._GLOBAL_DEFAULT_TIMEOUT\r
952 try:\r
953 while 1:\r
954 redirect(h, req, "http://example.com/")\r
955 count = count + 1\r
956 except urllib2.HTTPError:\r
957 # don't stop until max_repeats, because cookies may introduce state\r
958 self.assertEqual(count, urllib2.HTTPRedirectHandler.max_repeats)\r
959\r
960 # detect endless non-repeating chain of redirects\r
961 req = Request(from_url, origin_req_host="example.com")\r
962 count = 0\r
963 req.timeout = socket._GLOBAL_DEFAULT_TIMEOUT\r
964 try:\r
965 while 1:\r
966 redirect(h, req, "http://example.com/%d" % count)\r
967 count = count + 1\r
968 except urllib2.HTTPError:\r
969 self.assertEqual(count,\r
970 urllib2.HTTPRedirectHandler.max_redirections)\r
971\r
972 def test_invalid_redirect(self):\r
973 from_url = "http://example.com/a.html"\r
974 valid_schemes = ['http', 'https', 'ftp']\r
975 invalid_schemes = ['file', 'imap', 'ldap']\r
976 schemeless_url = "example.com/b.html"\r
977 h = urllib2.HTTPRedirectHandler()\r
978 o = h.parent = MockOpener()\r
979 req = Request(from_url)\r
980 req.timeout = socket._GLOBAL_DEFAULT_TIMEOUT\r
981\r
982 for scheme in invalid_schemes:\r
983 invalid_url = scheme + '://' + schemeless_url\r
984 self.assertRaises(urllib2.HTTPError, h.http_error_302,\r
985 req, MockFile(), 302, "Security Loophole",\r
986 MockHeaders({"location": invalid_url}))\r
987\r
988 for scheme in valid_schemes:\r
989 valid_url = scheme + '://' + schemeless_url\r
990 h.http_error_302(req, MockFile(), 302, "That's fine",\r
991 MockHeaders({"location": valid_url}))\r
992 self.assertEqual(o.req.get_full_url(), valid_url)\r
993\r
994 def test_cookie_redirect(self):\r
995 # cookies shouldn't leak into redirected requests\r
996 from cookielib import CookieJar\r
997\r
998 from test.test_cookielib import interact_netscape\r
999\r
1000 cj = CookieJar()\r
1001 interact_netscape(cj, "http://www.example.com/", "spam=eggs")\r
1002 hh = MockHTTPHandler(302, "Location: http://www.cracker.com/\r\n\r\n")\r
1003 hdeh = urllib2.HTTPDefaultErrorHandler()\r
1004 hrh = urllib2.HTTPRedirectHandler()\r
1005 cp = urllib2.HTTPCookieProcessor(cj)\r
1006 o = build_test_opener(hh, hdeh, hrh, cp)\r
1007 o.open("http://www.example.com/")\r
1008 self.assertTrue(not hh.req.has_header("Cookie"))\r
1009\r
1010 def test_redirect_fragment(self):\r
1011 redirected_url = 'http://www.example.com/index.html#OK\r\n\r\n'\r
1012 hh = MockHTTPHandler(302, 'Location: ' + redirected_url)\r
1013 hdeh = urllib2.HTTPDefaultErrorHandler()\r
1014 hrh = urllib2.HTTPRedirectHandler()\r
1015 o = build_test_opener(hh, hdeh, hrh)\r
1016 fp = o.open('http://www.example.com')\r
1017 self.assertEqual(fp.geturl(), redirected_url.strip())\r
1018\r
1019 def test_proxy(self):\r
1020 o = OpenerDirector()\r
1021 ph = urllib2.ProxyHandler(dict(http="proxy.example.com:3128"))\r
1022 o.add_handler(ph)\r
1023 meth_spec = [\r
1024 [("http_open", "return response")]\r
1025 ]\r
1026 handlers = add_ordered_mock_handlers(o, meth_spec)\r
1027\r
1028 req = Request("http://acme.example.com/")\r
1029 self.assertEqual(req.get_host(), "acme.example.com")\r
1030 r = o.open(req)\r
1031 self.assertEqual(req.get_host(), "proxy.example.com:3128")\r
1032\r
1033 self.assertEqual([(handlers[0], "http_open")],\r
1034 [tup[0:2] for tup in o.calls])\r
1035\r
1036 def test_proxy_no_proxy(self):\r
1037 os.environ['no_proxy'] = 'python.org'\r
1038 o = OpenerDirector()\r
1039 ph = urllib2.ProxyHandler(dict(http="proxy.example.com"))\r
1040 o.add_handler(ph)\r
1041 req = Request("http://www.perl.org/")\r
1042 self.assertEqual(req.get_host(), "www.perl.org")\r
1043 r = o.open(req)\r
1044 self.assertEqual(req.get_host(), "proxy.example.com")\r
1045 req = Request("http://www.python.org")\r
1046 self.assertEqual(req.get_host(), "www.python.org")\r
1047 r = o.open(req)\r
1048 self.assertEqual(req.get_host(), "www.python.org")\r
1049 del os.environ['no_proxy']\r
1050\r
1051\r
1052 def test_proxy_https(self):\r
1053 o = OpenerDirector()\r
1054 ph = urllib2.ProxyHandler(dict(https='proxy.example.com:3128'))\r
1055 o.add_handler(ph)\r
1056 meth_spec = [\r
1057 [("https_open","return response")]\r
1058 ]\r
1059 handlers = add_ordered_mock_handlers(o, meth_spec)\r
1060 req = Request("https://www.example.com/")\r
1061 self.assertEqual(req.get_host(), "www.example.com")\r
1062 r = o.open(req)\r
1063 self.assertEqual(req.get_host(), "proxy.example.com:3128")\r
1064 self.assertEqual([(handlers[0], "https_open")],\r
1065 [tup[0:2] for tup in o.calls])\r
1066\r
1067 def test_proxy_https_proxy_authorization(self):\r
1068 o = OpenerDirector()\r
1069 ph = urllib2.ProxyHandler(dict(https='proxy.example.com:3128'))\r
1070 o.add_handler(ph)\r
1071 https_handler = MockHTTPSHandler()\r
1072 o.add_handler(https_handler)\r
1073 req = Request("https://www.example.com/")\r
1074 req.add_header("Proxy-Authorization","FooBar")\r
1075 req.add_header("User-Agent","Grail")\r
1076 self.assertEqual(req.get_host(), "www.example.com")\r
1077 self.assertIsNone(req._tunnel_host)\r
1078 r = o.open(req)\r
1079 # Verify Proxy-Authorization gets tunneled to request.\r
1080 # httpsconn req_headers do not have the Proxy-Authorization header but\r
1081 # the req will have.\r
1082 self.assertNotIn(("Proxy-Authorization","FooBar"),\r
1083 https_handler.httpconn.req_headers)\r
1084 self.assertIn(("User-Agent","Grail"),\r
1085 https_handler.httpconn.req_headers)\r
1086 self.assertIsNotNone(req._tunnel_host)\r
1087 self.assertEqual(req.get_host(), "proxy.example.com:3128")\r
1088 self.assertEqual(req.get_header("Proxy-authorization"),"FooBar")\r
1089\r
1090 def test_basic_auth(self, quote_char='"'):\r
1091 opener = OpenerDirector()\r
1092 password_manager = MockPasswordManager()\r
1093 auth_handler = urllib2.HTTPBasicAuthHandler(password_manager)\r
1094 realm = "ACME Widget Store"\r
1095 http_handler = MockHTTPHandler(\r
1096 401, 'WWW-Authenticate: Basic realm=%s%s%s\r\n\r\n' %\r
1097 (quote_char, realm, quote_char) )\r
1098 opener.add_handler(auth_handler)\r
1099 opener.add_handler(http_handler)\r
1100 self._test_basic_auth(opener, auth_handler, "Authorization",\r
1101 realm, http_handler, password_manager,\r
1102 "http://acme.example.com/protected",\r
1103 "http://acme.example.com/protected",\r
1104 )\r
1105\r
1106 def test_basic_auth_with_single_quoted_realm(self):\r
1107 self.test_basic_auth(quote_char="'")\r
1108\r
1109 def test_proxy_basic_auth(self):\r
1110 opener = OpenerDirector()\r
1111 ph = urllib2.ProxyHandler(dict(http="proxy.example.com:3128"))\r
1112 opener.add_handler(ph)\r
1113 password_manager = MockPasswordManager()\r
1114 auth_handler = urllib2.ProxyBasicAuthHandler(password_manager)\r
1115 realm = "ACME Networks"\r
1116 http_handler = MockHTTPHandler(\r
1117 407, 'Proxy-Authenticate: Basic realm="%s"\r\n\r\n' % realm)\r
1118 opener.add_handler(auth_handler)\r
1119 opener.add_handler(http_handler)\r
1120 self._test_basic_auth(opener, auth_handler, "Proxy-authorization",\r
1121 realm, http_handler, password_manager,\r
1122 "http://acme.example.com:3128/protected",\r
1123 "proxy.example.com:3128",\r
1124 )\r
1125\r
1126 def test_basic_and_digest_auth_handlers(self):\r
1127 # HTTPDigestAuthHandler threw an exception if it couldn't handle a 40*\r
1128 # response (http://python.org/sf/1479302), where it should instead\r
1129 # return None to allow another handler (especially\r
1130 # HTTPBasicAuthHandler) to handle the response.\r
1131\r
1132 # Also (http://python.org/sf/14797027, RFC 2617 section 1.2), we must\r
1133 # try digest first (since it's the strongest auth scheme), so we record\r
1134 # order of calls here to check digest comes first:\r
1135 class RecordingOpenerDirector(OpenerDirector):\r
1136 def __init__(self):\r
1137 OpenerDirector.__init__(self)\r
1138 self.recorded = []\r
1139 def record(self, info):\r
1140 self.recorded.append(info)\r
1141 class TestDigestAuthHandler(urllib2.HTTPDigestAuthHandler):\r
1142 def http_error_401(self, *args, **kwds):\r
1143 self.parent.record("digest")\r
1144 urllib2.HTTPDigestAuthHandler.http_error_401(self,\r
1145 *args, **kwds)\r
1146 class TestBasicAuthHandler(urllib2.HTTPBasicAuthHandler):\r
1147 def http_error_401(self, *args, **kwds):\r
1148 self.parent.record("basic")\r
1149 urllib2.HTTPBasicAuthHandler.http_error_401(self,\r
1150 *args, **kwds)\r
1151\r
1152 opener = RecordingOpenerDirector()\r
1153 password_manager = MockPasswordManager()\r
1154 digest_handler = TestDigestAuthHandler(password_manager)\r
1155 basic_handler = TestBasicAuthHandler(password_manager)\r
1156 realm = "ACME Networks"\r
1157 http_handler = MockHTTPHandler(\r
1158 401, 'WWW-Authenticate: Basic realm="%s"\r\n\r\n' % realm)\r
1159 opener.add_handler(basic_handler)\r
1160 opener.add_handler(digest_handler)\r
1161 opener.add_handler(http_handler)\r
1162\r
1163 # check basic auth isn't blocked by digest handler failing\r
1164 self._test_basic_auth(opener, basic_handler, "Authorization",\r
1165 realm, http_handler, password_manager,\r
1166 "http://acme.example.com/protected",\r
1167 "http://acme.example.com/protected",\r
1168 )\r
1169 # check digest was tried before basic (twice, because\r
1170 # _test_basic_auth called .open() twice)\r
1171 self.assertEqual(opener.recorded, ["digest", "basic"]*2)\r
1172\r
1173 def _test_basic_auth(self, opener, auth_handler, auth_header,\r
1174 realm, http_handler, password_manager,\r
1175 request_url, protected_url):\r
1176 import base64\r
1177 user, password = "wile", "coyote"\r
1178\r
1179 # .add_password() fed through to password manager\r
1180 auth_handler.add_password(realm, request_url, user, password)\r
1181 self.assertEqual(realm, password_manager.realm)\r
1182 self.assertEqual(request_url, password_manager.url)\r
1183 self.assertEqual(user, password_manager.user)\r
1184 self.assertEqual(password, password_manager.password)\r
1185\r
1186 r = opener.open(request_url)\r
1187\r
1188 # should have asked the password manager for the username/password\r
1189 self.assertEqual(password_manager.target_realm, realm)\r
1190 self.assertEqual(password_manager.target_url, protected_url)\r
1191\r
1192 # expect one request without authorization, then one with\r
1193 self.assertEqual(len(http_handler.requests), 2)\r
1194 self.assertFalse(http_handler.requests[0].has_header(auth_header))\r
1195 userpass = '%s:%s' % (user, password)\r
1196 auth_hdr_value = 'Basic '+base64.encodestring(userpass).strip()\r
1197 self.assertEqual(http_handler.requests[1].get_header(auth_header),\r
1198 auth_hdr_value)\r
1199 self.assertEqual(http_handler.requests[1].unredirected_hdrs[auth_header],\r
1200 auth_hdr_value)\r
1201 # if the password manager can't find a password, the handler won't\r
1202 # handle the HTTP auth error\r
1203 password_manager.user = password_manager.password = None\r
1204 http_handler.reset()\r
1205 r = opener.open(request_url)\r
1206 self.assertEqual(len(http_handler.requests), 1)\r
1207 self.assertFalse(http_handler.requests[0].has_header(auth_header))\r
1208\r
class MiscTests(unittest.TestCase):
    # Tests for urllib2.build_opener().

    def test_build_opener(self):
        class MyHTTPHandler(urllib2.HTTPHandler): pass
        class FooHandler(urllib2.BaseHandler):
            def foo_open(self): pass
        class BarHandler(urllib2.BaseHandler):
            def bar_open(self): pass

        build_opener = urllib2.build_opener

        o = build_opener(FooHandler, BarHandler)
        self.opener_has_handler(o, FooHandler)
        self.opener_has_handler(o, BarHandler)

        # can take a mix of classes and instances
        o = build_opener(FooHandler, BarHandler())
        self.opener_has_handler(o, FooHandler)
        self.opener_has_handler(o, BarHandler)

        # subclasses of default handlers override default handlers
        o = build_opener(MyHTTPHandler)
        self.opener_has_handler(o, MyHTTPHandler)

        # a particular case of overriding: default handlers can be passed
        # in explicitly
        o = build_opener()
        self.opener_has_handler(o, urllib2.HTTPHandler)
        o = build_opener(urllib2.HTTPHandler)
        self.opener_has_handler(o, urllib2.HTTPHandler)
        o = build_opener(urllib2.HTTPHandler())
        self.opener_has_handler(o, urllib2.HTTPHandler)

        # Issue2670: multiple handlers sharing the same base class
        class MyOtherHTTPHandler(urllib2.HTTPHandler): pass
        o = build_opener(MyHTTPHandler, MyOtherHTTPHandler)
        self.opener_has_handler(o, MyHTTPHandler)
        self.opener_has_handler(o, MyOtherHTTPHandler)

    def opener_has_handler(self, opener, handler_class):
        # Fail the test unless opener.handlers contains a handler whose
        # class is exactly handler_class (an instance of a subclass does
        # not count).  The failure message names the missing class and
        # lists the classes that are present.
        if any(h.__class__ == handler_class for h in opener.handlers):
            return
        self.fail("opener has no handler of class %r (handler classes: %r)"
                  % (handler_class,
                     [h.__class__ for h in opener.handlers]))
1254\r
class RequestTests(unittest.TestCase):
    # Tests for the accessors of urllib2.Request.

    def setUp(self):
        # One request without a body (GET) and one with a body and an
        # extra header (POST), both for the same URL.
        target = "http://www.python.org/~jeremy/"
        self.get = urllib2.Request(target)
        self.post = urllib2.Request(target,
                                    "data",
                                    headers={"X-Test": "test"})

    def test_method(self):
        self.assertEqual("POST", self.post.get_method())
        self.assertEqual("GET", self.get.get_method())

    def test_add_data(self):
        # adding data turns a GET request into a POST request
        request = self.get
        self.assertFalse(request.has_data())
        self.assertEqual("GET", request.get_method())
        request.add_data("spam")
        self.assertTrue(request.has_data())
        self.assertEqual("POST", request.get_method())

    def test_get_full_url(self):
        full_url = self.get.get_full_url()
        self.assertEqual("http://www.python.org/~jeremy/", full_url)

    def test_selector(self):
        self.assertEqual("/~jeremy/", self.get.get_selector())
        root = urllib2.Request("http://www.python.org/")
        self.assertEqual("/", root.get_selector())

    def test_get_type(self):
        self.assertEqual("http", self.get.get_type())

    def test_get_host(self):
        self.assertEqual("www.python.org", self.get.get_host())

    def test_get_host_unquote(self):
        # a percent-escape in the host part is decoded by get_host()
        escaped = urllib2.Request("http://www.%70ython.org/")
        self.assertEqual("www.python.org", escaped.get_host())

    def test_proxy(self):
        request = self.get
        self.assertFalse(request.has_proxy())
        request.set_proxy("www.perl.org", "http")
        self.assertTrue(request.has_proxy())
        # the origin host is kept; the host becomes the proxy
        self.assertEqual("www.python.org", request.get_origin_req_host())
        self.assertEqual("www.perl.org", request.get_host())

    def test_wrapped_url(self):
        # the <URL:...> wrapper is removed before the URL is parsed
        wrapped = Request("<URL:http://www.python.org>")
        self.assertEqual("www.python.org", wrapped.get_host())

    def test_url_fragment(self):
        # the fragment is not part of the selector
        with_query = Request("http://www.python.org/?qs=query#fragment=true")
        self.assertEqual("/?qs=query", with_query.get_selector())
        fragment_only = Request("http://www.python.org/#fun=true")
        self.assertEqual("/", fragment_only.get_selector())

        # Issue 11703: geturl() omits fragment in the original URL.
        url = 'http://docs.python.org/library/urllib2.html#OK'
        self.assertEqual(Request(url).get_full_url(), url)
1314\r
def test_main(verbose=None):
    # Run the doctests of this module and of urllib2 itself, then every
    # unittest class of this module.  `verbose` is passed to run_doctest().
    from test import test_urllib2
    for module in (test_urllib2, urllib2):
        test_support.run_doctest(module, verbose)
    test_support.run_unittest(
        TrivialTests,
        OpenerDirectorTests,
        HandlerTests,
        MiscTests,
        RequestTests,
    )

# Running the file directly executes the whole suite verbosely.
if __name__ == "__main__":
    test_main(verbose=True)