]> git.proxmox.com Git - mirror_edk2.git/blob - AppPkg/Applications/Python/Python-2.7.2/Lib/test/test_httpservers.py
EmbeddedPkg: Extend NvVarStoreFormattedLib LIBRARY_CLASS
[mirror_edk2.git] / AppPkg / Applications / Python / Python-2.7.2 / Lib / test / test_httpservers.py
1 """Unittests for the various HTTPServer modules.
2
3 Written by Cody A.W. Somerville <cody-somerville@ubuntu.com>,
4 Josip Dzolonga, and Michael Otteneder for the 2007/08 GHOP contest.
5 """
6
7 from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
8 from SimpleHTTPServer import SimpleHTTPRequestHandler
9 from CGIHTTPServer import CGIHTTPRequestHandler
10 import CGIHTTPServer
11
12 import os
13 import sys
14 import re
15 import base64
16 import shutil
17 import urllib
18 import httplib
19 import tempfile
20
21 import unittest
22
23 from StringIO import StringIO
24
25 from test import test_support
26 threading = test_support.import_module('threading')
27
28
class NoLogRequestHandler:
    """Mix-in that silences a request handler's log output.

    Deliberately left as a classic (non-``object``) class so that mixing it
    into a handler does not alter how the combined class is built.
    """

    def log_message(self, *args):
        """Accept any arguments and drop them instead of writing to stderr."""
        return None
33
class SocketlessRequestHandler(SimpleHTTPRequestHandler):
    """Request handler that can be instantiated without a socket or server.

    The base-class constructor is intentionally not called; only the two
    attributes below are initialised.  ``do_GET`` writes to ``self.wfile``,
    which this class never sets, so the caller has to attach the stream
    objects before asking the handler to process a request.
    """

    def __init__(self):
        # No call to SimpleHTTPRequestHandler.__init__ on purpose: that
        # would require a request socket, client address and server.
        self.protocol_version = "HTTP/1.1"
        self.get_called = False

    def do_GET(self):
        """Record that GET was dispatched and send a fixed 200 HTML reply."""
        self.get_called = True
        self.send_response(200)
        self.send_header('Content-Type', 'text/html')
        self.end_headers()
        payload = b'<html><body>Data</body></html>\r\n'
        self.wfile.write(payload)

    def log_message(self, format, *args):
        """Discard log messages."""
        return None
48
49
class TestServerThread(threading.Thread):
    """Thread that runs an HTTPServer for the duration of one test.

    ``test_object`` is the test case that wants the server; once the server
    is bound, ``run`` stores the chosen port on it as ``PORT`` and sets its
    ``server_started`` event.  ``request_handler`` is the handler class
    passed to HTTPServer.
    """

    def __init__(self, test_object, request_handler):
        threading.Thread.__init__(self)
        self.test_object = test_object
        self.request_handler = request_handler

    def run(self):
        """Bind to a free port, publish it, then serve until shut down."""
        # Port 0 lets the operating system choose the port number.
        httpd = HTTPServer(('', 0), self.request_handler)
        self.server = httpd
        port = httpd.socket.getsockname()[1]

        case = self.test_object
        case.PORT = port
        case.server_started.set()
        # Drop the reference to the test case once it has been notified.
        self.test_object = None

        try:
            httpd.serve_forever(0.05)
        finally:
            httpd.server_close()

    def stop(self):
        """Ask the serving loop started by ``run`` to shut down."""
        self.server.shutdown()
68
69
class BaseTestCase(unittest.TestCase):
    """Shared fixture that serves ``self.request_handler`` from a thread.

    Subclasses supply a ``request_handler`` class attribute.  ``setUp``
    starts a TestServerThread and blocks until the ``server_started`` event
    is set; ``request`` then connects to ``localhost`` on ``self.PORT``.
    """

    def setUp(self):
        self._threads = test_support.threading_setup()
        # Swap in a guard object so environment changes made during the
        # test can be undone in tearDown.
        os.environ = test_support.EnvironmentVarGuard()
        self.server_started = threading.Event()
        self.thread = TestServerThread(self, self.request_handler)
        self.thread.start()
        self.server_started.wait()

    def tearDown(self):
        self.thread.stop()
        os.environ.__exit__()
        test_support.threading_cleanup(*self._threads)

    def request(self, uri, method='GET', body=None, headers=None):
        """Send one request to the test server and return the response.

        ``headers`` is an optional mapping of extra request headers.  It
        defaults to ``None`` (meaning "no extra headers") rather than a
        shared mutable ``{}`` default.  The connection is stored as
        ``self.connection``.
        """
        if headers is None:
            headers = {}
        self.connection = httplib.HTTPConnection('localhost', self.PORT)
        self.connection.request(method, uri, body, headers)
        return self.connection.getresponse()
88
class BaseHTTPRequestHandlerTestCase(unittest.TestCase):
    """Test the functionality of the BaseHTTPServer focussing on
    BaseHTTPRequestHandler.
    """

    # Raw string with the dot escaped, so only a literal "HTTP/1.<digits>"
    # status line matches (an unescaped "." would accept any character).
    HTTPResponseMatch = re.compile(r'HTTP/1\.[0-9]+ 200 OK')

    # Body that SocketlessRequestHandler.do_GET is expected to send.
    expected_body = '<html><body>Data</body></html>\r\n'

    def setUp(self):
        self.handler = SocketlessRequestHandler()

    def send_typical_request(self, message):
        """Feed ``message`` to the handler and return the reply as lines.

        The handler reads from and writes to in-memory StringIO objects, so
        no socket is involved.
        """
        request_stream = StringIO(message)
        response_stream = StringIO()
        self.handler.rfile = request_stream
        self.handler.wfile = response_stream
        self.handler.handle_one_request()
        response_stream.seek(0)
        return response_stream.readlines()

    def verify_get_called(self):
        self.assertTrue(self.handler.get_called)

    def verify_expected_headers(self, headers):
        """Check that each required header appears exactly once."""
        for fieldName in 'Server: ', 'Date: ', 'Content-Type: ':
            self.assertEqual(sum(h.startswith(fieldName) for h in headers), 1)

    def verify_http_server_response(self, response):
        match = self.HTTPResponseMatch.search(response)
        self.assertTrue(match is not None)

    def verify_full_get_response(self, result):
        """Check status line, headers, GET dispatch and body of a reply."""
        self.verify_http_server_response(result[0])
        self.verify_expected_headers(result[1:-1])
        self.verify_get_called()
        self.assertEqual(result[-1], self.expected_body)

    def test_http_1_1(self):
        result = self.send_typical_request('GET / HTTP/1.1\r\n\r\n')
        self.verify_full_get_response(result)

    def test_http_1_0(self):
        result = self.send_typical_request('GET / HTTP/1.0\r\n\r\n')
        self.verify_full_get_response(result)

    def test_http_0_9(self):
        # The reply is expected to consist of the body line only.
        result = self.send_typical_request('GET / HTTP/0.9\r\n\r\n')
        self.assertEqual(len(result), 1)
        self.assertEqual(result[0], self.expected_body)
        self.verify_get_called()

    def test_with_continue_1_0(self):
        result = self.send_typical_request('GET / HTTP/1.0\r\nExpect: 100-continue\r\n\r\n')
        self.verify_full_get_response(result)

    def test_request_length(self):
        # Issue #10714: huge request lines are discarded, to avoid Denial
        # of Service attacks.
        result = self.send_typical_request(b'GET ' + b'x' * 65537)
        self.assertEqual(result[0], b'HTTP/1.1 414 Request-URI Too Long\r\n')
        self.assertFalse(self.handler.get_called)
152
153
class BaseHTTPServerTestCase(BaseTestCase):
    """Exercise BaseHTTPRequestHandler through a real HTTP connection.

    The handler only implements the made-up methods TEST, KEEP, KEYERROR
    and CUSTOM; the tests expect 501 for ordinary methods such as GET.
    """

    class request_handler(NoLogRequestHandler, BaseHTTPRequestHandler):
        protocol_version = 'HTTP/1.1'
        default_request_version = 'HTTP/1.1'

        def _reply(self, code, connection):
            # Status line plus the two headers every canned reply carries.
            self.send_response(code)
            self.send_header('Content-Type', 'text/html')
            self.send_header('Connection', connection)
            self.end_headers()

        def do_TEST(self):
            self._reply(204, 'close')

        def do_KEEP(self):
            self._reply(204, 'keep-alive')

        def do_KEYERROR(self):
            self.send_error(999)

        def do_CUSTOM(self):
            self._reply(999, 'close')

    def setUp(self):
        BaseTestCase.setUp(self)
        self.con = httplib.HTTPConnection('localhost', self.PORT)
        self.con.connect()

    def _simple_request(self, method):
        """Send ``method`` for '/' via HTTPConnection.request()."""
        self.con.request(method, '/')
        return self.con.getresponse()

    def _handcrafted_request(self, method, url, version=None, headers=()):
        """Send a request assembled piece by piece and return the response.

        ``version``, when given, replaces the connection's version string
        before the request line is emitted.  ``headers`` is a sequence of
        (name, value) pairs.
        """
        if version is not None:
            self.con._http_vsn_str = version
        self.con.putrequest(method, url)
        for name, value in headers:
            self.con.putheader(name, value)
        self.con.endheaders()
        return self.con.getresponse()

    def test_command(self):
        response = self._simple_request('GET')
        self.assertEqual(response.status, 501)

    def test_request_line_trimming(self):
        response = self._handcrafted_request('GET', '/', 'HTTP/1.1\n')
        self.assertEqual(response.status, 501)

    def test_version_bogus(self):
        response = self._handcrafted_request('GET', '/', 'FUBAR')
        self.assertEqual(response.status, 400)

    def test_version_digits(self):
        response = self._handcrafted_request('GET', '/', 'HTTP/9.9.9')
        self.assertEqual(response.status, 400)

    def test_version_none_get(self):
        response = self._handcrafted_request('GET', '/', '')
        self.assertEqual(response.status, 501)

    def test_version_none(self):
        response = self._handcrafted_request('PUT', '/', '')
        self.assertEqual(response.status, 400)

    def test_version_invalid(self):
        self.con._http_vsn = 99
        response = self._handcrafted_request('GET', '/', 'HTTP/9.9')
        self.assertEqual(response.status, 505)

    def test_send_blank(self):
        response = self._handcrafted_request('', '', '')
        self.assertEqual(response.status, 400)

    def test_header_close(self):
        response = self._handcrafted_request(
            'GET', '/', headers=[('Connection', 'close')])
        self.assertEqual(response.status, 501)

    def test_head_keep_alive(self):
        response = self._handcrafted_request(
            'GET', '/', 'HTTP/1.1', [('Connection', 'keep-alive')])
        self.assertEqual(response.status, 501)

    def test_handler(self):
        response = self._simple_request('TEST')
        self.assertEqual(response.status, 204)

    def test_return_header_keep_alive(self):
        response = self._simple_request('KEEP')
        self.assertEqual(response.getheader('Connection'), 'keep-alive')
        # Send a second request on the same connection.
        self.con.request('TEST', '/')
        self.addCleanup(self.con.close)

    def test_internal_key_error(self):
        response = self._simple_request('KEYERROR')
        self.assertEqual(response.status, 999)

    def test_return_custom_status(self):
        response = self._simple_request('CUSTOM')
        self.assertEqual(response.status, 999)
276
277
class SimpleHTTPServerTestCase(BaseTestCase):
    """Exercise SimpleHTTPRequestHandler against a scratch directory.

    ``setUp`` changes into the system temp directory and creates a fresh
    sub-directory containing one file named ``test``; request paths are
    built relative to that working directory.
    """

    class request_handler(NoLogRequestHandler, SimpleHTTPRequestHandler):
        pass

    def setUp(self):
        BaseTestCase.setUp(self)
        self.cwd = os.getcwd()
        basetempdir = tempfile.gettempdir()
        os.chdir(basetempdir)
        self.data = 'We are the knights who say Ni!'
        self.tempdir = tempfile.mkdtemp(dir=basetempdir)
        self.tempdir_name = os.path.basename(self.tempdir)
        with open(os.path.join(self.tempdir, 'test'), 'wb') as temp:
            temp.write(self.data)

    def tearDown(self):
        try:
            os.chdir(self.cwd)
            # Best-effort removal: filesystem errors are ignored, but
            # unrelated exceptions (e.g. KeyboardInterrupt) still propagate.
            shutil.rmtree(self.tempdir, ignore_errors=True)
        finally:
            BaseTestCase.tearDown(self)

    def check_status_and_reason(self, response, status, data=None):
        """Read the response and check its status (and body, if given).

        ``data`` is only compared with the body when it is truthy.
        """
        body = response.read()
        self.assertTrue(response)
        self.assertEqual(response.status, status)
        self.assertIsNotNone(response.reason)
        if data:
            self.assertEqual(data, body)

    def test_get(self):
        #constructs the path relative to the root directory of the HTTPServer
        response = self.request(self.tempdir_name + '/test')
        self.check_status_and_reason(response, 200, data=self.data)
        response = self.request(self.tempdir_name + '/')
        self.check_status_and_reason(response, 200)
        response = self.request(self.tempdir_name)
        self.check_status_and_reason(response, 301)
        response = self.request('/ThisDoesNotExist')
        self.check_status_and_reason(response, 404)
        response = self.request('/' + 'ThisDoesNotExist' + '/')
        self.check_status_and_reason(response, 404)
        # Create an empty index.html and close it right away so that no
        # file handle is left open for the rest of the test.
        with open(os.path.join(self.tempdir_name, 'index.html'), 'w'):
            pass
        response = self.request('/' + self.tempdir_name + '/')
        self.check_status_and_reason(response, 200)
        # chmod won't work as expected on Windows platforms, and
        # filesystem permissions are not enforced for root on Unix.
        if os.name == 'posix' and os.geteuid() != 0:
            os.chmod(self.tempdir, 0)
            try:
                response = self.request(self.tempdir_name + '/')
                self.check_status_and_reason(response, 404)
            finally:
                # Always restore access, otherwise a failed assertion
                # leaves a directory that tearDown cannot remove.
                os.chmod(self.tempdir, 0o755)

    def test_head(self):
        response = self.request(
            self.tempdir_name + '/test', method='HEAD')
        self.check_status_and_reason(response, 200)
        self.assertEqual(response.getheader('content-length'),
                         str(len(self.data)))
        self.assertEqual(response.getheader('content-type'),
                         'application/octet-stream')

    def test_invalid_requests(self):
        response = self.request('/', method='FOO')
        self.check_status_and_reason(response, 501)
        # requests must be case sensitive,so this should fail too
        response = self.request('/', method='get')
        self.check_status_and_reason(response, 501)
        response = self.request('/', method='GETs')
        self.check_status_and_reason(response, 501)
351
352
# Template for a minimal Python 2 CGI script that prints a header, a blank
# line and "Hello World".  The single %s placeholder sits on the shebang
# line and is filled in with an interpreter path when the script is written.
cgi_file1 = """\
#!%s

print "Content-type: text/html"
print
print "Hello World"
"""

# Template for a CGI script that echoes the "spam", "eggs" and "bacon" form
# fields.  %s is again the shebang placeholder; every %% is doubled so that
# it comes out of the template substitution as a literal % for the
# generated script's own string formatting.
cgi_file2 = """\
#!%s
import cgi

print "Content-type: text/html"
print

form = cgi.FieldStorage()
print "%%s, %%s, %%s" %% (form.getfirst("spam"), form.getfirst("eggs"),
form.getfirst("bacon"))
"""
372
class CGIHTTPServerTestCase(BaseTestCase):
    """Exercise CGIHTTPRequestHandler with two generated CGI scripts.

    ``setUp`` builds a temporary directory holding a ``cgi-bin`` folder
    with ``file1.py`` and ``file2.py`` and makes it the working directory.
    """

    class request_handler(NoLogRequestHandler, CGIHTTPRequestHandler):
        pass

    def setUp(self):
        BaseTestCase.setUp(self)
        self.parent_dir = tempfile.mkdtemp()
        self.cgi_dir = os.path.join(self.parent_dir, 'cgi-bin')
        os.mkdir(self.cgi_dir)

        # The shebang line should be pure ASCII: use symlink if possible.
        # See issue #7668.
        if not hasattr(os, 'symlink'):
            self.pythonexe = sys.executable
        else:
            self.pythonexe = os.path.join(self.parent_dir, 'python')
            os.symlink(sys.executable, self.pythonexe)

        self.file1_path = os.path.join(self.cgi_dir, 'file1.py')
        self._install_script(self.file1_path, cgi_file1)

        self.file2_path = os.path.join(self.cgi_dir, 'file2.py')
        self._install_script(self.file2_path, cgi_file2)

        self.cwd = os.getcwd()
        os.chdir(self.parent_dir)

    def _install_script(self, path, template):
        """Write ``template`` to ``path`` with the interpreter filled in,
        then grant mode 0777 on the file."""
        with open(path, 'w') as script:
            script.write(template % self.pythonexe)
        os.chmod(path, 0o777)

    def tearDown(self):
        try:
            os.chdir(self.cwd)
            # Only delete the interpreter path when it is the symlink that
            # setUp created, never the real executable.
            if self.pythonexe != sys.executable:
                os.remove(self.pythonexe)
            for script in (self.file1_path, self.file2_path):
                os.remove(script)
            for directory in (self.cgi_dir, self.parent_dir):
                os.rmdir(directory)
        finally:
            BaseTestCase.tearDown(self)

    def _check_hello_world(self, res):
        """Assert that ``res`` is the 200 text/html reply of file1.py."""
        self.assertEqual(('Hello World\n', 'text/html', 200),
                         (res.read(), res.getheader('Content-type'), res.status))

    def test_url_collapse_path_split(self):
        test_vectors = {
            '': ('/', ''),
            '..': IndexError,
            '/.//..': IndexError,
            '/': ('/', ''),
            '//': ('/', ''),
            '/\\': ('/', '\\'),
            '/.//': ('/', ''),
            'cgi-bin/file1.py': ('/cgi-bin', 'file1.py'),
            '/cgi-bin/file1.py': ('/cgi-bin', 'file1.py'),
            'a': ('/', 'a'),
            '/a': ('/', 'a'),
            '//a': ('/', 'a'),
            './a': ('/', 'a'),
            './C:/': ('/C:', ''),
            '/a/b': ('/a', 'b'),
            '/a/b/': ('/a/b', ''),
            '/a/b/c/..': ('/a/b', ''),
            '/a/b/c/../d': ('/a/b', 'd'),
            '/a/b/c/../d/e/../f': ('/a/b/d', 'f'),
            '/a/b/c/../d/e/../../f': ('/a/b', 'f'),
            '/a/b/c/../d/e/.././././..//f': ('/a/b', 'f'),
            '../a/b/c/../d/e/.././././..//f': IndexError,
            '/a/b/c/../d/e/../../../f': ('/a', 'f'),
            '/a/b/c/../d/e/../../../../f': ('/', 'f'),
            '/a/b/c/../d/e/../../../../../f': IndexError,
            '/a/b/c/../d/e/../../../../f/..': ('/', ''),
        }
        collapse = CGIHTTPServer._url_collapse_path_split
        for path, expected in test_vectors.iteritems():
            # A vector maps either to an exception class that must be
            # raised or to the expected return value.
            expects_error = (isinstance(expected, type) and
                             issubclass(expected, Exception))
            if expects_error:
                self.assertRaises(expected, collapse, path)
                continue
            actual = collapse(path)
            self.assertEqual(expected, actual,
                             msg='path = %r\nGot: %r\nWanted: %r' %
                             (path, actual, expected))

    def test_headers_and_content(self):
        self._check_hello_world(self.request('/cgi-bin/file1.py'))

    def test_post(self):
        form_fields = {'spam' : 1, 'eggs' : 'python', 'bacon' : 123456}
        params = urllib.urlencode(form_fields)
        headers = {'Content-type' : 'application/x-www-form-urlencoded'}
        res = self.request('/cgi-bin/file2.py', 'POST', params, headers)

        self.assertEqual(res.read(), '1, python, 123456\n')

    def test_invaliduri(self):
        res = self.request('/cgi-bin/invalid')
        res.read()
        self.assertEqual(res.status, 404)

    def test_authorization(self):
        credentials = base64.b64encode('username:pass')
        headers = {'Authorization' : 'Basic %s' % credentials}
        res = self.request('/cgi-bin/file1.py', 'GET', headers=headers)
        self._check_hello_world(res)

    def test_no_leading_slash(self):
        # http://bugs.python.org/issue2254
        self._check_hello_world(self.request('cgi-bin/file1.py'))

    def test_os_environ_is_not_altered(self):
        signature = "Test CGI Server"
        os.environ['SERVER_SOFTWARE'] = signature
        res = self.request('/cgi-bin/file1.py')
        self.assertEqual((b'Hello World\n', 'text/html', 200),
                         (res.read(), res.getheader('Content-type'), res.status))
        # Serving the CGI request must leave this process's variable alone.
        self.assertEqual(os.environ['SERVER_SOFTWARE'], signature)
492
493
class SimpleHTTPRequestHandlerTestCase(unittest.TestCase):
    """Check how translate_path() maps request URLs to filesystem paths."""

    def setUp(self):
        # Every URL below is expected to resolve to <cwd>/filename.
        self.translated = os.path.join(os.getcwd(), 'filename')
        self.handler = SocketlessRequestHandler()

    def test_query_arguments(self):
        urls = (
            '/filename',
            '/filename?foo=bar',
            '/filename?a=b&spam=eggs#zot',
        )
        for url in urls:
            self.assertEqual(self.handler.translate_path(url),
                             self.translated)

    def test_start_with_double_slash(self):
        for url in ('//filename', '//filename?foo=bar'):
            self.assertEqual(self.handler.translate_path(url),
                             self.translated)
514
515
def test_main(verbose=None):
    """Run every test case of this module, then restore the working dir.

    ``verbose`` is accepted but not used by this function.
    """
    # Read the working directory before entering the try block: if
    # os.getcwd() itself failed inside it, the finally clause would hit an
    # unbound ``cwd`` and hide the real error behind a NameError.
    cwd = os.getcwd()
    try:
        test_support.run_unittest(BaseHTTPRequestHandlerTestCase,
                                  SimpleHTTPRequestHandlerTestCase,
                                  BaseHTTPServerTestCase,
                                  SimpleHTTPServerTestCase,
                                  CGIHTTPServerTestCase
                                  )
    finally:
        os.chdir(cwd)

if __name__ == '__main__':
    test_main()