]>
Commit | Line | Data |
---|---|---|
4710c53d | 1 | import unittest\r |
2 | from test import test_support\r | |
3 | \r | |
4 | import os\r | |
5 | import socket\r | |
6 | import StringIO\r | |
7 | \r | |
8 | import urllib2\r | |
9 | from urllib2 import Request, OpenerDirector\r | |
10 | \r | |
11 | # XXX\r | |
12 | # Request\r | |
13 | # CacheFTPHandler (hard to write)\r | |
14 | # parse_keqv_list, parse_http_list, HTTPDigestAuthHandler\r | |
15 | \r | |
16 | class TrivialTests(unittest.TestCase):\r | |
17 | def test_trivial(self):\r | |
18 | # A couple trivial tests\r | |
19 | \r | |
20 | self.assertRaises(ValueError, urllib2.urlopen, 'bogus url')\r | |
21 | \r | |
22 | # XXX Name hacking to get this to work on Windows.\r | |
23 | fname = os.path.abspath(urllib2.__file__).replace('\\', '/')\r | |
24 | \r | |
25 | # And more hacking to get it to work on MacOS. This assumes\r | |
26 | # urllib.pathname2url works, unfortunately...\r | |
27 | if os.name == 'riscos':\r | |
28 | import string\r | |
29 | fname = os.expand(fname)\r | |
30 | fname = fname.translate(string.maketrans("/.", "./"))\r | |
31 | \r | |
32 | if os.name == 'nt':\r | |
33 | file_url = "file:///%s" % fname\r | |
34 | else:\r | |
35 | file_url = "file://%s" % fname\r | |
36 | \r | |
37 | f = urllib2.urlopen(file_url)\r | |
38 | \r | |
39 | buf = f.read()\r | |
40 | f.close()\r | |
41 | \r | |
42 | def test_parse_http_list(self):\r | |
43 | tests = [('a,b,c', ['a', 'b', 'c']),\r | |
44 | ('path"o,l"og"i"cal, example', ['path"o,l"og"i"cal', 'example']),\r | |
45 | ('a, b, "c", "d", "e,f", g, h', ['a', 'b', '"c"', '"d"', '"e,f"', 'g', 'h']),\r | |
46 | ('a="b\\"c", d="e\\,f", g="h\\\\i"', ['a="b"c"', 'd="e,f"', 'g="h\\i"'])]\r | |
47 | for string, list in tests:\r | |
48 | self.assertEqual(urllib2.parse_http_list(string), list)\r | |
49 | \r | |
50 | \r | |
51 | def test_request_headers_dict():\r | |
52 | """\r | |
53 | The Request.headers dictionary is not a documented interface. It should\r | |
54 | stay that way, because the complete set of headers are only accessible\r | |
55 | through the .get_header(), .has_header(), .header_items() interface.\r | |
56 | However, .headers pre-dates those methods, and so real code will be using\r | |
57 | the dictionary.\r | |
58 | \r | |
59 | The introduction in 2.4 of those methods was a mistake for the same reason:\r | |
60 | code that previously saw all (urllib2 user)-provided headers in .headers\r | |
61 | now sees only a subset (and the function interface is ugly and incomplete).\r | |
62 | A better change would have been to replace .headers dict with a dict\r | |
63 | subclass (or UserDict.DictMixin instance?) that preserved the .headers\r | |
64 | interface and also provided access to the "unredirected" headers. It's\r | |
65 | probably too late to fix that, though.\r | |
66 | \r | |
67 | \r | |
68 | Check .capitalize() case normalization:\r | |
69 | \r | |
70 | >>> url = "http://example.com"\r | |
71 | >>> Request(url, headers={"Spam-eggs": "blah"}).headers["Spam-eggs"]\r | |
72 | 'blah'\r | |
73 | >>> Request(url, headers={"spam-EggS": "blah"}).headers["Spam-eggs"]\r | |
74 | 'blah'\r | |
75 | \r | |
76 | Currently, Request(url, "Spam-eggs").headers["Spam-Eggs"] raises KeyError,\r | |
77 | but that could be changed in future.\r | |
78 | \r | |
79 | """\r | |
80 | \r | |
81 | def test_request_headers_methods():\r | |
82 | """\r | |
83 | Note the case normalization of header names here, to .capitalize()-case.\r | |
84 | This should be preserved for backwards-compatibility. (In the HTTP case,\r | |
85 | normalization to .title()-case is done by urllib2 before sending headers to\r | |
86 | httplib).\r | |
87 | \r | |
88 | >>> url = "http://example.com"\r | |
89 | >>> r = Request(url, headers={"Spam-eggs": "blah"})\r | |
90 | >>> r.has_header("Spam-eggs")\r | |
91 | True\r | |
92 | >>> r.header_items()\r | |
93 | [('Spam-eggs', 'blah')]\r | |
94 | >>> r.add_header("Foo-Bar", "baz")\r | |
95 | >>> items = r.header_items()\r | |
96 | >>> items.sort()\r | |
97 | >>> items\r | |
98 | [('Foo-bar', 'baz'), ('Spam-eggs', 'blah')]\r | |
99 | \r | |
100 | Note that e.g. r.has_header("spam-EggS") is currently False, and\r | |
101 | r.get_header("spam-EggS") returns None, but that could be changed in\r | |
102 | future.\r | |
103 | \r | |
104 | >>> r.has_header("Not-there")\r | |
105 | False\r | |
106 | >>> print r.get_header("Not-there")\r | |
107 | None\r | |
108 | >>> r.get_header("Not-there", "default")\r | |
109 | 'default'\r | |
110 | \r | |
111 | """\r | |
112 | \r | |
113 | \r | |
114 | def test_password_manager(self):\r | |
115 | """\r | |
116 | >>> mgr = urllib2.HTTPPasswordMgr()\r | |
117 | >>> add = mgr.add_password\r | |
118 | >>> add("Some Realm", "http://example.com/", "joe", "password")\r | |
119 | >>> add("Some Realm", "http://example.com/ni", "ni", "ni")\r | |
120 | >>> add("c", "http://example.com/foo", "foo", "ni")\r | |
121 | >>> add("c", "http://example.com/bar", "bar", "nini")\r | |
122 | >>> add("b", "http://example.com/", "first", "blah")\r | |
123 | >>> add("b", "http://example.com/", "second", "spam")\r | |
124 | >>> add("a", "http://example.com", "1", "a")\r | |
125 | >>> add("Some Realm", "http://c.example.com:3128", "3", "c")\r | |
126 | >>> add("Some Realm", "d.example.com", "4", "d")\r | |
127 | >>> add("Some Realm", "e.example.com:3128", "5", "e")\r | |
128 | \r | |
129 | >>> mgr.find_user_password("Some Realm", "example.com")\r | |
130 | ('joe', 'password')\r | |
131 | >>> mgr.find_user_password("Some Realm", "http://example.com")\r | |
132 | ('joe', 'password')\r | |
133 | >>> mgr.find_user_password("Some Realm", "http://example.com/")\r | |
134 | ('joe', 'password')\r | |
135 | >>> mgr.find_user_password("Some Realm", "http://example.com/spam")\r | |
136 | ('joe', 'password')\r | |
137 | >>> mgr.find_user_password("Some Realm", "http://example.com/spam/spam")\r | |
138 | ('joe', 'password')\r | |
139 | >>> mgr.find_user_password("c", "http://example.com/foo")\r | |
140 | ('foo', 'ni')\r | |
141 | >>> mgr.find_user_password("c", "http://example.com/bar")\r | |
142 | ('bar', 'nini')\r | |
143 | \r | |
144 | Actually, this is really undefined ATM\r | |
145 | ## Currently, we use the highest-level path where more than one match:\r | |
146 | \r | |
147 | ## >>> mgr.find_user_password("Some Realm", "http://example.com/ni")\r | |
148 | ## ('joe', 'password')\r | |
149 | \r | |
150 | Use latest add_password() in case of conflict:\r | |
151 | \r | |
152 | >>> mgr.find_user_password("b", "http://example.com/")\r | |
153 | ('second', 'spam')\r | |
154 | \r | |
155 | No special relationship between a.example.com and example.com:\r | |
156 | \r | |
157 | >>> mgr.find_user_password("a", "http://example.com/")\r | |
158 | ('1', 'a')\r | |
159 | >>> mgr.find_user_password("a", "http://a.example.com/")\r | |
160 | (None, None)\r | |
161 | \r | |
162 | Ports:\r | |
163 | \r | |
164 | >>> mgr.find_user_password("Some Realm", "c.example.com")\r | |
165 | (None, None)\r | |
166 | >>> mgr.find_user_password("Some Realm", "c.example.com:3128")\r | |
167 | ('3', 'c')\r | |
168 | >>> mgr.find_user_password("Some Realm", "http://c.example.com:3128")\r | |
169 | ('3', 'c')\r | |
170 | >>> mgr.find_user_password("Some Realm", "d.example.com")\r | |
171 | ('4', 'd')\r | |
172 | >>> mgr.find_user_password("Some Realm", "e.example.com:3128")\r | |
173 | ('5', 'e')\r | |
174 | \r | |
175 | """\r | |
176 | pass\r | |
177 | \r | |
178 | \r | |
179 | def test_password_manager_default_port(self):\r | |
180 | """\r | |
181 | >>> mgr = urllib2.HTTPPasswordMgr()\r | |
182 | >>> add = mgr.add_password\r | |
183 | \r | |
184 | The point to note here is that we can't guess the default port if there's\r | |
185 | no scheme. This applies to both add_password and find_user_password.\r | |
186 | \r | |
187 | >>> add("f", "http://g.example.com:80", "10", "j")\r | |
188 | >>> add("g", "http://h.example.com", "11", "k")\r | |
189 | >>> add("h", "i.example.com:80", "12", "l")\r | |
190 | >>> add("i", "j.example.com", "13", "m")\r | |
191 | >>> mgr.find_user_password("f", "g.example.com:100")\r | |
192 | (None, None)\r | |
193 | >>> mgr.find_user_password("f", "g.example.com:80")\r | |
194 | ('10', 'j')\r | |
195 | >>> mgr.find_user_password("f", "g.example.com")\r | |
196 | (None, None)\r | |
197 | >>> mgr.find_user_password("f", "http://g.example.com:100")\r | |
198 | (None, None)\r | |
199 | >>> mgr.find_user_password("f", "http://g.example.com:80")\r | |
200 | ('10', 'j')\r | |
201 | >>> mgr.find_user_password("f", "http://g.example.com")\r | |
202 | ('10', 'j')\r | |
203 | >>> mgr.find_user_password("g", "h.example.com")\r | |
204 | ('11', 'k')\r | |
205 | >>> mgr.find_user_password("g", "h.example.com:80")\r | |
206 | ('11', 'k')\r | |
207 | >>> mgr.find_user_password("g", "http://h.example.com:80")\r | |
208 | ('11', 'k')\r | |
209 | >>> mgr.find_user_password("h", "i.example.com")\r | |
210 | (None, None)\r | |
211 | >>> mgr.find_user_password("h", "i.example.com:80")\r | |
212 | ('12', 'l')\r | |
213 | >>> mgr.find_user_password("h", "http://i.example.com:80")\r | |
214 | ('12', 'l')\r | |
215 | >>> mgr.find_user_password("i", "j.example.com")\r | |
216 | ('13', 'm')\r | |
217 | >>> mgr.find_user_password("i", "j.example.com:80")\r | |
218 | (None, None)\r | |
219 | >>> mgr.find_user_password("i", "http://j.example.com")\r | |
220 | ('13', 'm')\r | |
221 | >>> mgr.find_user_password("i", "http://j.example.com:80")\r | |
222 | (None, None)\r | |
223 | \r | |
224 | """\r | |
225 | \r | |
226 | class MockOpener:\r | |
227 | addheaders = []\r | |
228 | def open(self, req, data=None,timeout=socket._GLOBAL_DEFAULT_TIMEOUT):\r | |
229 | self.req, self.data, self.timeout = req, data, timeout\r | |
230 | def error(self, proto, *args):\r | |
231 | self.proto, self.args = proto, args\r | |
232 | \r | |
233 | class MockFile:\r | |
234 | def read(self, count=None): pass\r | |
235 | def readline(self, count=None): pass\r | |
236 | def close(self): pass\r | |
237 | \r | |
238 | class MockHeaders(dict):\r | |
239 | def getheaders(self, name):\r | |
240 | return self.values()\r | |
241 | \r | |
242 | class MockResponse(StringIO.StringIO):\r | |
243 | def __init__(self, code, msg, headers, data, url=None):\r | |
244 | StringIO.StringIO.__init__(self, data)\r | |
245 | self.code, self.msg, self.headers, self.url = code, msg, headers, url\r | |
246 | def info(self):\r | |
247 | return self.headers\r | |
248 | def geturl(self):\r | |
249 | return self.url\r | |
250 | \r | |
251 | class MockCookieJar:\r | |
252 | def add_cookie_header(self, request):\r | |
253 | self.ach_req = request\r | |
254 | def extract_cookies(self, response, request):\r | |
255 | self.ec_req, self.ec_r = request, response\r | |
256 | \r | |
257 | class FakeMethod:\r | |
258 | def __init__(self, meth_name, action, handle):\r | |
259 | self.meth_name = meth_name\r | |
260 | self.handle = handle\r | |
261 | self.action = action\r | |
262 | def __call__(self, *args):\r | |
263 | return self.handle(self.meth_name, self.action, *args)\r | |
264 | \r | |
265 | class MockHTTPResponse:\r | |
266 | def __init__(self, fp, msg, status, reason):\r | |
267 | self.fp = fp\r | |
268 | self.msg = msg\r | |
269 | self.status = status\r | |
270 | self.reason = reason\r | |
271 | def read(self):\r | |
272 | return ''\r | |
273 | \r | |
274 | class MockHTTPClass:\r | |
275 | def __init__(self):\r | |
276 | self.req_headers = []\r | |
277 | self.data = None\r | |
278 | self.raise_on_endheaders = False\r | |
279 | self._tunnel_headers = {}\r | |
280 | \r | |
281 | def __call__(self, host, timeout=socket._GLOBAL_DEFAULT_TIMEOUT):\r | |
282 | self.host = host\r | |
283 | self.timeout = timeout\r | |
284 | return self\r | |
285 | \r | |
286 | def set_debuglevel(self, level):\r | |
287 | self.level = level\r | |
288 | \r | |
289 | def set_tunnel(self, host, port=None, headers=None):\r | |
290 | self._tunnel_host = host\r | |
291 | self._tunnel_port = port\r | |
292 | if headers:\r | |
293 | self._tunnel_headers = headers\r | |
294 | else:\r | |
295 | self._tunnel_headers.clear()\r | |
296 | def request(self, method, url, body=None, headers=None):\r | |
297 | self.method = method\r | |
298 | self.selector = url\r | |
299 | if headers is not None:\r | |
300 | self.req_headers += headers.items()\r | |
301 | self.req_headers.sort()\r | |
302 | if body:\r | |
303 | self.data = body\r | |
304 | if self.raise_on_endheaders:\r | |
305 | import socket\r | |
306 | raise socket.error()\r | |
307 | def getresponse(self):\r | |
308 | return MockHTTPResponse(MockFile(), {}, 200, "OK")\r | |
309 | \r | |
310 | class MockHandler:\r | |
311 | # useful for testing handler machinery\r | |
312 | # see add_ordered_mock_handlers() docstring\r | |
313 | handler_order = 500\r | |
314 | def __init__(self, methods):\r | |
315 | self._define_methods(methods)\r | |
316 | def _define_methods(self, methods):\r | |
317 | for spec in methods:\r | |
318 | if len(spec) == 2: name, action = spec\r | |
319 | else: name, action = spec, None\r | |
320 | meth = FakeMethod(name, action, self.handle)\r | |
321 | setattr(self.__class__, name, meth)\r | |
322 | def handle(self, fn_name, action, *args, **kwds):\r | |
323 | self.parent.calls.append((self, fn_name, args, kwds))\r | |
324 | if action is None:\r | |
325 | return None\r | |
326 | elif action == "return self":\r | |
327 | return self\r | |
328 | elif action == "return response":\r | |
329 | res = MockResponse(200, "OK", {}, "")\r | |
330 | return res\r | |
331 | elif action == "return request":\r | |
332 | return Request("http://blah/")\r | |
333 | elif action.startswith("error"):\r | |
334 | code = action[action.rfind(" ")+1:]\r | |
335 | try:\r | |
336 | code = int(code)\r | |
337 | except ValueError:\r | |
338 | pass\r | |
339 | res = MockResponse(200, "OK", {}, "")\r | |
340 | return self.parent.error("http", args[0], res, code, "", {})\r | |
341 | elif action == "raise":\r | |
342 | raise urllib2.URLError("blah")\r | |
343 | assert False\r | |
344 | def close(self): pass\r | |
345 | def add_parent(self, parent):\r | |
346 | self.parent = parent\r | |
347 | self.parent.calls = []\r | |
348 | def __lt__(self, other):\r | |
349 | if not hasattr(other, "handler_order"):\r | |
350 | # No handler_order, leave in original order. Yuck.\r | |
351 | return True\r | |
352 | return self.handler_order < other.handler_order\r | |
353 | \r | |
354 | def add_ordered_mock_handlers(opener, meth_spec):\r | |
355 | """Create MockHandlers and add them to an OpenerDirector.\r | |
356 | \r | |
357 | meth_spec: list of lists of tuples and strings defining methods to define\r | |
358 | on handlers. eg:\r | |
359 | \r | |
360 | [["http_error", "ftp_open"], ["http_open"]]\r | |
361 | \r | |
362 | defines methods .http_error() and .ftp_open() on one handler, and\r | |
363 | .http_open() on another. These methods just record their arguments and\r | |
364 | return None. Using a tuple instead of a string causes the method to\r | |
365 | perform some action (see MockHandler.handle()), eg:\r | |
366 | \r | |
367 | [["http_error"], [("http_open", "return request")]]\r | |
368 | \r | |
369 | defines .http_error() on one handler (which simply returns None), and\r | |
370 | .http_open() on another handler, which returns a Request object.\r | |
371 | \r | |
372 | """\r | |
373 | handlers = []\r | |
374 | count = 0\r | |
375 | for meths in meth_spec:\r | |
376 | class MockHandlerSubclass(MockHandler): pass\r | |
377 | h = MockHandlerSubclass(meths)\r | |
378 | h.handler_order += count\r | |
379 | h.add_parent(opener)\r | |
380 | count = count + 1\r | |
381 | handlers.append(h)\r | |
382 | opener.add_handler(h)\r | |
383 | return handlers\r | |
384 | \r | |
385 | def build_test_opener(*handler_instances):\r | |
386 | opener = OpenerDirector()\r | |
387 | for h in handler_instances:\r | |
388 | opener.add_handler(h)\r | |
389 | return opener\r | |
390 | \r | |
391 | class MockHTTPHandler(urllib2.BaseHandler):\r | |
392 | # useful for testing redirections and auth\r | |
393 | # sends supplied headers and code as first response\r | |
394 | # sends 200 OK as second response\r | |
395 | def __init__(self, code, headers):\r | |
396 | self.code = code\r | |
397 | self.headers = headers\r | |
398 | self.reset()\r | |
399 | def reset(self):\r | |
400 | self._count = 0\r | |
401 | self.requests = []\r | |
402 | def http_open(self, req):\r | |
403 | import mimetools, httplib, copy\r | |
404 | from StringIO import StringIO\r | |
405 | self.requests.append(copy.deepcopy(req))\r | |
406 | if self._count == 0:\r | |
407 | self._count = self._count + 1\r | |
408 | name = httplib.responses[self.code]\r | |
409 | msg = mimetools.Message(StringIO(self.headers))\r | |
410 | return self.parent.error(\r | |
411 | "http", req, MockFile(), self.code, name, msg)\r | |
412 | else:\r | |
413 | self.req = req\r | |
414 | msg = mimetools.Message(StringIO("\r\n\r\n"))\r | |
415 | return MockResponse(200, "OK", msg, "", req.get_full_url())\r | |
416 | \r | |
417 | class MockHTTPSHandler(urllib2.AbstractHTTPHandler):\r | |
418 | # Useful for testing the Proxy-Authorization request by verifying the\r | |
419 | # properties of httpcon\r | |
420 | \r | |
421 | def __init__(self):\r | |
422 | urllib2.AbstractHTTPHandler.__init__(self)\r | |
423 | self.httpconn = MockHTTPClass()\r | |
424 | \r | |
425 | def https_open(self, req):\r | |
426 | return self.do_open(self.httpconn, req)\r | |
427 | \r | |
428 | class MockPasswordManager:\r | |
429 | def add_password(self, realm, uri, user, password):\r | |
430 | self.realm = realm\r | |
431 | self.url = uri\r | |
432 | self.user = user\r | |
433 | self.password = password\r | |
434 | def find_user_password(self, realm, authuri):\r | |
435 | self.target_realm = realm\r | |
436 | self.target_url = authuri\r | |
437 | return self.user, self.password\r | |
438 | \r | |
439 | \r | |
440 | class OpenerDirectorTests(unittest.TestCase):\r | |
441 | \r | |
442 | def test_add_non_handler(self):\r | |
443 | class NonHandler(object):\r | |
444 | pass\r | |
445 | self.assertRaises(TypeError,\r | |
446 | OpenerDirector().add_handler, NonHandler())\r | |
447 | \r | |
448 | def test_badly_named_methods(self):\r | |
449 | # test work-around for three methods that accidentally follow the\r | |
450 | # naming conventions for handler methods\r | |
451 | # (*_open() / *_request() / *_response())\r | |
452 | \r | |
453 | # These used to call the accidentally-named methods, causing a\r | |
454 | # TypeError in real code; here, returning self from these mock\r | |
455 | # methods would either cause no exception, or AttributeError.\r | |
456 | \r | |
457 | from urllib2 import URLError\r | |
458 | \r | |
459 | o = OpenerDirector()\r | |
460 | meth_spec = [\r | |
461 | [("do_open", "return self"), ("proxy_open", "return self")],\r | |
462 | [("redirect_request", "return self")],\r | |
463 | ]\r | |
464 | handlers = add_ordered_mock_handlers(o, meth_spec)\r | |
465 | o.add_handler(urllib2.UnknownHandler())\r | |
466 | for scheme in "do", "proxy", "redirect":\r | |
467 | self.assertRaises(URLError, o.open, scheme+"://example.com/")\r | |
468 | \r | |
469 | def test_handled(self):\r | |
470 | # handler returning non-None means no more handlers will be called\r | |
471 | o = OpenerDirector()\r | |
472 | meth_spec = [\r | |
473 | ["http_open", "ftp_open", "http_error_302"],\r | |
474 | ["ftp_open"],\r | |
475 | [("http_open", "return self")],\r | |
476 | [("http_open", "return self")],\r | |
477 | ]\r | |
478 | handlers = add_ordered_mock_handlers(o, meth_spec)\r | |
479 | \r | |
480 | req = Request("http://example.com/")\r | |
481 | r = o.open(req)\r | |
482 | # Second .http_open() gets called, third doesn't, since second returned\r | |
483 | # non-None. Handlers without .http_open() never get any methods called\r | |
484 | # on them.\r | |
485 | # In fact, second mock handler defining .http_open() returns self\r | |
486 | # (instead of response), which becomes the OpenerDirector's return\r | |
487 | # value.\r | |
488 | self.assertEqual(r, handlers[2])\r | |
489 | calls = [(handlers[0], "http_open"), (handlers[2], "http_open")]\r | |
490 | for expected, got in zip(calls, o.calls):\r | |
491 | handler, name, args, kwds = got\r | |
492 | self.assertEqual((handler, name), expected)\r | |
493 | self.assertEqual(args, (req,))\r | |
494 | \r | |
495 | def test_handler_order(self):\r | |
496 | o = OpenerDirector()\r | |
497 | handlers = []\r | |
498 | for meths, handler_order in [\r | |
499 | ([("http_open", "return self")], 500),\r | |
500 | (["http_open"], 0),\r | |
501 | ]:\r | |
502 | class MockHandlerSubclass(MockHandler): pass\r | |
503 | h = MockHandlerSubclass(meths)\r | |
504 | h.handler_order = handler_order\r | |
505 | handlers.append(h)\r | |
506 | o.add_handler(h)\r | |
507 | \r | |
508 | r = o.open("http://example.com/")\r | |
509 | # handlers called in reverse order, thanks to their sort order\r | |
510 | self.assertEqual(o.calls[0][0], handlers[1])\r | |
511 | self.assertEqual(o.calls[1][0], handlers[0])\r | |
512 | \r | |
513 | def test_raise(self):\r | |
514 | # raising URLError stops processing of request\r | |
515 | o = OpenerDirector()\r | |
516 | meth_spec = [\r | |
517 | [("http_open", "raise")],\r | |
518 | [("http_open", "return self")],\r | |
519 | ]\r | |
520 | handlers = add_ordered_mock_handlers(o, meth_spec)\r | |
521 | \r | |
522 | req = Request("http://example.com/")\r | |
523 | self.assertRaises(urllib2.URLError, o.open, req)\r | |
524 | self.assertEqual(o.calls, [(handlers[0], "http_open", (req,), {})])\r | |
525 | \r | |
526 | ## def test_error(self):\r | |
527 | ## # XXX this doesn't actually seem to be used in standard library,\r | |
528 | ## # but should really be tested anyway...\r | |
529 | \r | |
530 | def test_http_error(self):\r | |
531 | # XXX http_error_default\r | |
532 | # http errors are a special case\r | |
533 | o = OpenerDirector()\r | |
534 | meth_spec = [\r | |
535 | [("http_open", "error 302")],\r | |
536 | [("http_error_400", "raise"), "http_open"],\r | |
537 | [("http_error_302", "return response"), "http_error_303",\r | |
538 | "http_error"],\r | |
539 | [("http_error_302")],\r | |
540 | ]\r | |
541 | handlers = add_ordered_mock_handlers(o, meth_spec)\r | |
542 | \r | |
543 | class Unknown:\r | |
544 | def __eq__(self, other): return True\r | |
545 | \r | |
546 | req = Request("http://example.com/")\r | |
547 | r = o.open(req)\r | |
548 | assert len(o.calls) == 2\r | |
549 | calls = [(handlers[0], "http_open", (req,)),\r | |
550 | (handlers[2], "http_error_302",\r | |
551 | (req, Unknown(), 302, "", {}))]\r | |
552 | for expected, got in zip(calls, o.calls):\r | |
553 | handler, method_name, args = expected\r | |
554 | self.assertEqual((handler, method_name), got[:2])\r | |
555 | self.assertEqual(args, got[2])\r | |
556 | \r | |
557 | def test_processors(self):\r | |
558 | # *_request / *_response methods get called appropriately\r | |
559 | o = OpenerDirector()\r | |
560 | meth_spec = [\r | |
561 | [("http_request", "return request"),\r | |
562 | ("http_response", "return response")],\r | |
563 | [("http_request", "return request"),\r | |
564 | ("http_response", "return response")],\r | |
565 | ]\r | |
566 | handlers = add_ordered_mock_handlers(o, meth_spec)\r | |
567 | \r | |
568 | req = Request("http://example.com/")\r | |
569 | r = o.open(req)\r | |
570 | # processor methods are called on *all* handlers that define them,\r | |
571 | # not just the first handler that handles the request\r | |
572 | calls = [\r | |
573 | (handlers[0], "http_request"), (handlers[1], "http_request"),\r | |
574 | (handlers[0], "http_response"), (handlers[1], "http_response")]\r | |
575 | \r | |
576 | for i, (handler, name, args, kwds) in enumerate(o.calls):\r | |
577 | if i < 2:\r | |
578 | # *_request\r | |
579 | self.assertEqual((handler, name), calls[i])\r | |
580 | self.assertEqual(len(args), 1)\r | |
581 | self.assertIsInstance(args[0], Request)\r | |
582 | else:\r | |
583 | # *_response\r | |
584 | self.assertEqual((handler, name), calls[i])\r | |
585 | self.assertEqual(len(args), 2)\r | |
586 | self.assertIsInstance(args[0], Request)\r | |
587 | # response from opener.open is None, because there's no\r | |
588 | # handler that defines http_open to handle it\r | |
589 | self.assertTrue(args[1] is None or\r | |
590 | isinstance(args[1], MockResponse))\r | |
591 | \r | |
592 | \r | |
593 | def sanepathname2url(path):\r | |
594 | import urllib\r | |
595 | urlpath = urllib.pathname2url(path)\r | |
596 | if os.name == "nt" and urlpath.startswith("///"):\r | |
597 | urlpath = urlpath[2:]\r | |
598 | # XXX don't ask me about the mac...\r | |
599 | return urlpath\r | |
600 | \r | |
601 | class HandlerTests(unittest.TestCase):\r | |
602 | \r | |
603 | def test_ftp(self):\r | |
604 | class MockFTPWrapper:\r | |
605 | def __init__(self, data): self.data = data\r | |
606 | def retrfile(self, filename, filetype):\r | |
607 | self.filename, self.filetype = filename, filetype\r | |
608 | return StringIO.StringIO(self.data), len(self.data)\r | |
609 | \r | |
610 | class NullFTPHandler(urllib2.FTPHandler):\r | |
611 | def __init__(self, data): self.data = data\r | |
612 | def connect_ftp(self, user, passwd, host, port, dirs,\r | |
613 | timeout=socket._GLOBAL_DEFAULT_TIMEOUT):\r | |
614 | self.user, self.passwd = user, passwd\r | |
615 | self.host, self.port = host, port\r | |
616 | self.dirs = dirs\r | |
617 | self.ftpwrapper = MockFTPWrapper(self.data)\r | |
618 | return self.ftpwrapper\r | |
619 | \r | |
620 | import ftplib\r | |
621 | data = "rheum rhaponicum"\r | |
622 | h = NullFTPHandler(data)\r | |
623 | o = h.parent = MockOpener()\r | |
624 | \r | |
625 | for url, host, port, user, passwd, type_, dirs, filename, mimetype in [\r | |
626 | ("ftp://localhost/foo/bar/baz.html",\r | |
627 | "localhost", ftplib.FTP_PORT, "", "", "I",\r | |
628 | ["foo", "bar"], "baz.html", "text/html"),\r | |
629 | ("ftp://parrot@localhost/foo/bar/baz.html",\r | |
630 | "localhost", ftplib.FTP_PORT, "parrot", "", "I",\r | |
631 | ["foo", "bar"], "baz.html", "text/html"),\r | |
632 | ("ftp://%25parrot@localhost/foo/bar/baz.html",\r | |
633 | "localhost", ftplib.FTP_PORT, "%parrot", "", "I",\r | |
634 | ["foo", "bar"], "baz.html", "text/html"),\r | |
635 | ("ftp://%2542parrot@localhost/foo/bar/baz.html",\r | |
636 | "localhost", ftplib.FTP_PORT, "%42parrot", "", "I",\r | |
637 | ["foo", "bar"], "baz.html", "text/html"),\r | |
638 | ("ftp://localhost:80/foo/bar/",\r | |
639 | "localhost", 80, "", "", "D",\r | |
640 | ["foo", "bar"], "", None),\r | |
641 | ("ftp://localhost/baz.gif;type=a",\r | |
642 | "localhost", ftplib.FTP_PORT, "", "", "A",\r | |
643 | [], "baz.gif", None), # XXX really this should guess image/gif\r | |
644 | ]:\r | |
645 | req = Request(url)\r | |
646 | req.timeout = None\r | |
647 | r = h.ftp_open(req)\r | |
648 | # ftp authentication not yet implemented by FTPHandler\r | |
649 | self.assertEqual(h.user, user)\r | |
650 | self.assertEqual(h.passwd, passwd)\r | |
651 | self.assertEqual(h.host, socket.gethostbyname(host))\r | |
652 | self.assertEqual(h.port, port)\r | |
653 | self.assertEqual(h.dirs, dirs)\r | |
654 | self.assertEqual(h.ftpwrapper.filename, filename)\r | |
655 | self.assertEqual(h.ftpwrapper.filetype, type_)\r | |
656 | headers = r.info()\r | |
657 | self.assertEqual(headers.get("Content-type"), mimetype)\r | |
658 | self.assertEqual(int(headers["Content-length"]), len(data))\r | |
659 | \r | |
660 | def test_file(self):\r | |
661 | import rfc822, socket\r | |
662 | h = urllib2.FileHandler()\r | |
663 | o = h.parent = MockOpener()\r | |
664 | \r | |
665 | TESTFN = test_support.TESTFN\r | |
666 | urlpath = sanepathname2url(os.path.abspath(TESTFN))\r | |
667 | towrite = "hello, world\n"\r | |
668 | urls = [\r | |
669 | "file://localhost%s" % urlpath,\r | |
670 | "file://%s" % urlpath,\r | |
671 | "file://%s%s" % (socket.gethostbyname('localhost'), urlpath),\r | |
672 | ]\r | |
673 | try:\r | |
674 | localaddr = socket.gethostbyname(socket.gethostname())\r | |
675 | except socket.gaierror:\r | |
676 | localaddr = ''\r | |
677 | if localaddr:\r | |
678 | urls.append("file://%s%s" % (localaddr, urlpath))\r | |
679 | \r | |
680 | for url in urls:\r | |
681 | f = open(TESTFN, "wb")\r | |
682 | try:\r | |
683 | try:\r | |
684 | f.write(towrite)\r | |
685 | finally:\r | |
686 | f.close()\r | |
687 | \r | |
688 | r = h.file_open(Request(url))\r | |
689 | try:\r | |
690 | data = r.read()\r | |
691 | headers = r.info()\r | |
692 | respurl = r.geturl()\r | |
693 | finally:\r | |
694 | r.close()\r | |
695 | stats = os.stat(TESTFN)\r | |
696 | modified = rfc822.formatdate(stats.st_mtime)\r | |
697 | finally:\r | |
698 | os.remove(TESTFN)\r | |
699 | self.assertEqual(data, towrite)\r | |
700 | self.assertEqual(headers["Content-type"], "text/plain")\r | |
701 | self.assertEqual(headers["Content-length"], "13")\r | |
702 | self.assertEqual(headers["Last-modified"], modified)\r | |
703 | self.assertEqual(respurl, url)\r | |
704 | \r | |
705 | for url in [\r | |
706 | "file://localhost:80%s" % urlpath,\r | |
707 | "file:///file_does_not_exist.txt",\r | |
708 | "file://%s:80%s/%s" % (socket.gethostbyname('localhost'),\r | |
709 | os.getcwd(), TESTFN),\r | |
710 | "file://somerandomhost.ontheinternet.com%s/%s" %\r | |
711 | (os.getcwd(), TESTFN),\r | |
712 | ]:\r | |
713 | try:\r | |
714 | f = open(TESTFN, "wb")\r | |
715 | try:\r | |
716 | f.write(towrite)\r | |
717 | finally:\r | |
718 | f.close()\r | |
719 | \r | |
720 | self.assertRaises(urllib2.URLError,\r | |
721 | h.file_open, Request(url))\r | |
722 | finally:\r | |
723 | os.remove(TESTFN)\r | |
724 | \r | |
725 | h = urllib2.FileHandler()\r | |
726 | o = h.parent = MockOpener()\r | |
727 | # XXXX why does // mean ftp (and /// mean not ftp!), and where\r | |
728 | # is file: scheme specified? I think this is really a bug, and\r | |
729 | # what was intended was to distinguish between URLs like:\r | |
730 | # file:/blah.txt (a file)\r | |
731 | # file://localhost/blah.txt (a file)\r | |
732 | # file:///blah.txt (a file)\r | |
733 | # file://ftp.example.com/blah.txt (an ftp URL)\r | |
734 | for url, ftp in [\r | |
735 | ("file://ftp.example.com//foo.txt", True),\r | |
736 | ("file://ftp.example.com///foo.txt", False),\r | |
737 | # XXXX bug: fails with OSError, should be URLError\r | |
738 | ("file://ftp.example.com/foo.txt", False),\r | |
739 | ("file://somehost//foo/something.txt", True),\r | |
740 | ("file://localhost//foo/something.txt", False),\r | |
741 | ]:\r | |
742 | req = Request(url)\r | |
743 | try:\r | |
744 | h.file_open(req)\r | |
745 | # XXXX remove OSError when bug fixed\r | |
746 | except (urllib2.URLError, OSError):\r | |
747 | self.assertTrue(not ftp)\r | |
748 | else:\r | |
749 | self.assertTrue(o.req is req)\r | |
750 | self.assertEqual(req.type, "ftp")\r | |
751 | self.assertEqual(req.type == "ftp", ftp)\r | |
752 | \r | |
753 | def test_http(self):\r | |
754 | \r | |
755 | h = urllib2.AbstractHTTPHandler()\r | |
756 | o = h.parent = MockOpener()\r | |
757 | \r | |
758 | url = "http://example.com/"\r | |
759 | for method, data in [("GET", None), ("POST", "blah")]:\r | |
760 | req = Request(url, data, {"Foo": "bar"})\r | |
761 | req.timeout = None\r | |
762 | req.add_unredirected_header("Spam", "eggs")\r | |
763 | http = MockHTTPClass()\r | |
764 | r = h.do_open(http, req)\r | |
765 | \r | |
766 | # result attributes\r | |
767 | r.read; r.readline # wrapped MockFile methods\r | |
768 | r.info; r.geturl # addinfourl methods\r | |
769 | r.code, r.msg == 200, "OK" # added from MockHTTPClass.getreply()\r | |
770 | hdrs = r.info()\r | |
771 | hdrs.get; hdrs.has_key # r.info() gives dict from .getreply()\r | |
772 | self.assertEqual(r.geturl(), url)\r | |
773 | \r | |
774 | self.assertEqual(http.host, "example.com")\r | |
775 | self.assertEqual(http.level, 0)\r | |
776 | self.assertEqual(http.method, method)\r | |
777 | self.assertEqual(http.selector, "/")\r | |
778 | self.assertEqual(http.req_headers,\r | |
779 | [("Connection", "close"),\r | |
780 | ("Foo", "bar"), ("Spam", "eggs")])\r | |
781 | self.assertEqual(http.data, data)\r | |
782 | \r | |
783 | # check socket.error converted to URLError\r | |
784 | http.raise_on_endheaders = True\r | |
785 | self.assertRaises(urllib2.URLError, h.do_open, http, req)\r | |
786 | \r | |
787 | # check adding of standard headers\r | |
788 | o.addheaders = [("Spam", "eggs")]\r | |
789 | for data in "", None: # POST, GET\r | |
790 | req = Request("http://example.com/", data)\r | |
791 | r = MockResponse(200, "OK", {}, "")\r | |
792 | newreq = h.do_request_(req)\r | |
793 | if data is None: # GET\r | |
794 | self.assertNotIn("Content-length", req.unredirected_hdrs)\r | |
795 | self.assertNotIn("Content-type", req.unredirected_hdrs)\r | |
796 | else: # POST\r | |
797 | self.assertEqual(req.unredirected_hdrs["Content-length"], "0")\r | |
798 | self.assertEqual(req.unredirected_hdrs["Content-type"],\r | |
799 | "application/x-www-form-urlencoded")\r | |
800 | # XXX the details of Host could be better tested\r | |
801 | self.assertEqual(req.unredirected_hdrs["Host"], "example.com")\r | |
802 | self.assertEqual(req.unredirected_hdrs["Spam"], "eggs")\r | |
803 | \r | |
804 | # don't clobber existing headers\r | |
805 | req.add_unredirected_header("Content-length", "foo")\r | |
806 | req.add_unredirected_header("Content-type", "bar")\r | |
807 | req.add_unredirected_header("Host", "baz")\r | |
808 | req.add_unredirected_header("Spam", "foo")\r | |
809 | newreq = h.do_request_(req)\r | |
810 | self.assertEqual(req.unredirected_hdrs["Content-length"], "foo")\r | |
811 | self.assertEqual(req.unredirected_hdrs["Content-type"], "bar")\r | |
812 | self.assertEqual(req.unredirected_hdrs["Host"], "baz")\r | |
813 | self.assertEqual(req.unredirected_hdrs["Spam"], "foo")\r | |
814 | \r | |
815 | def test_http_doubleslash(self):\r | |
816 | # Checks that the presence of an unnecessary double slash in a url doesn't break anything\r | |
817 | # Previously, a double slash directly after the host could cause incorrect parsing of the url\r | |
818 | h = urllib2.AbstractHTTPHandler()\r | |
819 | o = h.parent = MockOpener()\r | |
820 | \r | |
821 | data = ""\r | |
822 | ds_urls = [\r | |
823 | "http://example.com/foo/bar/baz.html",\r | |
824 | "http://example.com//foo/bar/baz.html",\r | |
825 | "http://example.com/foo//bar/baz.html",\r | |
826 | "http://example.com/foo/bar//baz.html",\r | |
827 | ]\r | |
828 | \r | |
829 | for ds_url in ds_urls:\r | |
830 | ds_req = Request(ds_url, data)\r | |
831 | \r | |
832 | # Check whether host is determined correctly if there is no proxy\r | |
833 | np_ds_req = h.do_request_(ds_req)\r | |
834 | self.assertEqual(np_ds_req.unredirected_hdrs["Host"],"example.com")\r | |
835 | \r | |
836 | # Check whether host is determined correctly if there is a proxy\r | |
837 | ds_req.set_proxy("someproxy:3128",None)\r | |
838 | p_ds_req = h.do_request_(ds_req)\r | |
839 | self.assertEqual(p_ds_req.unredirected_hdrs["Host"],"example.com")\r | |
840 | \r | |
841 | def test_fixpath_in_weirdurls(self):\r | |
842 | # Issue4493: urllib2 to supply '/' when to urls where path does not\r | |
843 | # start with'/'\r | |
844 | \r | |
845 | h = urllib2.AbstractHTTPHandler()\r | |
846 | o = h.parent = MockOpener()\r | |
847 | \r | |
848 | weird_url = 'http://www.python.org?getspam'\r | |
849 | req = Request(weird_url)\r | |
850 | newreq = h.do_request_(req)\r | |
851 | self.assertEqual(newreq.get_host(),'www.python.org')\r | |
852 | self.assertEqual(newreq.get_selector(),'/?getspam')\r | |
853 | \r | |
854 | url_without_path = 'http://www.python.org'\r | |
855 | req = Request(url_without_path)\r | |
856 | newreq = h.do_request_(req)\r | |
857 | self.assertEqual(newreq.get_host(),'www.python.org')\r | |
858 | self.assertEqual(newreq.get_selector(),'')\r | |
859 | \r | |
860 | def test_errors(self):\r | |
861 | h = urllib2.HTTPErrorProcessor()\r | |
862 | o = h.parent = MockOpener()\r | |
863 | \r | |
864 | url = "http://example.com/"\r | |
865 | req = Request(url)\r | |
866 | # all 2xx are passed through\r | |
867 | r = MockResponse(200, "OK", {}, "", url)\r | |
868 | newr = h.http_response(req, r)\r | |
869 | self.assertTrue(r is newr)\r | |
870 | self.assertTrue(not hasattr(o, "proto")) # o.error not called\r | |
871 | r = MockResponse(202, "Accepted", {}, "", url)\r | |
872 | newr = h.http_response(req, r)\r | |
873 | self.assertTrue(r is newr)\r | |
874 | self.assertTrue(not hasattr(o, "proto")) # o.error not called\r | |
875 | r = MockResponse(206, "Partial content", {}, "", url)\r | |
876 | newr = h.http_response(req, r)\r | |
877 | self.assertTrue(r is newr)\r | |
878 | self.assertTrue(not hasattr(o, "proto")) # o.error not called\r | |
879 | # anything else calls o.error (and MockOpener returns None, here)\r | |
880 | r = MockResponse(502, "Bad gateway", {}, "", url)\r | |
881 | self.assertTrue(h.http_response(req, r) is None)\r | |
882 | self.assertEqual(o.proto, "http") # o.error called\r | |
883 | self.assertEqual(o.args, (req, r, 502, "Bad gateway", {}))\r | |
884 | \r | |
885 | def test_cookies(self):\r | |
886 | cj = MockCookieJar()\r | |
887 | h = urllib2.HTTPCookieProcessor(cj)\r | |
888 | o = h.parent = MockOpener()\r | |
889 | \r | |
890 | req = Request("http://example.com/")\r | |
891 | r = MockResponse(200, "OK", {}, "")\r | |
892 | newreq = h.http_request(req)\r | |
893 | self.assertTrue(cj.ach_req is req is newreq)\r | |
894 | self.assertEqual(req.get_origin_req_host(), "example.com")\r | |
895 | self.assertTrue(not req.is_unverifiable())\r | |
896 | newr = h.http_response(req, r)\r | |
897 | self.assertTrue(cj.ec_req is req)\r | |
898 | self.assertTrue(cj.ec_r is r is newr)\r | |
899 | \r | |
900 | def test_redirect(self):\r | |
901 | from_url = "http://example.com/a.html"\r | |
902 | to_url = "http://example.com/b.html"\r | |
903 | h = urllib2.HTTPRedirectHandler()\r | |
904 | o = h.parent = MockOpener()\r | |
905 | \r | |
906 | # ordinary redirect behaviour\r | |
907 | for code in 301, 302, 303, 307:\r | |
908 | for data in None, "blah\nblah\n":\r | |
909 | method = getattr(h, "http_error_%s" % code)\r | |
910 | req = Request(from_url, data)\r | |
911 | req.add_header("Nonsense", "viking=withhold")\r | |
912 | req.timeout = socket._GLOBAL_DEFAULT_TIMEOUT\r | |
913 | if data is not None:\r | |
914 | req.add_header("Content-Length", str(len(data)))\r | |
915 | req.add_unredirected_header("Spam", "spam")\r | |
916 | try:\r | |
917 | method(req, MockFile(), code, "Blah",\r | |
918 | MockHeaders({"location": to_url}))\r | |
919 | except urllib2.HTTPError:\r | |
920 | # 307 in response to POST requires user OK\r | |
921 | self.assertTrue(code == 307 and data is not None)\r | |
922 | self.assertEqual(o.req.get_full_url(), to_url)\r | |
923 | try:\r | |
924 | self.assertEqual(o.req.get_method(), "GET")\r | |
925 | except AttributeError:\r | |
926 | self.assertTrue(not o.req.has_data())\r | |
927 | \r | |
928 | # now it's a GET, there should not be headers regarding content\r | |
929 | # (possibly dragged from before being a POST)\r | |
930 | headers = [x.lower() for x in o.req.headers]\r | |
931 | self.assertNotIn("content-length", headers)\r | |
932 | self.assertNotIn("content-type", headers)\r | |
933 | \r | |
934 | self.assertEqual(o.req.headers["Nonsense"],\r | |
935 | "viking=withhold")\r | |
936 | self.assertNotIn("Spam", o.req.headers)\r | |
937 | self.assertNotIn("Spam", o.req.unredirected_hdrs)\r | |
938 | \r | |
939 | # loop detection\r | |
940 | req = Request(from_url)\r | |
941 | req.timeout = socket._GLOBAL_DEFAULT_TIMEOUT\r | |
942 | def redirect(h, req, url=to_url):\r | |
943 | h.http_error_302(req, MockFile(), 302, "Blah",\r | |
944 | MockHeaders({"location": url}))\r | |
945 | # Note that the *original* request shares the same record of\r | |
946 | # redirections with the sub-requests caused by the redirections.\r | |
947 | \r | |
948 | # detect infinite loop redirect of a URL to itself\r | |
949 | req = Request(from_url, origin_req_host="example.com")\r | |
950 | count = 0\r | |
951 | req.timeout = socket._GLOBAL_DEFAULT_TIMEOUT\r | |
952 | try:\r | |
953 | while 1:\r | |
954 | redirect(h, req, "http://example.com/")\r | |
955 | count = count + 1\r | |
956 | except urllib2.HTTPError:\r | |
957 | # don't stop until max_repeats, because cookies may introduce state\r | |
958 | self.assertEqual(count, urllib2.HTTPRedirectHandler.max_repeats)\r | |
959 | \r | |
960 | # detect endless non-repeating chain of redirects\r | |
961 | req = Request(from_url, origin_req_host="example.com")\r | |
962 | count = 0\r | |
963 | req.timeout = socket._GLOBAL_DEFAULT_TIMEOUT\r | |
964 | try:\r | |
965 | while 1:\r | |
966 | redirect(h, req, "http://example.com/%d" % count)\r | |
967 | count = count + 1\r | |
968 | except urllib2.HTTPError:\r | |
969 | self.assertEqual(count,\r | |
970 | urllib2.HTTPRedirectHandler.max_redirections)\r | |
971 | \r | |
972 | def test_invalid_redirect(self):\r | |
973 | from_url = "http://example.com/a.html"\r | |
974 | valid_schemes = ['http', 'https', 'ftp']\r | |
975 | invalid_schemes = ['file', 'imap', 'ldap']\r | |
976 | schemeless_url = "example.com/b.html"\r | |
977 | h = urllib2.HTTPRedirectHandler()\r | |
978 | o = h.parent = MockOpener()\r | |
979 | req = Request(from_url)\r | |
980 | req.timeout = socket._GLOBAL_DEFAULT_TIMEOUT\r | |
981 | \r | |
982 | for scheme in invalid_schemes:\r | |
983 | invalid_url = scheme + '://' + schemeless_url\r | |
984 | self.assertRaises(urllib2.HTTPError, h.http_error_302,\r | |
985 | req, MockFile(), 302, "Security Loophole",\r | |
986 | MockHeaders({"location": invalid_url}))\r | |
987 | \r | |
988 | for scheme in valid_schemes:\r | |
989 | valid_url = scheme + '://' + schemeless_url\r | |
990 | h.http_error_302(req, MockFile(), 302, "That's fine",\r | |
991 | MockHeaders({"location": valid_url}))\r | |
992 | self.assertEqual(o.req.get_full_url(), valid_url)\r | |
993 | \r | |
994 | def test_cookie_redirect(self):\r | |
995 | # cookies shouldn't leak into redirected requests\r | |
996 | from cookielib import CookieJar\r | |
997 | \r | |
998 | from test.test_cookielib import interact_netscape\r | |
999 | \r | |
1000 | cj = CookieJar()\r | |
1001 | interact_netscape(cj, "http://www.example.com/", "spam=eggs")\r | |
1002 | hh = MockHTTPHandler(302, "Location: http://www.cracker.com/\r\n\r\n")\r | |
1003 | hdeh = urllib2.HTTPDefaultErrorHandler()\r | |
1004 | hrh = urllib2.HTTPRedirectHandler()\r | |
1005 | cp = urllib2.HTTPCookieProcessor(cj)\r | |
1006 | o = build_test_opener(hh, hdeh, hrh, cp)\r | |
1007 | o.open("http://www.example.com/")\r | |
1008 | self.assertTrue(not hh.req.has_header("Cookie"))\r | |
1009 | \r | |
1010 | def test_redirect_fragment(self):\r | |
1011 | redirected_url = 'http://www.example.com/index.html#OK\r\n\r\n'\r | |
1012 | hh = MockHTTPHandler(302, 'Location: ' + redirected_url)\r | |
1013 | hdeh = urllib2.HTTPDefaultErrorHandler()\r | |
1014 | hrh = urllib2.HTTPRedirectHandler()\r | |
1015 | o = build_test_opener(hh, hdeh, hrh)\r | |
1016 | fp = o.open('http://www.example.com')\r | |
1017 | self.assertEqual(fp.geturl(), redirected_url.strip())\r | |
1018 | \r | |
1019 | def test_proxy(self):\r | |
1020 | o = OpenerDirector()\r | |
1021 | ph = urllib2.ProxyHandler(dict(http="proxy.example.com:3128"))\r | |
1022 | o.add_handler(ph)\r | |
1023 | meth_spec = [\r | |
1024 | [("http_open", "return response")]\r | |
1025 | ]\r | |
1026 | handlers = add_ordered_mock_handlers(o, meth_spec)\r | |
1027 | \r | |
1028 | req = Request("http://acme.example.com/")\r | |
1029 | self.assertEqual(req.get_host(), "acme.example.com")\r | |
1030 | r = o.open(req)\r | |
1031 | self.assertEqual(req.get_host(), "proxy.example.com:3128")\r | |
1032 | \r | |
1033 | self.assertEqual([(handlers[0], "http_open")],\r | |
1034 | [tup[0:2] for tup in o.calls])\r | |
1035 | \r | |
1036 | def test_proxy_no_proxy(self):\r | |
1037 | os.environ['no_proxy'] = 'python.org'\r | |
1038 | o = OpenerDirector()\r | |
1039 | ph = urllib2.ProxyHandler(dict(http="proxy.example.com"))\r | |
1040 | o.add_handler(ph)\r | |
1041 | req = Request("http://www.perl.org/")\r | |
1042 | self.assertEqual(req.get_host(), "www.perl.org")\r | |
1043 | r = o.open(req)\r | |
1044 | self.assertEqual(req.get_host(), "proxy.example.com")\r | |
1045 | req = Request("http://www.python.org")\r | |
1046 | self.assertEqual(req.get_host(), "www.python.org")\r | |
1047 | r = o.open(req)\r | |
1048 | self.assertEqual(req.get_host(), "www.python.org")\r | |
1049 | del os.environ['no_proxy']\r | |
1050 | \r | |
1051 | \r | |
1052 | def test_proxy_https(self):\r | |
1053 | o = OpenerDirector()\r | |
1054 | ph = urllib2.ProxyHandler(dict(https='proxy.example.com:3128'))\r | |
1055 | o.add_handler(ph)\r | |
1056 | meth_spec = [\r | |
1057 | [("https_open","return response")]\r | |
1058 | ]\r | |
1059 | handlers = add_ordered_mock_handlers(o, meth_spec)\r | |
1060 | req = Request("https://www.example.com/")\r | |
1061 | self.assertEqual(req.get_host(), "www.example.com")\r | |
1062 | r = o.open(req)\r | |
1063 | self.assertEqual(req.get_host(), "proxy.example.com:3128")\r | |
1064 | self.assertEqual([(handlers[0], "https_open")],\r | |
1065 | [tup[0:2] for tup in o.calls])\r | |
1066 | \r | |
1067 | def test_proxy_https_proxy_authorization(self):\r | |
1068 | o = OpenerDirector()\r | |
1069 | ph = urllib2.ProxyHandler(dict(https='proxy.example.com:3128'))\r | |
1070 | o.add_handler(ph)\r | |
1071 | https_handler = MockHTTPSHandler()\r | |
1072 | o.add_handler(https_handler)\r | |
1073 | req = Request("https://www.example.com/")\r | |
1074 | req.add_header("Proxy-Authorization","FooBar")\r | |
1075 | req.add_header("User-Agent","Grail")\r | |
1076 | self.assertEqual(req.get_host(), "www.example.com")\r | |
1077 | self.assertIsNone(req._tunnel_host)\r | |
1078 | r = o.open(req)\r | |
1079 | # Verify Proxy-Authorization gets tunneled to request.\r | |
1080 | # httpsconn req_headers do not have the Proxy-Authorization header but\r | |
1081 | # the req will have.\r | |
1082 | self.assertNotIn(("Proxy-Authorization","FooBar"),\r | |
1083 | https_handler.httpconn.req_headers)\r | |
1084 | self.assertIn(("User-Agent","Grail"),\r | |
1085 | https_handler.httpconn.req_headers)\r | |
1086 | self.assertIsNotNone(req._tunnel_host)\r | |
1087 | self.assertEqual(req.get_host(), "proxy.example.com:3128")\r | |
1088 | self.assertEqual(req.get_header("Proxy-authorization"),"FooBar")\r | |
1089 | \r | |
1090 | def test_basic_auth(self, quote_char='"'):\r | |
1091 | opener = OpenerDirector()\r | |
1092 | password_manager = MockPasswordManager()\r | |
1093 | auth_handler = urllib2.HTTPBasicAuthHandler(password_manager)\r | |
1094 | realm = "ACME Widget Store"\r | |
1095 | http_handler = MockHTTPHandler(\r | |
1096 | 401, 'WWW-Authenticate: Basic realm=%s%s%s\r\n\r\n' %\r | |
1097 | (quote_char, realm, quote_char) )\r | |
1098 | opener.add_handler(auth_handler)\r | |
1099 | opener.add_handler(http_handler)\r | |
1100 | self._test_basic_auth(opener, auth_handler, "Authorization",\r | |
1101 | realm, http_handler, password_manager,\r | |
1102 | "http://acme.example.com/protected",\r | |
1103 | "http://acme.example.com/protected",\r | |
1104 | )\r | |
1105 | \r | |
1106 | def test_basic_auth_with_single_quoted_realm(self):\r | |
1107 | self.test_basic_auth(quote_char="'")\r | |
1108 | \r | |
1109 | def test_proxy_basic_auth(self):\r | |
1110 | opener = OpenerDirector()\r | |
1111 | ph = urllib2.ProxyHandler(dict(http="proxy.example.com:3128"))\r | |
1112 | opener.add_handler(ph)\r | |
1113 | password_manager = MockPasswordManager()\r | |
1114 | auth_handler = urllib2.ProxyBasicAuthHandler(password_manager)\r | |
1115 | realm = "ACME Networks"\r | |
1116 | http_handler = MockHTTPHandler(\r | |
1117 | 407, 'Proxy-Authenticate: Basic realm="%s"\r\n\r\n' % realm)\r | |
1118 | opener.add_handler(auth_handler)\r | |
1119 | opener.add_handler(http_handler)\r | |
1120 | self._test_basic_auth(opener, auth_handler, "Proxy-authorization",\r | |
1121 | realm, http_handler, password_manager,\r | |
1122 | "http://acme.example.com:3128/protected",\r | |
1123 | "proxy.example.com:3128",\r | |
1124 | )\r | |
1125 | \r | |
1126 | def test_basic_and_digest_auth_handlers(self):\r | |
1127 | # HTTPDigestAuthHandler threw an exception if it couldn't handle a 40*\r | |
1128 | # response (http://python.org/sf/1479302), where it should instead\r | |
1129 | # return None to allow another handler (especially\r | |
1130 | # HTTPBasicAuthHandler) to handle the response.\r | |
1131 | \r | |
1132 | # Also (http://python.org/sf/14797027, RFC 2617 section 1.2), we must\r | |
1133 | # try digest first (since it's the strongest auth scheme), so we record\r | |
1134 | # order of calls here to check digest comes first:\r | |
1135 | class RecordingOpenerDirector(OpenerDirector):\r | |
1136 | def __init__(self):\r | |
1137 | OpenerDirector.__init__(self)\r | |
1138 | self.recorded = []\r | |
1139 | def record(self, info):\r | |
1140 | self.recorded.append(info)\r | |
1141 | class TestDigestAuthHandler(urllib2.HTTPDigestAuthHandler):\r | |
1142 | def http_error_401(self, *args, **kwds):\r | |
1143 | self.parent.record("digest")\r | |
1144 | urllib2.HTTPDigestAuthHandler.http_error_401(self,\r | |
1145 | *args, **kwds)\r | |
1146 | class TestBasicAuthHandler(urllib2.HTTPBasicAuthHandler):\r | |
1147 | def http_error_401(self, *args, **kwds):\r | |
1148 | self.parent.record("basic")\r | |
1149 | urllib2.HTTPBasicAuthHandler.http_error_401(self,\r | |
1150 | *args, **kwds)\r | |
1151 | \r | |
1152 | opener = RecordingOpenerDirector()\r | |
1153 | password_manager = MockPasswordManager()\r | |
1154 | digest_handler = TestDigestAuthHandler(password_manager)\r | |
1155 | basic_handler = TestBasicAuthHandler(password_manager)\r | |
1156 | realm = "ACME Networks"\r | |
1157 | http_handler = MockHTTPHandler(\r | |
1158 | 401, 'WWW-Authenticate: Basic realm="%s"\r\n\r\n' % realm)\r | |
1159 | opener.add_handler(basic_handler)\r | |
1160 | opener.add_handler(digest_handler)\r | |
1161 | opener.add_handler(http_handler)\r | |
1162 | \r | |
1163 | # check basic auth isn't blocked by digest handler failing\r | |
1164 | self._test_basic_auth(opener, basic_handler, "Authorization",\r | |
1165 | realm, http_handler, password_manager,\r | |
1166 | "http://acme.example.com/protected",\r | |
1167 | "http://acme.example.com/protected",\r | |
1168 | )\r | |
1169 | # check digest was tried before basic (twice, because\r | |
1170 | # _test_basic_auth called .open() twice)\r | |
1171 | self.assertEqual(opener.recorded, ["digest", "basic"]*2)\r | |
1172 | \r | |
1173 | def _test_basic_auth(self, opener, auth_handler, auth_header,\r | |
1174 | realm, http_handler, password_manager,\r | |
1175 | request_url, protected_url):\r | |
1176 | import base64\r | |
1177 | user, password = "wile", "coyote"\r | |
1178 | \r | |
1179 | # .add_password() fed through to password manager\r | |
1180 | auth_handler.add_password(realm, request_url, user, password)\r | |
1181 | self.assertEqual(realm, password_manager.realm)\r | |
1182 | self.assertEqual(request_url, password_manager.url)\r | |
1183 | self.assertEqual(user, password_manager.user)\r | |
1184 | self.assertEqual(password, password_manager.password)\r | |
1185 | \r | |
1186 | r = opener.open(request_url)\r | |
1187 | \r | |
1188 | # should have asked the password manager for the username/password\r | |
1189 | self.assertEqual(password_manager.target_realm, realm)\r | |
1190 | self.assertEqual(password_manager.target_url, protected_url)\r | |
1191 | \r | |
1192 | # expect one request without authorization, then one with\r | |
1193 | self.assertEqual(len(http_handler.requests), 2)\r | |
1194 | self.assertFalse(http_handler.requests[0].has_header(auth_header))\r | |
1195 | userpass = '%s:%s' % (user, password)\r | |
1196 | auth_hdr_value = 'Basic '+base64.encodestring(userpass).strip()\r | |
1197 | self.assertEqual(http_handler.requests[1].get_header(auth_header),\r | |
1198 | auth_hdr_value)\r | |
1199 | self.assertEqual(http_handler.requests[1].unredirected_hdrs[auth_header],\r | |
1200 | auth_hdr_value)\r | |
1201 | # if the password manager can't find a password, the handler won't\r | |
1202 | # handle the HTTP auth error\r | |
1203 | password_manager.user = password_manager.password = None\r | |
1204 | http_handler.reset()\r | |
1205 | r = opener.open(request_url)\r | |
1206 | self.assertEqual(len(http_handler.requests), 1)\r | |
1207 | self.assertFalse(http_handler.requests[0].has_header(auth_header))\r | |
1208 | \r | |
1209 | class MiscTests(unittest.TestCase):\r | |
1210 | \r | |
1211 | def test_build_opener(self):\r | |
1212 | class MyHTTPHandler(urllib2.HTTPHandler): pass\r | |
1213 | class FooHandler(urllib2.BaseHandler):\r | |
1214 | def foo_open(self): pass\r | |
1215 | class BarHandler(urllib2.BaseHandler):\r | |
1216 | def bar_open(self): pass\r | |
1217 | \r | |
1218 | build_opener = urllib2.build_opener\r | |
1219 | \r | |
1220 | o = build_opener(FooHandler, BarHandler)\r | |
1221 | self.opener_has_handler(o, FooHandler)\r | |
1222 | self.opener_has_handler(o, BarHandler)\r | |
1223 | \r | |
1224 | # can take a mix of classes and instances\r | |
1225 | o = build_opener(FooHandler, BarHandler())\r | |
1226 | self.opener_has_handler(o, FooHandler)\r | |
1227 | self.opener_has_handler(o, BarHandler)\r | |
1228 | \r | |
1229 | # subclasses of default handlers override default handlers\r | |
1230 | o = build_opener(MyHTTPHandler)\r | |
1231 | self.opener_has_handler(o, MyHTTPHandler)\r | |
1232 | \r | |
1233 | # a particular case of overriding: default handlers can be passed\r | |
1234 | # in explicitly\r | |
1235 | o = build_opener()\r | |
1236 | self.opener_has_handler(o, urllib2.HTTPHandler)\r | |
1237 | o = build_opener(urllib2.HTTPHandler)\r | |
1238 | self.opener_has_handler(o, urllib2.HTTPHandler)\r | |
1239 | o = build_opener(urllib2.HTTPHandler())\r | |
1240 | self.opener_has_handler(o, urllib2.HTTPHandler)\r | |
1241 | \r | |
1242 | # Issue2670: multiple handlers sharing the same base class\r | |
1243 | class MyOtherHTTPHandler(urllib2.HTTPHandler): pass\r | |
1244 | o = build_opener(MyHTTPHandler, MyOtherHTTPHandler)\r | |
1245 | self.opener_has_handler(o, MyHTTPHandler)\r | |
1246 | self.opener_has_handler(o, MyOtherHTTPHandler)\r | |
1247 | \r | |
1248 | def opener_has_handler(self, opener, handler_class):\r | |
1249 | for h in opener.handlers:\r | |
1250 | if h.__class__ == handler_class:\r | |
1251 | break\r | |
1252 | else:\r | |
1253 | self.assertTrue(False)\r | |
1254 | \r | |
1255 | class RequestTests(unittest.TestCase):\r | |
1256 | \r | |
1257 | def setUp(self):\r | |
1258 | self.get = urllib2.Request("http://www.python.org/~jeremy/")\r | |
1259 | self.post = urllib2.Request("http://www.python.org/~jeremy/",\r | |
1260 | "data",\r | |
1261 | headers={"X-Test": "test"})\r | |
1262 | \r | |
1263 | def test_method(self):\r | |
1264 | self.assertEqual("POST", self.post.get_method())\r | |
1265 | self.assertEqual("GET", self.get.get_method())\r | |
1266 | \r | |
1267 | def test_add_data(self):\r | |
1268 | self.assertTrue(not self.get.has_data())\r | |
1269 | self.assertEqual("GET", self.get.get_method())\r | |
1270 | self.get.add_data("spam")\r | |
1271 | self.assertTrue(self.get.has_data())\r | |
1272 | self.assertEqual("POST", self.get.get_method())\r | |
1273 | \r | |
1274 | def test_get_full_url(self):\r | |
1275 | self.assertEqual("http://www.python.org/~jeremy/",\r | |
1276 | self.get.get_full_url())\r | |
1277 | \r | |
1278 | def test_selector(self):\r | |
1279 | self.assertEqual("/~jeremy/", self.get.get_selector())\r | |
1280 | req = urllib2.Request("http://www.python.org/")\r | |
1281 | self.assertEqual("/", req.get_selector())\r | |
1282 | \r | |
1283 | def test_get_type(self):\r | |
1284 | self.assertEqual("http", self.get.get_type())\r | |
1285 | \r | |
1286 | def test_get_host(self):\r | |
1287 | self.assertEqual("www.python.org", self.get.get_host())\r | |
1288 | \r | |
1289 | def test_get_host_unquote(self):\r | |
1290 | req = urllib2.Request("http://www.%70ython.org/")\r | |
1291 | self.assertEqual("www.python.org", req.get_host())\r | |
1292 | \r | |
1293 | def test_proxy(self):\r | |
1294 | self.assertTrue(not self.get.has_proxy())\r | |
1295 | self.get.set_proxy("www.perl.org", "http")\r | |
1296 | self.assertTrue(self.get.has_proxy())\r | |
1297 | self.assertEqual("www.python.org", self.get.get_origin_req_host())\r | |
1298 | self.assertEqual("www.perl.org", self.get.get_host())\r | |
1299 | \r | |
1300 | def test_wrapped_url(self):\r | |
1301 | req = Request("<URL:http://www.python.org>")\r | |
1302 | self.assertEqual("www.python.org", req.get_host())\r | |
1303 | \r | |
1304 | def test_url_fragment(self):\r | |
1305 | req = Request("http://www.python.org/?qs=query#fragment=true")\r | |
1306 | self.assertEqual("/?qs=query", req.get_selector())\r | |
1307 | req = Request("http://www.python.org/#fun=true")\r | |
1308 | self.assertEqual("/", req.get_selector())\r | |
1309 | \r | |
1310 | # Issue 11703: geturl() omits fragment in the original URL.\r | |
1311 | url = 'http://docs.python.org/library/urllib2.html#OK'\r | |
1312 | req = Request(url)\r | |
1313 | self.assertEqual(req.get_full_url(), url)\r | |
1314 | \r | |
1315 | def test_main(verbose=None):\r | |
1316 | from test import test_urllib2\r | |
1317 | test_support.run_doctest(test_urllib2, verbose)\r | |
1318 | test_support.run_doctest(urllib2, verbose)\r | |
1319 | tests = (TrivialTests,\r | |
1320 | OpenerDirectorTests,\r | |
1321 | HandlerTests,\r | |
1322 | MiscTests,\r | |
1323 | RequestTests)\r | |
1324 | test_support.run_unittest(*tests)\r | |
1325 | \r | |
1326 | if __name__ == "__main__":\r | |
1327 | test_main(verbose=True)\r |