1 """Unittests for the various HTTPServer modules.
3 Written by Cody A.W. Somerville <cody-somerville@ubuntu.com>,
4 Josip Dzolonga, and Michael Otteneder for the 2007/08 GHOP contest.
7 from BaseHTTPServer
import BaseHTTPRequestHandler
, HTTPServer
8 from SimpleHTTPServer
import SimpleHTTPRequestHandler
9 from CGIHTTPServer
import CGIHTTPRequestHandler
23 from StringIO
import StringIO
25 from test
import test_support
26 threading
= test_support
.import_module('threading')
class NoLogRequestHandler:
    """Mixin whose ``log_message`` does nothing.

    It is listed as the first base class of the request handlers used by
    the server test cases in this file, so its ``log_message`` takes
    precedence over the one of the real handler class.
    """

    def log_message(self, *args):
        """Accept and discard any logging arguments."""
        # don't write log messages to stderr
        pass
34 class SocketlessRequestHandler(SimpleHTTPRequestHandler
):
36 self
.get_called
= False
37 self
.protocol_version
= "HTTP/1.1"
40 self
.get_called
= True
41 self
.send_response(200)
42 self
.send_header('Content-Type', 'text/html')
44 self
.wfile
.write(b
'<html><body>Data</body></html>\r\n')
46 def log_message(self
, format
, *args
):
50 class TestServerThread(threading
.Thread
):
def __init__(self, test_object, request_handler):
    """Initialise the thread and remember the collaborators.

    test_object     -- object stored as ``self.test_object``
    request_handler -- object stored as ``self.request_handler``
    """
    threading.Thread.__init__(self)
    self.test_object = test_object
    self.request_handler = request_handler
57 self
.server
= HTTPServer(('', 0), self
.request_handler
)
58 self
.test_object
.PORT
= self
.server
.socket
.getsockname()[1]
59 self
.test_object
.server_started
.set()
60 self
.test_object
= None
62 self
.server
.serve_forever(0.05)
64 self
.server
.server_close()
67 self
.server
.shutdown()
70 class BaseTestCase(unittest
.TestCase
):
72 self
._threads
= test_support
.threading_setup()
73 os
.environ
= test_support
.EnvironmentVarGuard()
74 self
.server_started
= threading
.Event()
75 self
.thread
= TestServerThread(self
, self
.request_handler
)
77 self
.server_started
.wait()
82 test_support
.threading_cleanup(*self
._threads
)
def request(self, uri, method='GET', body=None, headers=None):
    """Send one HTTP request to the test server and return the response.

    A new ``httplib.HTTPConnection`` to ``localhost`` on ``self.PORT`` is
    opened for every call and kept in ``self.connection``.

    uri     -- request target passed to ``HTTPConnection.request``
    method  -- HTTP method name (default ``'GET'``)
    body    -- optional request body
    headers -- optional mapping of extra request headers; ``None`` means
               no extra headers
    """
    # A dict literal as default would be one object shared by every call;
    # build a fresh mapping per call instead.
    if headers is None:
        headers = {}
    self.connection = httplib.HTTPConnection('localhost', self.PORT)
    self.connection.request(method, uri, body, headers)
    return self.connection.getresponse()
89 class BaseHTTPRequestHandlerTestCase(unittest
.TestCase
):
90 """Test the functionality of the BaseHTTPServer focussing on
91 BaseHTTPRequestHandler.
94 HTTPResponseMatch
= re
.compile('HTTP/1.[0-9]+ 200 OK')
97 self
.handler
= SocketlessRequestHandler()
def send_typical_request(self, message):
    """Run *message* through the handler and return the reply as lines.

    The handler's ``rfile`` is replaced by an in-memory stream holding
    *message* and its ``wfile`` by an empty in-memory stream; after
    ``handle_one_request()`` returns, everything written to ``wfile`` is
    returned as a list of lines.
    """
    request_stream = StringIO(message)
    output = StringIO()
    self.handler.rfile = request_stream
    self.handler.wfile = output
    self.handler.handle_one_request()
    # Rewind first: after the handler's writes the position is at the end
    # of the buffer, so readlines() would otherwise return nothing.
    output.seek(0)
    return output.readlines()
def verify_get_called(self):
    """Assert that the handler's ``get_called`` flag is true."""
    was_called = self.handler.get_called
    self.assertTrue(was_called)
def verify_expected_headers(self, headers):
    """Assert that Server, Date and Content-Type each occur exactly once.

    headers -- iterable of header lines; a line counts for a field when
               it starts with ``'<Field>: '``
    """
    for prefix in ('Server: ', 'Date: ', 'Content-Type: '):
        matching = [line for line in headers if line.startswith(prefix)]
        self.assertEqual(len(matching), 1)
def verify_http_server_response(self, response):
    """Assert that *response* matches ``self.HTTPResponseMatch``."""
    found = self.HTTPResponseMatch.search(response) is not None
    self.assertTrue(found)
def test_http_1_1(self):
    # An HTTP/1.1 GET: status line first, expected headers in between,
    # the HTML payload as the last line.
    reply = self.send_typical_request('GET / HTTP/1.1\r\n\r\n')
    status_line, header_lines, payload = reply[0], reply[1:-1], reply[-1]
    self.verify_http_server_response(status_line)
    self.verify_expected_headers(header_lines)
    self.verify_get_called()
    self.assertEqual(payload, '<html><body>Data</body></html>\r\n')
def test_http_1_0(self):
    # An HTTP/1.0 GET: status line first, expected headers in between,
    # the HTML payload as the last line.
    reply = self.send_typical_request('GET / HTTP/1.0\r\n\r\n')
    status_line, header_lines, payload = reply[0], reply[1:-1], reply[-1]
    self.verify_http_server_response(status_line)
    self.verify_expected_headers(header_lines)
    self.verify_get_called()
    self.assertEqual(payload, '<html><body>Data</body></html>\r\n')
def test_http_0_9(self):
    # For an HTTP/0.9 request the reply must consist of the payload line
    # only: no status line and no headers.
    reply = self.send_typical_request('GET / HTTP/0.9\r\n\r\n')
    line_count = len(reply)
    self.assertEqual(line_count, 1)
    self.assertEqual(reply[0], '<html><body>Data</body></html>\r\n')
    self.verify_get_called()
def test_with_continue_1_0(self):
    # An HTTP/1.0 GET carrying "Expect: 100-continue" must still produce
    # the ordinary reply: status line, expected headers, payload.
    raw_request = 'GET / HTTP/1.0\r\nExpect: 100-continue\r\n\r\n'
    reply = self.send_typical_request(raw_request)
    status_line, header_lines, payload = reply[0], reply[1:-1], reply[-1]
    self.verify_http_server_response(status_line)
    self.verify_expected_headers(header_lines)
    self.verify_get_called()
    self.assertEqual(payload, '<html><body>Data</body></html>\r\n')
def test_request_length(self):
    # Issue #10714: huge request lines are discarded, to avoid Denial
    # of Service attacks.
    oversized_request = b'GET ' + b'x' * 65537
    reply = self.send_typical_request(oversized_request)
    first_line = reply[0]
    self.assertEqual(first_line, b'HTTP/1.1 414 Request-URI Too Long\r\n')
    # The request must have been rejected before reaching the GET handler.
    self.assertFalse(self.handler.get_called)
154 class BaseHTTPServerTestCase(BaseTestCase
):
155 class request_handler(NoLogRequestHandler
, BaseHTTPRequestHandler
):
156 protocol_version
= 'HTTP/1.1'
157 default_request_version
= 'HTTP/1.1'
160 self
.send_response(204)
161 self
.send_header('Content-Type', 'text/html')
162 self
.send_header('Connection', 'close')
166 self
.send_response(204)
167 self
.send_header('Content-Type', 'text/html')
168 self
.send_header('Connection', 'keep-alive')
171 def do_KEYERROR(self
):
175 self
.send_response(999)
176 self
.send_header('Content-Type', 'text/html')
177 self
.send_header('Connection', 'close')
181 BaseTestCase
.setUp(self
)
182 self
.con
= httplib
.HTTPConnection('localhost', self
.PORT
)
def test_command(self):
    # A plain GET over self.con is expected to be answered with 501.
    conn = self.con
    conn.request('GET', '/')
    response = conn.getresponse()
    self.assertEqual(response.status, 501)
def test_request_line_trimming(self):
    # Send a request line whose version string ends in an extra newline;
    # the expected answer is still 501.
    conn = self.con
    conn._http_vsn_str = 'HTTP/1.1\n'
    conn.putrequest('GET', '/')
    conn.endheaders()
    response = conn.getresponse()
    self.assertEqual(response.status, 501)
def test_version_bogus(self):
    # A version string that is not of the form HTTP/x.y must yield 400.
    conn = self.con
    conn._http_vsn_str = 'FUBAR'
    conn.putrequest('GET', '/')
    conn.endheaders()
    response = conn.getresponse()
    self.assertEqual(response.status, 400)
def test_version_digits(self):
    # A version with three numeric components must yield 400.
    conn = self.con
    conn._http_vsn_str = 'HTTP/9.9.9'
    conn.putrequest('GET', '/')
    conn.endheaders()
    response = conn.getresponse()
    self.assertEqual(response.status, 400)
def test_version_none_get(self):
    # A GET sent with an empty version string is expected to yield 501.
    conn = self.con
    conn._http_vsn_str = ''
    conn.putrequest('GET', '/')
    conn.endheaders()
    response = conn.getresponse()
    self.assertEqual(response.status, 501)
def test_version_none(self):
    # A PUT sent with an empty version string is expected to yield 400.
    conn = self.con
    conn._http_vsn_str = ''
    conn.putrequest('PUT', '/')
    conn.endheaders()
    response = conn.getresponse()
    self.assertEqual(response.status, 400)
def test_version_invalid(self):
    # Claim HTTP/9.9 on the client side; the expected answer is 505.
    conn = self.con
    conn._http_vsn = 99
    conn._http_vsn_str = 'HTTP/9.9'
    conn.putrequest('GET', '/')
    conn.endheaders()
    response = conn.getresponse()
    self.assertEqual(response.status, 505)
def test_send_blank(self):
    # Empty method, empty target and empty version string: expect 400.
    conn = self.con
    conn._http_vsn_str = ''
    conn.putrequest('', '')
    conn.endheaders()
    response = conn.getresponse()
    self.assertEqual(response.status, 400)
def test_header_close(self):
    # A GET carrying "Connection: close" is expected to yield 501.
    conn = self.con
    conn.putrequest('GET', '/')
    conn.putheader('Connection', 'close')
    conn.endheaders()
    response = conn.getresponse()
    self.assertEqual(response.status, 501)
def test_head_keep_alive(self):
    # An HTTP/1.1 GET carrying "Connection: keep-alive" is expected to
    # yield 501.
    conn = self.con
    conn._http_vsn_str = 'HTTP/1.1'
    conn.putrequest('GET', '/')
    conn.putheader('Connection', 'keep-alive')
    conn.endheaders()
    response = conn.getresponse()
    self.assertEqual(response.status, 501)
def test_handler(self):
    # The custom TEST method is expected to be answered with 204.
    conn = self.con
    conn.request('TEST', '/')
    response = conn.getresponse()
    self.assertEqual(response.status, 204)
def test_return_header_keep_alive(self):
    # Register the cleanup before doing any network traffic, so that the
    # connection is closed even when a request or the assertion fails.
    self.addCleanup(self.con.close)
    self.con.request('KEEP', '/')
    res = self.con.getresponse()
    self.assertEqual(res.getheader('Connection'), 'keep-alive')
    # Issue a second request on the same connection; its response is
    # deliberately not read here.
    self.con.request('TEST', '/')
def test_internal_key_error(self):
    # The KEYERROR method is expected to be answered with status 999.
    conn = self.con
    conn.request('KEYERROR', '/')
    response = conn.getresponse()
    self.assertEqual(response.status, 999)
def test_return_custom_status(self):
    # The CUSTOM method is expected to be answered with status 999.
    conn = self.con
    conn.request('CUSTOM', '/')
    response = conn.getresponse()
    self.assertEqual(response.status, 999)
278 class SimpleHTTPServerTestCase(BaseTestCase
):
279 class request_handler(NoLogRequestHandler
, SimpleHTTPRequestHandler
):
283 BaseTestCase
.setUp(self
)
284 self
.cwd
= os
.getcwd()
285 basetempdir
= tempfile
.gettempdir()
286 os
.chdir(basetempdir
)
287 self
.data
= 'We are the knights who say Ni!'
288 self
.tempdir
= tempfile
.mkdtemp(dir=basetempdir
)
289 self
.tempdir_name
= os
.path
.basename(self
.tempdir
)
290 temp
= open(os
.path
.join(self
.tempdir
, 'test'), 'wb')
291 temp
.write(self
.data
)
298 shutil
.rmtree(self
.tempdir
)
302 BaseTestCase
.tearDown(self
)
def check_status_and_reason(self, response, status, data=None):
    """Check a response's status, reason and, optionally, its body.

    response -- response object; its body is read in full first
    status   -- status code the response must carry
    data     -- expected body; when omitted (``None``) the body is read
                but not compared
    """
    body = response.read()
    self.assertTrue(response)
    self.assertEqual(response.status, status)
    self.assertIsNotNone(response.reason)
    # Only compare the payload when the caller supplied an expectation;
    # otherwise every status-only check would compare None with the body.
    if data is not None:
        self.assertEqual(data, body)
313 #constructs the path relative to the root directory of the HTTPServer
314 response
= self
.request(self
.tempdir_name
+ '/test')
315 self
.check_status_and_reason(response
, 200, data
=self
.data
)
316 response
= self
.request(self
.tempdir_name
+ '/')
317 self
.check_status_and_reason(response
, 200)
318 response
= self
.request(self
.tempdir_name
)
319 self
.check_status_and_reason(response
, 301)
320 response
= self
.request('/ThisDoesNotExist')
321 self
.check_status_and_reason(response
, 404)
322 response
= self
.request('/' + 'ThisDoesNotExist' + '/')
323 self
.check_status_and_reason(response
, 404)
324 f
= open(os
.path
.join(self
.tempdir_name
, 'index.html'), 'w')
325 response
= self
.request('/' + self
.tempdir_name
+ '/')
326 self
.check_status_and_reason(response
, 200)
327 if os
.name
== 'posix':
328 # chmod won't work as expected on Windows platforms
329 os
.chmod(self
.tempdir
, 0)
330 response
= self
.request(self
.tempdir_name
+ '/')
331 self
.check_status_and_reason(response
, 404)
332 os
.chmod(self
.tempdir
, 0755)
335 response
= self
.request(
336 self
.tempdir_name
+ '/test', method
='HEAD')
337 self
.check_status_and_reason(response
, 200)
338 self
.assertEqual(response
.getheader('content-length'),
340 self
.assertEqual(response
.getheader('content-type'),
341 'application/octet-stream')
def test_invalid_requests(self):
    # Every method name below must be answered with 501.  Requests must
    # be case sensitive, so the lower-case 'get' should fail too.
    for bad_method in ('FOO', 'get', 'GETs'):
        response = self.request('/', method=bad_method)
        self.check_status_and_reason(response, 501)
356 print "Content-type: text/html"
365 print "Content-type: text/html"
368 form = cgi.FieldStorage()
369 print "%%s, %%s, %%s" %% (form.getfirst("spam"), form.getfirst("eggs"),
370 form.getfirst("bacon"))
373 class CGIHTTPServerTestCase(BaseTestCase
):
374 class request_handler(NoLogRequestHandler
, CGIHTTPRequestHandler
):
378 BaseTestCase
.setUp(self
)
379 self
.parent_dir
= tempfile
.mkdtemp()
380 self
.cgi_dir
= os
.path
.join(self
.parent_dir
, 'cgi-bin')
381 os
.mkdir(self
.cgi_dir
)
383 # The shebang line should be pure ASCII: use symlink if possible.
385 if hasattr(os
, 'symlink'):
386 self
.pythonexe
= os
.path
.join(self
.parent_dir
, 'python')
387 os
.symlink(sys
.executable
, self
.pythonexe
)
389 self
.pythonexe
= sys
.executable
391 self
.file1_path
= os
.path
.join(self
.cgi_dir
, 'file1.py')
392 with
open(self
.file1_path
, 'w') as file1
:
393 file1
.write(cgi_file1
% self
.pythonexe
)
394 os
.chmod(self
.file1_path
, 0777)
396 self
.file2_path
= os
.path
.join(self
.cgi_dir
, 'file2.py')
397 with
open(self
.file2_path
, 'w') as file2
:
398 file2
.write(cgi_file2
% self
.pythonexe
)
399 os
.chmod(self
.file2_path
, 0777)
401 self
.cwd
= os
.getcwd()
402 os
.chdir(self
.parent_dir
)
407 if self
.pythonexe
!= sys
.executable
:
408 os
.remove(self
.pythonexe
)
409 os
.remove(self
.file1_path
)
410 os
.remove(self
.file2_path
)
411 os
.rmdir(self
.cgi_dir
)
412 os
.rmdir(self
.parent_dir
)
414 BaseTestCase
.tearDown(self
)
416 def test_url_collapse_path_split(self
):
420 '/.//..': IndexError,
425 'cgi-bin/file1.py': ('/cgi-bin', 'file1.py'),
426 '/cgi-bin/file1.py': ('/cgi-bin', 'file1.py'),
431 './C:/': ('/C:', ''),
433 '/a/b/': ('/a/b', ''),
434 '/a/b/c/..': ('/a/b', ''),
435 '/a/b/c/../d': ('/a/b', 'd'),
436 '/a/b/c/../d/e/../f': ('/a/b/d', 'f'),
437 '/a/b/c/../d/e/../../f': ('/a/b', 'f'),
438 '/a/b/c/../d/e/.././././..//f': ('/a/b', 'f'),
439 '../a/b/c/../d/e/.././././..//f': IndexError,
440 '/a/b/c/../d/e/../../../f': ('/a', 'f'),
441 '/a/b/c/../d/e/../../../../f': ('/', 'f'),
442 '/a/b/c/../d/e/../../../../../f': IndexError,
443 '/a/b/c/../d/e/../../../../f/..': ('/', ''),
445 for path
, expected
in test_vectors
.iteritems():
446 if isinstance(expected
, type) and issubclass(expected
, Exception):
447 self
.assertRaises(expected
,
448 CGIHTTPServer
._url
_collapse
_path
_split
, path
)
450 actual
= CGIHTTPServer
._url
_collapse
_path
_split
(path
)
451 self
.assertEqual(expected
, actual
,
452 msg
='path = %r\nGot: %r\nWanted: %r' %
453 (path
, actual
, expected
))
def test_headers_and_content(self):
    # Running file1.py must give the expected body, content type and
    # status code.
    response = self.request('/cgi-bin/file1.py')
    body = response.read()
    content_type = response.getheader('Content-type')
    observed = (body, content_type, response.status)
    self.assertEqual(('Hello World\n', 'text/html', 200), observed)
461 params
= urllib
.urlencode({'spam' : 1, 'eggs' : 'python', 'bacon' : 123456})
462 headers
= {'Content-type' : 'application/x-www-form-urlencoded'}
463 res
= self
.request('/cgi-bin/file2.py', 'POST', params
, headers
)
465 self
.assertEqual(res
.read(), '1, python, 123456\n')
def test_invaliduri(self):
    # A path below cgi-bin that names no script must yield 404.
    response = self.request('/cgi-bin/invalid')
    self.assertEqual(response.status, 404)
def test_authorization(self):
    # A request carrying a Basic Authorization header must still get the
    # script's normal output.
    credentials = base64.b64encode('username:pass')
    headers = {'Authorization': 'Basic %s' % credentials}
    response = self.request('/cgi-bin/file1.py', 'GET', headers=headers)
    body = response.read()
    content_type = response.getheader('Content-type')
    observed = (body, content_type, response.status)
    self.assertEqual(('Hello World\n', 'text/html', 200), observed)
def test_no_leading_slash(self):
    # http://bugs.python.org/issue2254
    # The script must also be served when the request target lacks the
    # leading slash.
    response = self.request('cgi-bin/file1.py')
    body = response.read()
    content_type = response.getheader('Content-type')
    observed = (body, content_type, response.status)
    self.assertEqual(('Hello World\n', 'text/html', 200), observed)
def test_os_environ_is_not_altered(self):
    # Set SERVER_SOFTWARE in this process, run a CGI request, and check
    # that the variable still holds the same value afterwards.
    signature = "Test CGI Server"
    os.environ['SERVER_SOFTWARE'] = signature
    response = self.request('/cgi-bin/file1.py')
    body = response.read()
    content_type = response.getheader('Content-type')
    observed = (body, content_type, response.status)
    self.assertEqual((b'Hello World\n', 'text/html', 200), observed)
    self.assertEqual(os.environ['SERVER_SOFTWARE'], signature)
494 class SimpleHTTPRequestHandlerTestCase(unittest
.TestCase
):
495 """ Test url parsing """
497 self
.translated
= os
.getcwd()
498 self
.translated
= os
.path
.join(self
.translated
, 'filename')
499 self
.handler
= SocketlessRequestHandler()
def test_query_arguments(self):
    # With or without a query string / fragment, the URL must translate
    # to the same filesystem path.
    urls = (
        '/filename',
        '/filename?foo=bar',
        '/filename?a=b&spam=eggs#zot',
    )
    for url in urls:
        translated = self.handler.translate_path(url)
        self.assertEqual(translated, self.translated)
def test_start_with_double_slash(self):
    # A URL starting with '//' must translate to the same path as the
    # single-slash form, with or without a query string.
    for url in ('//filename', '//filename?foo=bar'):
        translated = self.handler.translate_path(url)
        self.assertEqual(translated, self.translated)
516 def test_main(verbose
=None):
519 test_support
.run_unittest(BaseHTTPRequestHandlerTestCase
,
520 SimpleHTTPRequestHandlerTestCase
,
521 BaseHTTPServerTestCase
,
522 SimpleHTTPServerTestCase
,
523 CGIHTTPServerTestCase
528 if __name__
== '__main__':