# Source: git.proxmox.com Git - mirror_edk2.git/blob - AppPkg/Applications/Python/Python-2.7.2/Lib/test/test_wsgiref.py
# Page banner text: "EmbeddedPkg: Extend NvVarStoreFormattedLib LIBRARY_CLASS"
# Path: [mirror_edk2.git] / AppPkg / Applications / Python / Python-2.7.2 / Lib / test / test_wsgiref.py
from __future__ import nested_scopes    # Backward compat for 2.1
from unittest import TestCase
from wsgiref.util import setup_testing_defaults
from wsgiref.headers import Headers
from wsgiref.handlers import BaseHandler, BaseCGIHandler
from wsgiref import util
from wsgiref.validate import validator
from wsgiref.simple_server import WSGIServer, WSGIRequestHandler, demo_app
from wsgiref.simple_server import make_server
from StringIO import StringIO
from SocketServer import BaseServer
import os
import re
import sys

from test import test_support
17
class MockServer(WSGIServer):
    """Non-socket HTTP server.

    A ``WSGIServer`` variant for tests: construction goes straight to
    ``BaseServer.__init__`` and the overridden ``server_bind()`` only
    records the address and builds the base environ.
    """

    def __init__(self, server_address, RequestHandlerClass):
        # Deliberately skip WSGIServer's inherited constructor chain and
        # initialise only the BaseServer part, then "bind" without a socket.
        BaseServer.__init__(self, server_address, RequestHandlerClass)
        self.server_bind()

    def server_bind(self):
        """Store host/port from ``server_address`` and set up the environ."""
        host, port = self.server_address
        self.server_name = host
        self.server_port = port
        self.setup_environ()
30
31
class MockHandler(WSGIRequestHandler):
    """Non-socket HTTP handler.

    The "request" handed to this handler is an ``(rfile, wfile)`` pair of
    file-like objects rather than a socket.
    """

    def setup(self):
        """Use the request pair directly as the read and write streams."""
        self.connection = self.request
        self.rfile, self.wfile = self.connection

    def finish(self):
        """Do nothing, so the caller's file objects are left untouched."""
        pass
40
41
42
43
44
def hello_app(environ, start_response):
    """Minimal WSGI application used as the default test app.

    Calls ``start_response`` with a ``200 OK`` status and fixed
    ``Content-Type`` / ``Date`` headers (the fixed date keeps the expected
    output deterministic), then returns the body as a one-item list.
    """
    start_response("200 OK", [
        ('Content-Type', 'text/plain'),
        ('Date', 'Mon, 05 Jun 2006 18:49:54 GMT')
    ])
    return ["Hello, world!"]
51
def run_amock(app=hello_app, data="GET / HTTP/1.0\n\n"):
    """Run one mock HTTP request through ``app`` and capture the output.

    ``data`` is the raw request text fed to the handler.  Returns a
    ``(out, err)`` pair: the text written to the response stream and the
    text written to ``sys.stderr`` while the request was being handled.
    """
    server = make_server("", 80, app, MockServer, MockHandler)
    inp, out, err, olderr = StringIO(data), StringIO(), StringIO(), sys.stderr
    sys.stderr = err

    try:
        server.finish_request((inp, out), ("127.0.0.1", 8888))
    finally:
        # Always restore the real stderr, even if the request blew up.
        sys.stderr = olderr

    return out.getvalue(), err.getvalue()
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
def compare_generic_iter(make_it, match):
    """Utility to compare a generic iterator with an iterable

    'make_it' must be a function returning a fresh iterator to be tested
    (it is called twice).  The object is first checked through
    __getitem__ with indexes 0, 1, 2, ... and then through the iterator
    protocol (iter()/next()); in both modes it must yield exactly the
    items of 'match' and then stop.

    Raises AssertionError on any mismatch or surplus item.
    """
    # Pass 1: sequence-style access via __getitem__.
    it = make_it()
    n = 0
    for item in match:
        if not it[n] == item:
            raise AssertionError
        n += 1
    try:
        it[n]
    except IndexError:
        pass
    else:
        raise AssertionError("Too many items from __getitem__", it)

    # Pass 2: iterator protocol.  The object must be its own iterator.
    it = make_it()
    if iter(it) is not it:
        raise AssertionError
    for item in match:
        if not next(it) == item:
            raise AssertionError
    try:
        next(it)
    except StopIteration:
        pass
    else:
        raise AssertionError("Too many items from .next()", it)
121
122
123
124
125
126
127 class IntegrationTests(TestCase):
128
129 def check_hello(self, out, has_length=True):
130 self.assertEqual(out,
131 "HTTP/1.0 200 OK\r\n"
132 "Server: WSGIServer/0.1 Python/"+sys.version.split()[0]+"\r\n"
133 "Content-Type: text/plain\r\n"
134 "Date: Mon, 05 Jun 2006 18:49:54 GMT\r\n" +
135 (has_length and "Content-Length: 13\r\n" or "") +
136 "\r\n"
137 "Hello, world!"
138 )
139
140 def test_plain_hello(self):
141 out, err = run_amock()
142 self.check_hello(out)
143
144 def test_validated_hello(self):
145 out, err = run_amock(validator(hello_app))
146 # the middleware doesn't support len(), so content-length isn't there
147 self.check_hello(out, has_length=False)
148
149 def test_simple_validation_error(self):
150 def bad_app(environ,start_response):
151 start_response("200 OK", ('Content-Type','text/plain'))
152 return ["Hello, world!"]
153 out, err = run_amock(validator(bad_app))
154 self.assertTrue(out.endswith(
155 "A server error occurred. Please contact the administrator."
156 ))
157 self.assertEqual(
158 err.splitlines()[-2],
159 "AssertionError: Headers (('Content-Type', 'text/plain')) must"
160 " be of type list: <type 'tuple'>"
161 )
162
163
164
165
166
167
168 class UtilityTests(TestCase):
169
170 def checkShift(self,sn_in,pi_in,part,sn_out,pi_out):
171 env = {'SCRIPT_NAME':sn_in,'PATH_INFO':pi_in}
172 util.setup_testing_defaults(env)
173 self.assertEqual(util.shift_path_info(env),part)
174 self.assertEqual(env['PATH_INFO'],pi_out)
175 self.assertEqual(env['SCRIPT_NAME'],sn_out)
176 return env
177
178 def checkDefault(self, key, value, alt=None):
179 # Check defaulting when empty
180 env = {}
181 util.setup_testing_defaults(env)
182 if isinstance(value, StringIO):
183 self.assertIsInstance(env[key], StringIO)
184 else:
185 self.assertEqual(env[key], value)
186
187 # Check existing value
188 env = {key:alt}
189 util.setup_testing_defaults(env)
190 self.assertTrue(env[key] is alt)
191
192 def checkCrossDefault(self,key,value,**kw):
193 util.setup_testing_defaults(kw)
194 self.assertEqual(kw[key],value)
195
196 def checkAppURI(self,uri,**kw):
197 util.setup_testing_defaults(kw)
198 self.assertEqual(util.application_uri(kw),uri)
199
200 def checkReqURI(self,uri,query=1,**kw):
201 util.setup_testing_defaults(kw)
202 self.assertEqual(util.request_uri(kw,query),uri)
203
204
205
206
207
208
209 def checkFW(self,text,size,match):
210
211 def make_it(text=text,size=size):
212 return util.FileWrapper(StringIO(text),size)
213
214 compare_generic_iter(make_it,match)
215
216 it = make_it()
217 self.assertFalse(it.filelike.closed)
218
219 for item in it:
220 pass
221
222 self.assertFalse(it.filelike.closed)
223
224 it.close()
225 self.assertTrue(it.filelike.closed)
226
227
228 def testSimpleShifts(self):
229 self.checkShift('','/', '', '/', '')
230 self.checkShift('','/x', 'x', '/x', '')
231 self.checkShift('/','', None, '/', '')
232 self.checkShift('/a','/x/y', 'x', '/a/x', '/y')
233 self.checkShift('/a','/x/', 'x', '/a/x', '/')
234
235
236 def testNormalizedShifts(self):
237 self.checkShift('/a/b', '/../y', '..', '/a', '/y')
238 self.checkShift('', '/../y', '..', '', '/y')
239 self.checkShift('/a/b', '//y', 'y', '/a/b/y', '')
240 self.checkShift('/a/b', '//y/', 'y', '/a/b/y', '/')
241 self.checkShift('/a/b', '/./y', 'y', '/a/b/y', '')
242 self.checkShift('/a/b', '/./y/', 'y', '/a/b/y', '/')
243 self.checkShift('/a/b', '///./..//y/.//', '..', '/a', '/y/')
244 self.checkShift('/a/b', '///', '', '/a/b/', '')
245 self.checkShift('/a/b', '/.//', '', '/a/b/', '')
246 self.checkShift('/a/b', '/x//', 'x', '/a/b/x', '/')
247 self.checkShift('/a/b', '/.', None, '/a/b', '')
248
249
250 def testDefaults(self):
251 for key, value in [
252 ('SERVER_NAME','127.0.0.1'),
253 ('SERVER_PORT', '80'),
254 ('SERVER_PROTOCOL','HTTP/1.0'),
255 ('HTTP_HOST','127.0.0.1'),
256 ('REQUEST_METHOD','GET'),
257 ('SCRIPT_NAME',''),
258 ('PATH_INFO','/'),
259 ('wsgi.version', (1,0)),
260 ('wsgi.run_once', 0),
261 ('wsgi.multithread', 0),
262 ('wsgi.multiprocess', 0),
263 ('wsgi.input', StringIO("")),
264 ('wsgi.errors', StringIO()),
265 ('wsgi.url_scheme','http'),
266 ]:
267 self.checkDefault(key,value)
268
269
270 def testCrossDefaults(self):
271 self.checkCrossDefault('HTTP_HOST',"foo.bar",SERVER_NAME="foo.bar")
272 self.checkCrossDefault('wsgi.url_scheme',"https",HTTPS="on")
273 self.checkCrossDefault('wsgi.url_scheme',"https",HTTPS="1")
274 self.checkCrossDefault('wsgi.url_scheme',"https",HTTPS="yes")
275 self.checkCrossDefault('wsgi.url_scheme',"http",HTTPS="foo")
276 self.checkCrossDefault('SERVER_PORT',"80",HTTPS="foo")
277 self.checkCrossDefault('SERVER_PORT',"443",HTTPS="on")
278
279
280 def testGuessScheme(self):
281 self.assertEqual(util.guess_scheme({}), "http")
282 self.assertEqual(util.guess_scheme({'HTTPS':"foo"}), "http")
283 self.assertEqual(util.guess_scheme({'HTTPS':"on"}), "https")
284 self.assertEqual(util.guess_scheme({'HTTPS':"yes"}), "https")
285 self.assertEqual(util.guess_scheme({'HTTPS':"1"}), "https")
286
287
288
289
290
291 def testAppURIs(self):
292 self.checkAppURI("http://127.0.0.1/")
293 self.checkAppURI("http://127.0.0.1/spam", SCRIPT_NAME="/spam")
294 self.checkAppURI("http://spam.example.com:2071/",
295 HTTP_HOST="spam.example.com:2071", SERVER_PORT="2071")
296 self.checkAppURI("http://spam.example.com/",
297 SERVER_NAME="spam.example.com")
298 self.checkAppURI("http://127.0.0.1/",
299 HTTP_HOST="127.0.0.1", SERVER_NAME="spam.example.com")
300 self.checkAppURI("https://127.0.0.1/", HTTPS="on")
301 self.checkAppURI("http://127.0.0.1:8000/", SERVER_PORT="8000",
302 HTTP_HOST=None)
303
304 def testReqURIs(self):
305 self.checkReqURI("http://127.0.0.1/")
306 self.checkReqURI("http://127.0.0.1/spam", SCRIPT_NAME="/spam")
307 self.checkReqURI("http://127.0.0.1/spammity/spam",
308 SCRIPT_NAME="/spammity", PATH_INFO="/spam")
309 self.checkReqURI("http://127.0.0.1/spammity/spam;ham",
310 SCRIPT_NAME="/spammity", PATH_INFO="/spam;ham")
311 self.checkReqURI("http://127.0.0.1/spammity/spam;cookie=1234,5678",
312 SCRIPT_NAME="/spammity", PATH_INFO="/spam;cookie=1234,5678")
313 self.checkReqURI("http://127.0.0.1/spammity/spam?say=ni",
314 SCRIPT_NAME="/spammity", PATH_INFO="/spam",QUERY_STRING="say=ni")
315 self.checkReqURI("http://127.0.0.1/spammity/spam", 0,
316 SCRIPT_NAME="/spammity", PATH_INFO="/spam",QUERY_STRING="say=ni")
317
318 def testFileWrapper(self):
319 self.checkFW("xyz"*50, 120, ["xyz"*40,"xyz"*10])
320
321 def testHopByHop(self):
322 for hop in (
323 "Connection Keep-Alive Proxy-Authenticate Proxy-Authorization "
324 "TE Trailers Transfer-Encoding Upgrade"
325 ).split():
326 for alt in hop, hop.title(), hop.upper(), hop.lower():
327 self.assertTrue(util.is_hop_by_hop(alt))
328
329 # Not comprehensive, just a few random header names
330 for hop in (
331 "Accept Cache-Control Date Pragma Trailer Via Warning"
332 ).split():
333 for alt in hop, hop.title(), hop.upper(), hop.lower():
334 self.assertFalse(util.is_hop_by_hop(alt))
335
336 class HeaderTests(TestCase):
337
338 def testMappingInterface(self):
339 test = [('x','y')]
340 self.assertEqual(len(Headers([])),0)
341 self.assertEqual(len(Headers(test[:])),1)
342 self.assertEqual(Headers(test[:]).keys(), ['x'])
343 self.assertEqual(Headers(test[:]).values(), ['y'])
344 self.assertEqual(Headers(test[:]).items(), test)
345 self.assertFalse(Headers(test).items() is test) # must be copy!
346
347 h=Headers([])
348 del h['foo'] # should not raise an error
349
350 h['Foo'] = 'bar'
351 for m in h.has_key, h.__contains__, h.get, h.get_all, h.__getitem__:
352 self.assertTrue(m('foo'))
353 self.assertTrue(m('Foo'))
354 self.assertTrue(m('FOO'))
355 self.assertFalse(m('bar'))
356
357 self.assertEqual(h['foo'],'bar')
358 h['foo'] = 'baz'
359 self.assertEqual(h['FOO'],'baz')
360 self.assertEqual(h.get_all('foo'),['baz'])
361
362 self.assertEqual(h.get("foo","whee"), "baz")
363 self.assertEqual(h.get("zoo","whee"), "whee")
364 self.assertEqual(h.setdefault("foo","whee"), "baz")
365 self.assertEqual(h.setdefault("zoo","whee"), "whee")
366 self.assertEqual(h["foo"],"baz")
367 self.assertEqual(h["zoo"],"whee")
368
369 def testRequireList(self):
370 self.assertRaises(TypeError, Headers, "foo")
371
372
373 def testExtras(self):
374 h = Headers([])
375 self.assertEqual(str(h),'\r\n')
376
377 h.add_header('foo','bar',baz="spam")
378 self.assertEqual(h['foo'], 'bar; baz="spam"')
379 self.assertEqual(str(h),'foo: bar; baz="spam"\r\n\r\n')
380
381 h.add_header('Foo','bar',cheese=None)
382 self.assertEqual(h.get_all('foo'),
383 ['bar; baz="spam"', 'bar; cheese'])
384
385 self.assertEqual(str(h),
386 'foo: bar; baz="spam"\r\n'
387 'Foo: bar; cheese\r\n'
388 '\r\n'
389 )
390
391
class ErrorHandler(BaseCGIHandler):
    """Simple handler subclass for testing BaseHandler

    Each instance gets empty in-memory stdin/stdout/stderr streams and an
    environ built from the keyword arguments plus the wsgiref testing
    defaults.
    """

    # BaseHandler records the OS environment at import time, but envvars
    # might have been changed later by other tests, which trips up
    # HandlerTests.testEnviron().
    os_environ = dict(os.environ)

    def __init__(self, **kw):
        # 'kw' doubles as the request environ; fill in anything missing.
        setup_testing_defaults(kw)
        BaseCGIHandler.__init__(
            self, StringIO(''), StringIO(), StringIO(), kw,
            multithread=True, multiprocess=True
        )
406
class TestHandler(ErrorHandler):
    """Simple handler subclass for testing BaseHandler, w/error passthru"""

    def handle_error(self):
        # Bare 'raise' re-raises the exception currently being handled
        # instead of turning it into an error response.
        raise   # for testing, we want to see what's happening
412
413
414
415
416
417
418
419
420
421
422
423 class HandlerTests(TestCase):
424
425 def checkEnvironAttrs(self, handler):
426 env = handler.environ
427 for attr in [
428 'version','multithread','multiprocess','run_once','file_wrapper'
429 ]:
430 if attr=='file_wrapper' and handler.wsgi_file_wrapper is None:
431 continue
432 self.assertEqual(getattr(handler,'wsgi_'+attr),env['wsgi.'+attr])
433
434 def checkOSEnviron(self,handler):
435 empty = {}; setup_testing_defaults(empty)
436 env = handler.environ
437 from os import environ
438 for k,v in environ.items():
439 if k not in empty:
440 self.assertEqual(env[k],v)
441 for k,v in empty.items():
442 self.assertIn(k, env)
443
444 def testEnviron(self):
445 h = TestHandler(X="Y")
446 h.setup_environ()
447 self.checkEnvironAttrs(h)
448 self.checkOSEnviron(h)
449 self.assertEqual(h.environ["X"],"Y")
450
451 def testCGIEnviron(self):
452 h = BaseCGIHandler(None,None,None,{})
453 h.setup_environ()
454 for key in 'wsgi.url_scheme', 'wsgi.input', 'wsgi.errors':
455 self.assertIn(key, h.environ)
456
457 def testScheme(self):
458 h=TestHandler(HTTPS="on"); h.setup_environ()
459 self.assertEqual(h.environ['wsgi.url_scheme'],'https')
460 h=TestHandler(); h.setup_environ()
461 self.assertEqual(h.environ['wsgi.url_scheme'],'http')
462
463
464 def testAbstractMethods(self):
465 h = BaseHandler()
466 for name in [
467 '_flush','get_stdin','get_stderr','add_cgi_vars'
468 ]:
469 self.assertRaises(NotImplementedError, getattr(h,name))
470 self.assertRaises(NotImplementedError, h._write, "test")
471
472
473 def testContentLength(self):
474 # Demo one reason iteration is better than write()... ;)
475
476 def trivial_app1(e,s):
477 s('200 OK',[])
478 return [e['wsgi.url_scheme']]
479
480 def trivial_app2(e,s):
481 s('200 OK',[])(e['wsgi.url_scheme'])
482 return []
483
484 def trivial_app4(e,s):
485 # Simulate a response to a HEAD request
486 s('200 OK',[('Content-Length', '12345')])
487 return []
488
489 h = TestHandler()
490 h.run(trivial_app1)
491 self.assertEqual(h.stdout.getvalue(),
492 "Status: 200 OK\r\n"
493 "Content-Length: 4\r\n"
494 "\r\n"
495 "http")
496
497 h = TestHandler()
498 h.run(trivial_app2)
499 self.assertEqual(h.stdout.getvalue(),
500 "Status: 200 OK\r\n"
501 "\r\n"
502 "http")
503
504
505 h = TestHandler()
506 h.run(trivial_app4)
507 self.assertEqual(h.stdout.getvalue(),
508 b'Status: 200 OK\r\n'
509 b'Content-Length: 12345\r\n'
510 b'\r\n')
511
512 def testBasicErrorOutput(self):
513
514 def non_error_app(e,s):
515 s('200 OK',[])
516 return []
517
518 def error_app(e,s):
519 raise AssertionError("This should be caught by handler")
520
521 h = ErrorHandler()
522 h.run(non_error_app)
523 self.assertEqual(h.stdout.getvalue(),
524 "Status: 200 OK\r\n"
525 "Content-Length: 0\r\n"
526 "\r\n")
527 self.assertEqual(h.stderr.getvalue(),"")
528
529 h = ErrorHandler()
530 h.run(error_app)
531 self.assertEqual(h.stdout.getvalue(),
532 "Status: %s\r\n"
533 "Content-Type: text/plain\r\n"
534 "Content-Length: %d\r\n"
535 "\r\n%s" % (h.error_status,len(h.error_body),h.error_body))
536
537 self.assertNotEqual(h.stderr.getvalue().find("AssertionError"), -1)
538
539 def testErrorAfterOutput(self):
540 MSG = "Some output has been sent"
541 def error_app(e,s):
542 s("200 OK",[])(MSG)
543 raise AssertionError("This should be caught by handler")
544
545 h = ErrorHandler()
546 h.run(error_app)
547 self.assertEqual(h.stdout.getvalue(),
548 "Status: 200 OK\r\n"
549 "\r\n"+MSG)
550 self.assertNotEqual(h.stderr.getvalue().find("AssertionError"), -1)
551
552
553 def testHeaderFormats(self):
554
555 def non_error_app(e,s):
556 s('200 OK',[])
557 return []
558
559 stdpat = (
560 r"HTTP/%s 200 OK\r\n"
561 r"Date: \w{3}, [ 0123]\d \w{3} \d{4} \d\d:\d\d:\d\d GMT\r\n"
562 r"%s" r"Content-Length: 0\r\n" r"\r\n"
563 )
564 shortpat = (
565 "Status: 200 OK\r\n" "Content-Length: 0\r\n" "\r\n"
566 )
567
568 for ssw in "FooBar/1.0", None:
569 sw = ssw and "Server: %s\r\n" % ssw or ""
570
571 for version in "1.0", "1.1":
572 for proto in "HTTP/0.9", "HTTP/1.0", "HTTP/1.1":
573
574 h = TestHandler(SERVER_PROTOCOL=proto)
575 h.origin_server = False
576 h.http_version = version
577 h.server_software = ssw
578 h.run(non_error_app)
579 self.assertEqual(shortpat,h.stdout.getvalue())
580
581 h = TestHandler(SERVER_PROTOCOL=proto)
582 h.origin_server = True
583 h.http_version = version
584 h.server_software = ssw
585 h.run(non_error_app)
586 if proto=="HTTP/0.9":
587 self.assertEqual(h.stdout.getvalue(),"")
588 else:
589 self.assertTrue(
590 re.match(stdpat%(version,sw), h.stdout.getvalue()),
591 (stdpat%(version,sw), h.stdout.getvalue())
592 )
593
# This epilogue is needed for compatibility with the Python 2.5 regrtest module

def test_main():
    """Run every test case defined in this module via test_support."""
    test_support.run_unittest(__name__)
598
# Allow running this test module directly as a script.
if __name__ == "__main__":
    test_main()
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
# the above lines intentionally left blank