# Extracted from the git.proxmox.com gitweb blob view of mirror_edk2.git:
# AppPkg/Applications/Python/Python-2.7.2/Lib/test/test_wsgiref.py
1 from __future__
import nested_scopes
# Backward compat for 2.1
2 from unittest
import TestCase
3 from wsgiref
.util
import setup_testing_defaults
4 from wsgiref
.headers
import Headers
5 from wsgiref
.handlers
import BaseHandler
, BaseCGIHandler
6 from wsgiref
import util
7 from wsgiref
.validate
import validator
8 from wsgiref
.simple_server
import WSGIServer
, WSGIRequestHandler
, demo_app
9 from wsgiref
.simple_server
import make_server
10 from StringIO
import StringIO
11 from SocketServer
import BaseServer
16 from test
import test_support
18 class MockServer(WSGIServer
):
19 """Non-socket HTTP server"""
21 def __init__(self
, server_address
, RequestHandlerClass
):
22 BaseServer
.__init
__(self
, server_address
, RequestHandlerClass
)
25 def server_bind(self
):
26 host
, port
= self
.server_address
27 self
.server_name
= host
28 self
.server_port
= port
32 class MockHandler(WSGIRequestHandler
):
33 """Non-socket HTTP handler"""
35 self
.connection
= self
.request
36 self
.rfile
, self
.wfile
= self
.connection
def hello_app(environ, start_response):
    """Minimal WSGI application; the default ``app`` of ``run_amock``.

    Calls ``start_response`` with a fixed "200 OK" status and a fixed
    header list, then returns a one-element body.  ``environ`` is accepted
    for WSGI signature compatibility but is not read.
    """
    # The Date header is hard-coded so the served output is reproducible.
    start_response("200 OK", [
        ('Content-Type','text/plain'),
        ('Date','Mon, 05 Jun 2006 18:49:54 GMT')
    ])
    return ["Hello, world!"]
52 def run_amock(app
=hello_app
, data
="GET / HTTP/1.0\n\n"):
53 server
= make_server("", 80, app
, MockServer
, MockHandler
)
54 inp
, out
, err
, olderr
= StringIO(data
), StringIO(), StringIO(), sys
.stderr
58 server
.finish_request((inp
,out
), ("127.0.0.1",8888))
62 return out
.getvalue(), err
.getvalue()
86 def compare_generic_iter(make_it
,match
):
87 """Utility to compare a generic 2.1/2.2+ iterator with an iterable
89 If running under Python 2.2+, this tests the iterator using iter()/next(),
90 as well as __getitem__. 'make_it' must be a function returning a fresh
91 iterator to be tested (since this may test the iterator twice)."""
96 if not it
[n
]==item
: raise AssertionError
103 raise AssertionError("Too many items from __getitem__",it
)
110 # Only test iter mode under 2.2+
112 if not iter(it
) is it
: raise AssertionError
114 if not it
.next()==item
: raise AssertionError
117 except StopIteration:
120 raise AssertionError("Too many items from .next()",it
)
127 class IntegrationTests(TestCase
):
129 def check_hello(self
, out
, has_length
=True):
130 self
.assertEqual(out
,
131 "HTTP/1.0 200 OK\r\n"
132 "Server: WSGIServer/0.1 Python/"+sys
.version
.split()[0]+"\r\n"
133 "Content-Type: text/plain\r\n"
134 "Date: Mon, 05 Jun 2006 18:49:54 GMT\r\n" +
135 (has_length
and "Content-Length: 13\r\n" or "") +
def test_plain_hello(self):
    # Run the mock server with its default arguments and check the
    # captured response text; the captured stderr text is not inspected.
    out, err = run_amock()
    self.check_hello(out)
def test_validated_hello(self):
    # Same as the plain run, but with hello_app wrapped in the wsgiref
    # validator middleware.
    out, err = run_amock(validator(hello_app))
    # the middleware doesn't support len(), so content-length isn't there
    self.check_hello(out, has_length=False)
149 def test_simple_validation_error(self
):
150 def bad_app(environ
,start_response
):
151 start_response("200 OK", ('Content-Type','text/plain'))
152 return ["Hello, world!"]
153 out
, err
= run_amock(validator(bad_app
))
154 self
.assertTrue(out
.endswith(
155 "A server error occurred. Please contact the administrator."
158 err
.splitlines()[-2],
159 "AssertionError: Headers (('Content-Type', 'text/plain')) must"
160 " be of type list: <type 'tuple'>"
168 class UtilityTests(TestCase
):
170 def checkShift(self
,sn_in
,pi_in
,part
,sn_out
,pi_out
):
171 env
= {'SCRIPT_NAME':sn_in
,'PATH_INFO':pi_in
}
172 util
.setup_testing_defaults(env
)
173 self
.assertEqual(util
.shift_path_info(env
),part
)
174 self
.assertEqual(env
['PATH_INFO'],pi_out
)
175 self
.assertEqual(env
['SCRIPT_NAME'],sn_out
)
178 def checkDefault(self
, key
, value
, alt
=None):
179 # Check defaulting when empty
181 util
.setup_testing_defaults(env
)
182 if isinstance(value
, StringIO
):
183 self
.assertIsInstance(env
[key
], StringIO
)
185 self
.assertEqual(env
[key
], value
)
187 # Check existing value
189 util
.setup_testing_defaults(env
)
190 self
.assertTrue(env
[key
] is alt
)
def checkCrossDefault(self, key, value, **kw):
    """Assert that defaulting an environ built from ``kw`` yields ``value``.

    ``kw`` is used directly as the environ dict: it is passed to
    ``util.setup_testing_defaults`` and then ``kw[key]`` is compared
    against ``value``.
    """
    util.setup_testing_defaults(kw)
    self.assertEqual(kw[key], value)
def checkAppURI(self, uri, **kw):
    """Assert that ``util.application_uri`` returns ``uri`` for ``kw``.

    ``kw`` is used directly as the environ dict and is filled in by
    ``util.setup_testing_defaults`` before the URI is computed.
    """
    util.setup_testing_defaults(kw)
    self.assertEqual(util.application_uri(kw), uri)
def checkReqURI(self, uri, query=1, **kw):
    """Assert that ``util.request_uri`` returns ``uri`` for ``kw``.

    ``kw`` is used directly as the environ dict and is filled in by
    ``util.setup_testing_defaults`` first.  ``query`` is forwarded as the
    second positional argument of ``util.request_uri``.
    """
    util.setup_testing_defaults(kw)
    self.assertEqual(util.request_uri(kw, query), uri)
209 def checkFW(self
,text
,size
,match
):
211 def make_it(text
=text
,size
=size
):
212 return util
.FileWrapper(StringIO(text
),size
)
214 compare_generic_iter(make_it
,match
)
217 self
.assertFalse(it
.filelike
.closed
)
222 self
.assertFalse(it
.filelike
.closed
)
225 self
.assertTrue(it
.filelike
.closed
)
228 def testSimpleShifts(self
):
229 self
.checkShift('','/', '', '/', '')
230 self
.checkShift('','/x', 'x', '/x', '')
231 self
.checkShift('/','', None, '/', '')
232 self
.checkShift('/a','/x/y', 'x', '/a/x', '/y')
233 self
.checkShift('/a','/x/', 'x', '/a/x', '/')
236 def testNormalizedShifts(self
):
237 self
.checkShift('/a/b', '/../y', '..', '/a', '/y')
238 self
.checkShift('', '/../y', '..', '', '/y')
239 self
.checkShift('/a/b', '//y', 'y', '/a/b/y', '')
240 self
.checkShift('/a/b', '//y/', 'y', '/a/b/y', '/')
241 self
.checkShift('/a/b', '/./y', 'y', '/a/b/y', '')
242 self
.checkShift('/a/b', '/./y/', 'y', '/a/b/y', '/')
243 self
.checkShift('/a/b', '///./..//y/.//', '..', '/a', '/y/')
244 self
.checkShift('/a/b', '///', '', '/a/b/', '')
245 self
.checkShift('/a/b', '/.//', '', '/a/b/', '')
246 self
.checkShift('/a/b', '/x//', 'x', '/a/b/x', '/')
247 self
.checkShift('/a/b', '/.', None, '/a/b', '')
250 def testDefaults(self
):
252 ('SERVER_NAME','127.0.0.1'),
253 ('SERVER_PORT', '80'),
254 ('SERVER_PROTOCOL','HTTP/1.0'),
255 ('HTTP_HOST','127.0.0.1'),
256 ('REQUEST_METHOD','GET'),
259 ('wsgi.version', (1,0)),
260 ('wsgi.run_once', 0),
261 ('wsgi.multithread', 0),
262 ('wsgi.multiprocess', 0),
263 ('wsgi.input', StringIO("")),
264 ('wsgi.errors', StringIO()),
265 ('wsgi.url_scheme','http'),
267 self
.checkDefault(key
,value
)
270 def testCrossDefaults(self
):
271 self
.checkCrossDefault('HTTP_HOST',"foo.bar",SERVER_NAME
="foo.bar")
272 self
.checkCrossDefault('wsgi.url_scheme',"https",HTTPS
="on")
273 self
.checkCrossDefault('wsgi.url_scheme',"https",HTTPS
="1")
274 self
.checkCrossDefault('wsgi.url_scheme',"https",HTTPS
="yes")
275 self
.checkCrossDefault('wsgi.url_scheme',"http",HTTPS
="foo")
276 self
.checkCrossDefault('SERVER_PORT',"80",HTTPS
="foo")
277 self
.checkCrossDefault('SERVER_PORT',"443",HTTPS
="on")
280 def testGuessScheme(self
):
281 self
.assertEqual(util
.guess_scheme({}), "http")
282 self
.assertEqual(util
.guess_scheme({'HTTPS':"foo"}), "http")
283 self
.assertEqual(util
.guess_scheme({'HTTPS':"on"}), "https")
284 self
.assertEqual(util
.guess_scheme({'HTTPS':"yes"}), "https")
285 self
.assertEqual(util
.guess_scheme({'HTTPS':"1"}), "https")
291 def testAppURIs(self
):
292 self
.checkAppURI("http://127.0.0.1/")
293 self
.checkAppURI("http://127.0.0.1/spam", SCRIPT_NAME
="/spam")
294 self
.checkAppURI("http://spam.example.com:2071/",
295 HTTP_HOST
="spam.example.com:2071", SERVER_PORT
="2071")
296 self
.checkAppURI("http://spam.example.com/",
297 SERVER_NAME
="spam.example.com")
298 self
.checkAppURI("http://127.0.0.1/",
299 HTTP_HOST
="127.0.0.1", SERVER_NAME
="spam.example.com")
300 self
.checkAppURI("https://127.0.0.1/", HTTPS
="on")
301 self
.checkAppURI("http://127.0.0.1:8000/", SERVER_PORT
="8000",
304 def testReqURIs(self
):
305 self
.checkReqURI("http://127.0.0.1/")
306 self
.checkReqURI("http://127.0.0.1/spam", SCRIPT_NAME
="/spam")
307 self
.checkReqURI("http://127.0.0.1/spammity/spam",
308 SCRIPT_NAME
="/spammity", PATH_INFO
="/spam")
309 self
.checkReqURI("http://127.0.0.1/spammity/spam;ham",
310 SCRIPT_NAME
="/spammity", PATH_INFO
="/spam;ham")
311 self
.checkReqURI("http://127.0.0.1/spammity/spam;cookie=1234,5678",
312 SCRIPT_NAME
="/spammity", PATH_INFO
="/spam;cookie=1234,5678")
313 self
.checkReqURI("http://127.0.0.1/spammity/spam?say=ni",
314 SCRIPT_NAME
="/spammity", PATH_INFO
="/spam",QUERY_STRING
="say=ni")
315 self
.checkReqURI("http://127.0.0.1/spammity/spam", 0,
316 SCRIPT_NAME
="/spammity", PATH_INFO
="/spam",QUERY_STRING
="say=ni")
318 def testFileWrapper(self
):
319 self
.checkFW("xyz"*50, 120, ["xyz"*40,"xyz"*10])
321 def testHopByHop(self
):
323 "Connection Keep-Alive Proxy-Authenticate Proxy-Authorization "
324 "TE Trailers Transfer-Encoding Upgrade"
326 for alt
in hop
, hop
.title(), hop
.upper(), hop
.lower():
327 self
.assertTrue(util
.is_hop_by_hop(alt
))
329 # Not comprehensive, just a few random header names
331 "Accept Cache-Control Date Pragma Trailer Via Warning"
333 for alt
in hop
, hop
.title(), hop
.upper(), hop
.lower():
334 self
.assertFalse(util
.is_hop_by_hop(alt
))
336 class HeaderTests(TestCase
):
338 def testMappingInterface(self
):
340 self
.assertEqual(len(Headers([])),0)
341 self
.assertEqual(len(Headers(test
[:])),1)
342 self
.assertEqual(Headers(test
[:]).keys(), ['x'])
343 self
.assertEqual(Headers(test
[:]).values(), ['y'])
344 self
.assertEqual(Headers(test
[:]).items(), test
)
345 self
.assertFalse(Headers(test
).items() is test
) # must be copy!
348 del h
['foo'] # should not raise an error
351 for m
in h
.has_key
, h
.__contains
__, h
.get
, h
.get_all
, h
.__getitem
__:
352 self
.assertTrue(m('foo'))
353 self
.assertTrue(m('Foo'))
354 self
.assertTrue(m('FOO'))
355 self
.assertFalse(m('bar'))
357 self
.assertEqual(h
['foo'],'bar')
359 self
.assertEqual(h
['FOO'],'baz')
360 self
.assertEqual(h
.get_all('foo'),['baz'])
362 self
.assertEqual(h
.get("foo","whee"), "baz")
363 self
.assertEqual(h
.get("zoo","whee"), "whee")
364 self
.assertEqual(h
.setdefault("foo","whee"), "baz")
365 self
.assertEqual(h
.setdefault("zoo","whee"), "whee")
366 self
.assertEqual(h
["foo"],"baz")
367 self
.assertEqual(h
["zoo"],"whee")
369 def testRequireList(self
):
370 self
.assertRaises(TypeError, Headers
, "foo")
373 def testExtras(self
):
375 self
.assertEqual(str(h
),'\r\n')
377 h
.add_header('foo','bar',baz
="spam")
378 self
.assertEqual(h
['foo'], 'bar; baz="spam"')
379 self
.assertEqual(str(h
),'foo: bar; baz="spam"\r\n\r\n')
381 h
.add_header('Foo','bar',cheese
=None)
382 self
.assertEqual(h
.get_all('foo'),
383 ['bar; baz="spam"', 'bar; cheese'])
385 self
.assertEqual(str(h
),
386 'foo: bar; baz="spam"\r\n'
387 'Foo: bar; cheese\r\n'
392 class ErrorHandler(BaseCGIHandler
):
393 """Simple handler subclass for testing BaseHandler"""
395 # BaseHandler records the OS environment at import time, but envvars
396 # might have been changed later by other tests, which trips up
397 # HandlerTests.testEnviron().
398 os_environ
= dict(os
.environ
.items())
400 def __init__(self
,**kw
):
401 setup_testing_defaults(kw
)
402 BaseCGIHandler
.__init
__(
403 self
, StringIO(''), StringIO(), StringIO(), kw
,
404 multithread
=True, multiprocess
=True
class TestHandler(ErrorHandler):
    """Simple handler subclass for testing BaseHandler, w/error passthru"""

    def handle_error(self):
        # Bare ``raise`` re-raises the exception currently being handled
        # instead of turning it into an error response.
        raise   # for testing, we want to see what's happening
423 class HandlerTests(TestCase
):
425 def checkEnvironAttrs(self
, handler
):
426 env
= handler
.environ
428 'version','multithread','multiprocess','run_once','file_wrapper'
430 if attr
=='file_wrapper' and handler
.wsgi_file_wrapper
is None:
432 self
.assertEqual(getattr(handler
,'wsgi_'+attr
),env
['wsgi.'+attr
])
434 def checkOSEnviron(self
,handler
):
435 empty
= {}; setup_testing_defaults(empty
)
436 env
= handler
.environ
437 from os
import environ
438 for k
,v
in environ
.items():
440 self
.assertEqual(env
[k
],v
)
441 for k
,v
in empty
.items():
442 self
.assertIn(k
, env
)
444 def testEnviron(self
):
445 h
= TestHandler(X
="Y")
447 self
.checkEnvironAttrs(h
)
448 self
.checkOSEnviron(h
)
449 self
.assertEqual(h
.environ
["X"],"Y")
451 def testCGIEnviron(self
):
452 h
= BaseCGIHandler(None,None,None,{})
454 for key
in 'wsgi.url_scheme', 'wsgi.input', 'wsgi.errors':
455 self
.assertIn(key
, h
.environ
)
def testScheme(self):
    # With HTTPS="on" in the environ the handler must report https.
    h = TestHandler(HTTPS="on")
    h.setup_environ()
    self.assertEqual(h.environ['wsgi.url_scheme'],'https')
    # With no HTTPS entry the scheme must be http.
    h = TestHandler()
    h.setup_environ()
    self.assertEqual(h.environ['wsgi.url_scheme'],'http')
464 def testAbstractMethods(self
):
467 '_flush','get_stdin','get_stderr','add_cgi_vars'
469 self
.assertRaises(NotImplementedError, getattr(h
,name
))
470 self
.assertRaises(NotImplementedError, h
._write
, "test")
473 def testContentLength(self
):
474 # Demo one reason iteration is better than write()... ;)
476 def trivial_app1(e
,s
):
478 return [e
['wsgi.url_scheme']]
480 def trivial_app2(e
,s
):
481 s('200 OK',[])(e
['wsgi.url_scheme'])
484 def trivial_app4(e
,s
):
485 # Simulate a response to a HEAD request
486 s('200 OK',[('Content-Length', '12345')])
491 self
.assertEqual(h
.stdout
.getvalue(),
493 "Content-Length: 4\r\n"
499 self
.assertEqual(h
.stdout
.getvalue(),
507 self
.assertEqual(h
.stdout
.getvalue(),
508 b
'Status: 200 OK\r\n'
509 b
'Content-Length: 12345\r\n'
512 def testBasicErrorOutput(self
):
514 def non_error_app(e
,s
):
519 raise AssertionError("This should be caught by handler")
523 self
.assertEqual(h
.stdout
.getvalue(),
525 "Content-Length: 0\r\n"
527 self
.assertEqual(h
.stderr
.getvalue(),"")
531 self
.assertEqual(h
.stdout
.getvalue(),
533 "Content-Type: text/plain\r\n"
534 "Content-Length: %d\r\n"
535 "\r\n%s" % (h
.error_status
,len(h
.error_body
),h
.error_body
))
537 self
.assertNotEqual(h
.stderr
.getvalue().find("AssertionError"), -1)
539 def testErrorAfterOutput(self
):
540 MSG
= "Some output has been sent"
543 raise AssertionError("This should be caught by handler")
547 self
.assertEqual(h
.stdout
.getvalue(),
550 self
.assertNotEqual(h
.stderr
.getvalue().find("AssertionError"), -1)
553 def testHeaderFormats(self
):
555 def non_error_app(e
,s
):
560 r
"HTTP/%s 200 OK\r\n"
561 r
"Date: \w{3}, [ 0123]\d \w{3} \d{4} \d\d:\d\d:\d\d GMT\r\n"
562 r
"%s" r
"Content-Length: 0\r\n" r
"\r\n"
565 "Status: 200 OK\r\n" "Content-Length: 0\r\n" "\r\n"
568 for ssw
in "FooBar/1.0", None:
569 sw
= ssw
and "Server: %s\r\n" % ssw
or ""
571 for version
in "1.0", "1.1":
572 for proto
in "HTTP/0.9", "HTTP/1.0", "HTTP/1.1":
574 h
= TestHandler(SERVER_PROTOCOL
=proto
)
575 h
.origin_server
= False
576 h
.http_version
= version
577 h
.server_software
= ssw
579 self
.assertEqual(shortpat
,h
.stdout
.getvalue())
581 h
= TestHandler(SERVER_PROTOCOL
=proto
)
582 h
.origin_server
= True
583 h
.http_version
= version
584 h
.server_software
= ssw
586 if proto
=="HTTP/0.9":
587 self
.assertEqual(h
.stdout
.getvalue(),"")
590 re
.match(stdpat
%(version
,sw
), h
.stdout
.getvalue()),
591 (stdpat
%(version
,sw
), h
.stdout
.getvalue())
594 # This epilogue is needed for compatibility with the Python 2.5 regrtest module
597 test_support
.run_unittest(__name__
)
599 if __name__
== "__main__":
630 # the above lines intentionally left blank