]> git.proxmox.com Git - mirror_edk2.git/blame - AppPkg/Applications/Python/Python-2.7.2/Lib/test/test_httpservers.py
EmbeddedPkg: Extend NvVarStoreFormattedLib LIBRARY_CLASS
[mirror_edk2.git] / AppPkg / Applications / Python / Python-2.7.2 / Lib / test / test_httpservers.py
CommitLineData
4710c53d 1"""Unittests for the various HTTPServer modules.\r
2\r
3Written by Cody A.W. Somerville <cody-somerville@ubuntu.com>,\r
4Josip Dzolonga, and Michael Otteneder for the 2007/08 GHOP contest.\r
5"""\r
6\r
7from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer\r
8from SimpleHTTPServer import SimpleHTTPRequestHandler\r
9from CGIHTTPServer import CGIHTTPRequestHandler\r
10import CGIHTTPServer\r
11\r
12import os\r
13import sys\r
14import re\r
15import base64\r
16import shutil\r
17import urllib\r
18import httplib\r
19import tempfile\r
20\r
21import unittest\r
22\r
23from StringIO import StringIO\r
24\r
25from test import test_support\r
26threading = test_support.import_module('threading')\r
27\r
28\r
class NoLogRequestHandler:
    """Mixin that silences the per-request logging of an HTTP handler."""

    def log_message(self, *args):
        """Discard the log record instead of writing it to stderr."""
        return None
33\r
class SocketlessRequestHandler(SimpleHTTPRequestHandler):
    """SimpleHTTPRequestHandler that can be driven without a real socket.

    The constructor deliberately skips the base-class initialisation, so no
    request/client/server objects are needed.  Callers are expected to
    attach ``rfile`` and ``wfile`` themselves before processing a request.
    """

    def __init__(self):
        # Intentionally no call to the parent __init__ here.
        self.protocol_version = "HTTP/1.1"
        self.get_called = False

    def do_GET(self):
        """Record that GET was dispatched and answer with a fixed HTML page."""
        page = b'<html><body>Data</body></html>\r\n'
        self.get_called = True
        self.send_response(200)
        self.send_header('Content-Type', 'text/html')
        self.end_headers()
        self.wfile.write(page)

    def log_message(self, format, *args):
        """Suppress request logging."""
        return None
48\r
49\r
class TestServerThread(threading.Thread):
    """Background thread that runs an HTTPServer on behalf of a test case.

    ``test_object`` receives the chosen port in its ``PORT`` attribute and
    has its ``server_started`` event set once the server socket is bound.
    """

    def __init__(self, test_object, request_handler):
        super(TestServerThread, self).__init__()
        self.test_object = test_object
        self.request_handler = request_handler

    def run(self):
        """Bind to an ephemeral port, publish it, then serve until shutdown."""
        httpd = HTTPServer(('', 0), self.request_handler)
        self.server = httpd
        owner = self.test_object
        owner.PORT = httpd.socket.getsockname()[1]
        owner.server_started.set()
        # Drop the reference to the test case once it has been notified.
        self.test_object = None
        try:
            httpd.serve_forever(0.05)
        finally:
            httpd.server_close()

    def stop(self):
        """Ask the serving loop started by run() to exit."""
        self.server.shutdown()
68\r
69\r
class BaseTestCase(unittest.TestCase):
    """Common fixture: one HTTP server thread per test method.

    Subclasses must provide a ``request_handler`` attribute (the handler
    class to serve with).  After setUp() the server's port is available as
    ``self.PORT`` (it is stored there by the server thread).
    """

    def setUp(self):
        self._threads = test_support.threading_setup()
        # Swap in a guard object so environment changes made while the test
        # runs are undone in tearDown().
        os.environ = test_support.EnvironmentVarGuard()
        self.server_started = threading.Event()
        self.thread = TestServerThread(self, self.request_handler)
        self.thread.start()
        # Block until the thread has bound its socket and published PORT.
        self.server_started.wait()

    def tearDown(self):
        self.thread.stop()
        os.environ.__exit__()
        test_support.threading_cleanup(*self._threads)

    def request(self, uri, method='GET', body=None, headers=None):
        """Send one request to the test server and return the response.

        uri     -- request target passed verbatim to httplib
        method  -- HTTP method name (default 'GET')
        body    -- optional request body
        headers -- optional mapping of extra request headers; a fresh empty
                   dict is used when omitted, so no dict object is shared
                   between calls
        """
        if headers is None:
            headers = {}
        self.connection = httplib.HTTPConnection('localhost', self.PORT)
        self.connection.request(method, uri, body, headers)
        return self.connection.getresponse()
88\r
class BaseHTTPRequestHandlerTestCase(unittest.TestCase):
    """Test the functionality of the BaseHTTPServer focussing on
    BaseHTTPRequestHandler.

    Requests are fed to a SocketlessRequestHandler through in-memory
    streams, so no network connection is involved.
    """

    HTTPResponseMatch = re.compile('HTTP/1.[0-9]+ 200 OK')

    def setUp(self):
        self.handler = SocketlessRequestHandler()

    def send_typical_request(self, message):
        """Run *message* through the handler; return the reply as lines."""
        request_stream = StringIO(message)
        response_stream = StringIO()
        handler = self.handler
        handler.rfile = request_stream
        handler.wfile = response_stream
        handler.handle_one_request()
        response_stream.seek(0)
        return response_stream.readlines()

    def verify_get_called(self):
        self.assertTrue(self.handler.get_called)

    def verify_expected_headers(self, headers):
        """Each of the standard header fields must appear exactly once."""
        for prefix in ('Server: ', 'Date: ', 'Content-Type: '):
            hits = [h for h in headers if h.startswith(prefix)]
            self.assertEqual(len(hits), 1)

    def verify_http_server_response(self, response):
        self.assertTrue(self.HTTPResponseMatch.search(response) is not None)

    def _check_full_response(self, result):
        # Status line, header section, GET dispatch and body, in that order.
        self.verify_http_server_response(result[0])
        self.verify_expected_headers(result[1:-1])
        self.verify_get_called()
        self.assertEqual(result[-1], '<html><body>Data</body></html>\r\n')

    def test_http_1_1(self):
        self._check_full_response(
            self.send_typical_request('GET / HTTP/1.1\r\n\r\n'))

    def test_http_1_0(self):
        self._check_full_response(
            self.send_typical_request('GET / HTTP/1.0\r\n\r\n'))

    def test_http_0_9(self):
        # The reply to this request consists of the body line only.
        lines = self.send_typical_request('GET / HTTP/0.9\r\n\r\n')
        self.assertEqual(len(lines), 1)
        self.assertEqual(lines[0], '<html><body>Data</body></html>\r\n')
        self.verify_get_called()

    def test_with_continue_1_0(self):
        self._check_full_response(self.send_typical_request(
            'GET / HTTP/1.0\r\nExpect: 100-continue\r\n\r\n'))

    def test_request_length(self):
        # Issue #10714: huge request lines are discarded, to avoid Denial
        # of Service attacks.
        oversized = b'GET ' + b'x' * 65537
        lines = self.send_typical_request(oversized)
        self.assertEqual(lines[0], b'HTTP/1.1 414 Request-URI Too Long\r\n')
        self.assertFalse(self.handler.get_called)
152\r
153\r
class BaseHTTPServerTestCase(BaseTestCase):
    """Talk to a live BaseHTTPRequestHandler over a real connection.

    The handler implements only the custom TEST, KEEP, KEYERROR and CUSTOM
    methods; the tests below expect 501 for a well-formed GET.
    """

    class request_handler(NoLogRequestHandler, BaseHTTPRequestHandler):
        protocol_version = 'HTTP/1.1'
        default_request_version = 'HTTP/1.1'

        def _reply_without_body(self, status, connection):
            # Shared by the do_* methods: status line plus two headers.
            self.send_response(status)
            self.send_header('Content-Type', 'text/html')
            self.send_header('Connection', connection)
            self.end_headers()

        def do_TEST(self):
            self._reply_without_body(204, 'close')

        def do_KEEP(self):
            self._reply_without_body(204, 'keep-alive')

        def do_KEYERROR(self):
            self.send_error(999)

        def do_CUSTOM(self):
            self._reply_without_body(999, 'close')

    def setUp(self):
        BaseTestCase.setUp(self)
        self.con = httplib.HTTPConnection('localhost', self.PORT)
        self.con.connect()

    def _status_of(self, method):
        """Issue a regular request for '/' and return the status code."""
        self.con.request(method, '/')
        return self.con.getresponse().status

    def _raw_status(self, method, url, version=None, headers=()):
        """Hand-build a request and return the status code of the reply.

        version -- when not None, overrides the version string httplib
                   puts on the request line
        headers -- sequence of (name, value) pairs to send
        """
        if version is not None:
            self.con._http_vsn_str = version
        self.con.putrequest(method, url)
        for name, value in headers:
            self.con.putheader(name, value)
        self.con.endheaders()
        return self.con.getresponse().status

    def test_command(self):
        self.assertEqual(self._status_of('GET'), 501)

    def test_request_line_trimming(self):
        self.assertEqual(self._raw_status('GET', '/', 'HTTP/1.1\n'), 501)

    def test_version_bogus(self):
        self.assertEqual(self._raw_status('GET', '/', 'FUBAR'), 400)

    def test_version_digits(self):
        self.assertEqual(self._raw_status('GET', '/', 'HTTP/9.9.9'), 400)

    def test_version_none_get(self):
        self.assertEqual(self._raw_status('GET', '/', ''), 501)

    def test_version_none(self):
        self.assertEqual(self._raw_status('PUT', '/', ''), 400)

    def test_version_invalid(self):
        self.con._http_vsn = 99
        self.assertEqual(self._raw_status('GET', '/', 'HTTP/9.9'), 505)

    def test_send_blank(self):
        self.assertEqual(self._raw_status('', '', ''), 400)

    def test_header_close(self):
        status = self._raw_status('GET', '/',
                                  headers=[('Connection', 'close')])
        self.assertEqual(status, 501)

    def test_head_keep_alive(self):
        status = self._raw_status('GET', '/', 'HTTP/1.1',
                                  headers=[('Connection', 'keep-alive')])
        self.assertEqual(status, 501)

    def test_handler(self):
        self.assertEqual(self._status_of('TEST'), 204)

    def test_return_header_keep_alive(self):
        self.con.request('KEEP', '/')
        reply = self.con.getresponse()
        self.assertEqual(reply.getheader('Connection'), 'keep-alive')
        # A second request is sent on the same connection object.
        self.con.request('TEST', '/')
        self.addCleanup(self.con.close)

    def test_internal_key_error(self):
        self.assertEqual(self._status_of('KEYERROR'), 999)

    def test_return_custom_status(self):
        self.assertEqual(self._status_of('CUSTOM'), 999)
276\r
277\r
class SimpleHTTPServerTestCase(BaseTestCase):
    """Serve files from a temporary directory with SimpleHTTPRequestHandler.

    setUp() changes the working directory to the system temp directory and
    creates a fresh sub-directory containing one file named ``test``;
    tearDown() undoes both.
    """

    class request_handler(NoLogRequestHandler, SimpleHTTPRequestHandler):
        pass

    def setUp(self):
        BaseTestCase.setUp(self)
        self.cwd = os.getcwd()
        basetempdir = tempfile.gettempdir()
        os.chdir(basetempdir)
        self.data = 'We are the knights who say Ni!'
        self.tempdir = tempfile.mkdtemp(dir=basetempdir)
        self.tempdir_name = os.path.basename(self.tempdir)
        # 'with' guarantees the handle is closed even if the write fails.
        with open(os.path.join(self.tempdir, 'test'), 'wb') as temp:
            temp.write(self.data)

    def tearDown(self):
        try:
            os.chdir(self.cwd)
            try:
                shutil.rmtree(self.tempdir)
            except OSError:
                # Best effort: a directory that cannot be removed must not
                # fail the test, but unrelated exceptions are not hidden.
                pass
        finally:
            BaseTestCase.tearDown(self)

    def check_status_and_reason(self, response, status, data=None):
        """Check the status code of *response* and, optionally, its body.

        The body is always read; it is compared with *data* only when
        *data* is truthy.
        """
        body = response.read()
        self.assertTrue(response)
        self.assertEqual(response.status, status)
        self.assertIsNotNone(response.reason)
        if data:
            self.assertEqual(data, body)

    def test_get(self):
        #constructs the path relative to the root directory of the HTTPServer
        response = self.request(self.tempdir_name + '/test')
        self.check_status_and_reason(response, 200, data=self.data)
        response = self.request(self.tempdir_name + '/')
        self.check_status_and_reason(response, 200)
        response = self.request(self.tempdir_name)
        self.check_status_and_reason(response, 301)
        response = self.request('/ThisDoesNotExist')
        self.check_status_and_reason(response, 404)
        response = self.request('/' + 'ThisDoesNotExist' + '/')
        self.check_status_and_reason(response, 404)
        # Create an empty index.html and close it immediately; an open
        # handle would leak and can prevent removal of the directory.
        with open(os.path.join(self.tempdir_name, 'index.html'), 'w'):
            pass
        response = self.request('/' + self.tempdir_name + '/')
        self.check_status_and_reason(response, 200)
        # chmod won't work as expected on Windows platforms, and
        # filesystem permissions are ignored for root on Unix.
        if os.name == 'posix' and os.geteuid() != 0:
            os.chmod(self.tempdir, 0)
            try:
                response = self.request(self.tempdir_name + '/')
                self.check_status_and_reason(response, 404)
            finally:
                # Restore access even when the check fails, otherwise
                # tearDown() cannot delete the directory.
                os.chmod(self.tempdir, 0o755)

    def test_head(self):
        response = self.request(
            self.tempdir_name + '/test', method='HEAD')
        self.check_status_and_reason(response, 200)
        self.assertEqual(response.getheader('content-length'),
                         str(len(self.data)))
        self.assertEqual(response.getheader('content-type'),
                         'application/octet-stream')

    def test_invalid_requests(self):
        response = self.request('/', method='FOO')
        self.check_status_and_reason(response, 501)
        # requests must be case sensitive,so this should fail too
        response = self.request('/', method='get')
        self.check_status_and_reason(response, 501)
        response = self.request('/', method='GETs')
        self.check_status_and_reason(response, 501)
351\r
352\r
# Template for a minimal CGI script.  The single %s is filled in with the
# interpreter path for the shebang line when the script file is written.
cgi_file1 = """\
#!%s

print "Content-type: text/html"
print
print "Hello World"
"""

# Template for a CGI script that prints three form fields.  Only the
# shebang %s is substituted when the file is written; each doubled %% then
# collapses to a single %, so the generated script does its own formatting.
cgi_file2 = """\
#!%s
import cgi

print "Content-type: text/html"
print

form = cgi.FieldStorage()
print "%%s, %%s, %%s" %% (form.getfirst("spam"), form.getfirst("eggs"),
                          form.getfirst("bacon"))
"""
372\r
class CGIHTTPServerTestCase(BaseTestCase):
    """Run CGI scripts through CGIHTTPRequestHandler.

    setUp() builds a temporary tree ``<parent>/cgi-bin/`` holding two
    scripts generated from cgi_file1 and cgi_file2, and makes ``<parent>``
    the working directory; tearDown() removes everything again.
    """

    class request_handler(NoLogRequestHandler, CGIHTTPRequestHandler):
        pass

    def _write_script(self, path, template):
        # Fill in the interpreter path and make the script executable.
        with open(path, 'w') as script:
            script.write(template % self.pythonexe)
        os.chmod(path, 0o777)

    def _summary(self, res):
        # (body, content type, status) -- the body is read first.
        return (res.read(), res.getheader('Content-type'), res.status)

    def setUp(self):
        BaseTestCase.setUp(self)
        self.parent_dir = tempfile.mkdtemp()
        self.cgi_dir = os.path.join(self.parent_dir, 'cgi-bin')
        os.mkdir(self.cgi_dir)

        # The shebang line should be pure ASCII: use symlink if possible.
        # See issue #7668.
        if not hasattr(os, 'symlink'):
            self.pythonexe = sys.executable
        else:
            self.pythonexe = os.path.join(self.parent_dir, 'python')
            os.symlink(sys.executable, self.pythonexe)

        self.file1_path = os.path.join(self.cgi_dir, 'file1.py')
        self._write_script(self.file1_path, cgi_file1)

        self.file2_path = os.path.join(self.cgi_dir, 'file2.py')
        self._write_script(self.file2_path, cgi_file2)

        self.cwd = os.getcwd()
        os.chdir(self.parent_dir)

    def tearDown(self):
        try:
            os.chdir(self.cwd)
            doomed = [self.file1_path, self.file2_path]
            if self.pythonexe != sys.executable:
                # The symlink created in setUp() goes first.
                doomed.insert(0, self.pythonexe)
            for path in doomed:
                os.remove(path)
            os.rmdir(self.cgi_dir)
            os.rmdir(self.parent_dir)
        finally:
            BaseTestCase.tearDown(self)

    def test_url_collapse_path_split(self):
        # Maps an input path to the expected (head, tail) pair, or to the
        # exception class the call is expected to raise.
        test_vectors = {
            '': ('/', ''),
            '..': IndexError,
            '/.//..': IndexError,
            '/': ('/', ''),
            '//': ('/', ''),
            '/\\': ('/', '\\'),
            '/.//': ('/', ''),
            'cgi-bin/file1.py': ('/cgi-bin', 'file1.py'),
            '/cgi-bin/file1.py': ('/cgi-bin', 'file1.py'),
            'a': ('/', 'a'),
            '/a': ('/', 'a'),
            '//a': ('/', 'a'),
            './a': ('/', 'a'),
            './C:/': ('/C:', ''),
            '/a/b': ('/a', 'b'),
            '/a/b/': ('/a/b', ''),
            '/a/b/c/..': ('/a/b', ''),
            '/a/b/c/../d': ('/a/b', 'd'),
            '/a/b/c/../d/e/../f': ('/a/b/d', 'f'),
            '/a/b/c/../d/e/../../f': ('/a/b', 'f'),
            '/a/b/c/../d/e/.././././..//f': ('/a/b', 'f'),
            '../a/b/c/../d/e/.././././..//f': IndexError,
            '/a/b/c/../d/e/../../../f': ('/a', 'f'),
            '/a/b/c/../d/e/../../../../f': ('/', 'f'),
            '/a/b/c/../d/e/../../../../../f': IndexError,
            '/a/b/c/../d/e/../../../../f/..': ('/', ''),
        }
        collapse = CGIHTTPServer._url_collapse_path_split
        for path in test_vectors:
            expected = test_vectors[path]
            if isinstance(expected, type) and issubclass(expected, Exception):
                self.assertRaises(expected, collapse, path)
                continue
            actual = collapse(path)
            self.assertEqual(expected, actual,
                             msg='path = %r\nGot: %r\nWanted: %r' %
                             (path, actual, expected))

    def test_headers_and_content(self):
        res = self.request('/cgi-bin/file1.py')
        self.assertEqual(('Hello World\n', 'text/html', 200),
                         self._summary(res))

    def test_post(self):
        fields = {'spam' : 1, 'eggs' : 'python', 'bacon' : 123456}
        params = urllib.urlencode(fields)
        headers = {'Content-type' : 'application/x-www-form-urlencoded'}
        res = self.request('/cgi-bin/file2.py', 'POST', params, headers)

        self.assertEqual(res.read(), '1, python, 123456\n')

    def test_invaliduri(self):
        res = self.request('/cgi-bin/invalid')
        res.read()
        self.assertEqual(res.status, 404)

    def test_authorization(self):
        credentials = base64.b64encode('username:pass')
        headers = {'Authorization' : 'Basic %s' % credentials}
        res = self.request('/cgi-bin/file1.py', 'GET', headers=headers)
        self.assertEqual(('Hello World\n', 'text/html', 200),
                         self._summary(res))

    def test_no_leading_slash(self):
        # http://bugs.python.org/issue2254
        res = self.request('cgi-bin/file1.py')
        self.assertEqual(('Hello World\n', 'text/html', 200),
                         self._summary(res))

    def test_os_environ_is_not_altered(self):
        signature = "Test CGI Server"
        os.environ['SERVER_SOFTWARE'] = signature
        res = self.request('/cgi-bin/file1.py')
        self.assertEqual((b'Hello World\n', 'text/html', 200),
                         self._summary(res))
        self.assertEqual(os.environ['SERVER_SOFTWARE'], signature)
492\r
493\r
class SimpleHTTPRequestHandlerTestCase(unittest.TestCase):
    """ Test url parsing """

    def setUp(self):
        # Every URL below is expected to translate to <cwd>/filename.
        self.translated = os.path.join(os.getcwd(), 'filename')
        self.handler = SocketlessRequestHandler()

    def _check_translations(self, urls):
        for url in urls:
            path = self.handler.translate_path(url)
            self.assertEqual(path, self.translated)

    def test_query_arguments(self):
        self._check_translations([
            '/filename',
            '/filename?foo=bar',
            '/filename?a=b&spam=eggs#zot',
        ])

    def test_start_with_double_slash(self):
        self._check_translations([
            '//filename',
            '//filename?foo=bar',
        ])
514\r
515\r
def test_main(verbose=None):
    """Run all test cases of this module, then restore the working directory.

    ``verbose`` is accepted but not used.
    """
    # Capture the directory before entering the try block: if getcwd()
    # itself fails, the finally clause must not hit an unbound 'cwd' and
    # hide the real error behind a NameError.
    cwd = os.getcwd()
    try:
        test_support.run_unittest(BaseHTTPRequestHandlerTestCase,
                                  SimpleHTTPRequestHandlerTestCase,
                                  BaseHTTPServerTestCase,
                                  SimpleHTTPServerTestCase,
                                  CGIHTTPServerTestCase
                                 )
    finally:
        # Several test cases chdir(); make sure the caller's cwd survives.
        os.chdir(cwd)

if __name__ == '__main__':
    test_main()