]> git.proxmox.com Git - mirror_edk2.git/blob - AppPkg/Applications/Python/Python-2.7.2/Lib/test/test_ssl.py
EmbeddedPkg: Extend NvVarStoreFormattedLib LIBRARY_CLASS
[mirror_edk2.git] / AppPkg / Applications / Python / Python-2.7.2 / Lib / test / test_ssl.py
1 # Test the support for SSL and sockets
2
3 import sys
4 import unittest
5 from test import test_support
6 import asyncore
7 import socket
8 import select
9 import time
10 import gc
11 import os
12 import errno
13 import pprint
14 import urllib, urlparse
15 import traceback
16 import weakref
17 import functools
18 import platform
19
20 from BaseHTTPServer import HTTPServer
21 from SimpleHTTPServer import SimpleHTTPRequestHandler
22
# The ssl module is obtained through test_support.import_module() rather
# than with a plain import statement.
ssl = test_support.import_module("ssl")

# Host name shared by the local servers and clients in this file.
HOST = test_support.HOST

# Certificate file paths; both start out unset.
# NOTE(review): presumably assigned by the test driver further down the
# file (not visible here) -- confirm.
CERTFILE = SVN_PYTHON_ORG_ROOT_CERT = None
28
def handle_error(prefix):
    """Write *prefix* followed by the current exception's traceback.

    The traceback of the exception being handled is always formatted, but
    it is only written to stdout when test_support.verbose is set.
    """
    exc_type, exc_value, exc_tb = sys.exc_info()
    formatted = traceback.format_exception(exc_type, exc_value, exc_tb)
    if not test_support.verbose:
        return
    sys.stdout.write(prefix + ' '.join(formatted))
33
34
class BasicTests(unittest.TestCase):
    # Smoke test for the legacy ssl.sslwrap_simple() entry point.

    def test_sslwrap_simple(self):
        # A crude test for the legacy API: wrap an unconnected socket, once
        # through the high-level socket object and once through its
        # low-level ``_sock`` attribute.
        for use_low_level in (False, True):
            sock = socket.socket(socket.AF_INET)
            try:
                target = sock._sock if use_low_level else sock
                try:
                    ssl.sslwrap_simple(target)
                except IOError as e:
                    # broken pipe when ssl_sock.do_handshake(), this test
                    # doesn't care about that; anything else is a failure
                    if e.errno != errno.EPIPE:
                        raise
            finally:
                # Do not leave the probe socket open after the test.
                sock.close()
53
# Issue #9415: Ubuntu hijacks their OpenSSL and forcefully disables SSLv2
def skip_if_broken_ubuntu_ssl(func):
    """Decorator: skip *func* when SSLv2 has been patched out of OpenSSL.

    When the ssl module does not define PROTOCOL_SSLv2, *func* is returned
    unchanged.  Otherwise the returned wrapper first tries to create an
    SSLv2 object on a throw-away socket.  If that raises ssl.SSLError and
    the OpenSSL version, the platform.linux_distribution() tuple and the
    error text all match the known-broken combination, unittest.SkipTest
    is raised.  Any other SSLError is ignored and *func* is then called
    with the original arguments.
    """
    if not hasattr(ssl, 'PROTOCOL_SSLv2'):
        return func

    # We need to access the lower-level wrapper in order to create an
    # implicit SSL context without trying to connect or listen.
    try:
        import _ssl
    except ImportError:
        # The returned function won't get executed, just ignore the error
        pass

    @functools.wraps(func)
    def f(*args, **kwargs):
        s = socket.socket(socket.AF_INET)
        try:
            _ssl.sslwrap(s._sock, 0, None, None,
                         ssl.CERT_NONE, ssl.PROTOCOL_SSLv2, None, None)
        except ssl.SSLError as e:
            if (ssl.OPENSSL_VERSION_INFO == (0, 9, 8, 15, 15) and
                platform.linux_distribution() == ('debian', 'squeeze/sid', '')
                and 'Invalid SSL protocol variant specified' in str(e)):
                raise unittest.SkipTest("Patched Ubuntu OpenSSL breaks behaviour")
        finally:
            # The probe socket is only needed for the check above.
            s.close()
        return func(*args, **kwargs)
    return f
79
80
class BasicSocketTests(unittest.TestCase):
    # Tests of module-level ssl helpers and of wrapped sockets that never
    # complete a handshake with a local server.

    def test_constants(self):
        # Attribute access is the whole test: a missing constant raises
        # AttributeError.
        #ssl.PROTOCOL_SSLv2
        ssl.PROTOCOL_SSLv23
        ssl.PROTOCOL_SSLv3
        ssl.PROTOCOL_TLSv1
        ssl.CERT_NONE
        ssl.CERT_OPTIONAL
        ssl.CERT_REQUIRED

    def test_random(self):
        v = ssl.RAND_status()
        if test_support.verbose:
            sys.stdout.write("\n RAND_status is %d (%s)\n"
                             % (v, (v and "sufficient randomness") or
                                "insufficient randomness"))
        # Passing an int must be rejected.  This is a real assertion now;
        # previously a missing TypeError was only printed and the test
        # still passed.
        self.assertRaises(TypeError, ssl.RAND_egd, 1)
        ssl.RAND_add("this is a random string", 75.0)

    def test_parse_cert(self):
        # note that this uses an 'unofficial' function in _ssl.c,
        # provided solely for this test, to exercise the certificate
        # parsing code
        p = ssl._ssl._test_decode_cert(CERTFILE, False)
        if test_support.verbose:
            sys.stdout.write("\n" + pprint.pformat(p) + "\n")

    def test_DER_to_PEM(self):
        # Round-trip PEM -> DER -> PEM -> DER and compare the DER forms,
        # then check the header/footer lines of the regenerated PEM.
        with open(SVN_PYTHON_ORG_ROOT_CERT, 'r') as f:
            pem = f.read()
        d1 = ssl.PEM_cert_to_DER_cert(pem)
        p2 = ssl.DER_cert_to_PEM_cert(d1)
        d2 = ssl.PEM_cert_to_DER_cert(p2)
        self.assertEqual(d1, d2)
        if not p2.startswith(ssl.PEM_HEADER + '\n'):
            self.fail("DER-to-PEM didn't include correct header:\n%r\n" % p2)
        if not p2.endswith('\n' + ssl.PEM_FOOTER + '\n'):
            self.fail("DER-to-PEM didn't include correct footer:\n%r\n" % p2)

    def test_openssl_version(self):
        n = ssl.OPENSSL_VERSION_NUMBER
        t = ssl.OPENSSL_VERSION_INFO
        s = ssl.OPENSSL_VERSION
        self.assertIsInstance(n, (int, long))
        self.assertIsInstance(t, tuple)
        self.assertIsInstance(s, str)
        # Some sanity checks follow
        # >= 0.9
        self.assertGreaterEqual(n, 0x900000)
        # < 2.0
        self.assertLess(n, 0x20000000)
        major, minor, fix, patch, status = t
        self.assertGreaterEqual(major, 0)
        self.assertLess(major, 2)
        self.assertGreaterEqual(minor, 0)
        self.assertLess(minor, 256)
        self.assertGreaterEqual(fix, 0)
        self.assertLess(fix, 256)
        self.assertGreaterEqual(patch, 0)
        self.assertLessEqual(patch, 26)
        self.assertGreaterEqual(status, 0)
        self.assertLessEqual(status, 15)
        # Version string as returned by OpenSSL, the format might change
        self.assertTrue(s.startswith("OpenSSL {:d}.{:d}.{:d}".format(major, minor, fix)),
                        (s, t))

    def test_ciphers(self):
        # Report a skip rather than silently passing when the 'network'
        # resource is not enabled.
        if not test_support.is_resource_enabled('network'):
            self.skipTest("the 'network' resource is not enabled")
        remote = ("svn.python.org", 443)
        with test_support.transient_internet(remote[0]):
            for ciphers in ("ALL", "DEFAULT"):
                s = ssl.wrap_socket(socket.socket(socket.AF_INET),
                                    cert_reqs=ssl.CERT_NONE, ciphers=ciphers)
                try:
                    s.connect(remote)
                finally:
                    s.close()
            # Error checking occurs when connecting, because the SSL context
            # isn't created before.
            s = ssl.wrap_socket(socket.socket(socket.AF_INET),
                                cert_reqs=ssl.CERT_NONE, ciphers="^$:,;?*'dorothyx")
            try:
                with self.assertRaisesRegexp(ssl.SSLError, "No cipher can be selected"):
                    s.connect(remote)
            finally:
                s.close()

    @test_support.cpython_only
    def test_refcycle(self):
        # Issue #7943: an SSL object doesn't create reference cycles with
        # itself.
        s = socket.socket(socket.AF_INET)
        ss = ssl.wrap_socket(s)
        wr = weakref.ref(ss)
        del ss
        self.assertEqual(wr(), None)

    def test_wrapped_unconnected(self):
        # The _delegate_methods in socket.py are correctly delegated to by an
        # unconnected SSLSocket, so they will raise a socket.error rather than
        # something unexpected like TypeError.
        s = socket.socket(socket.AF_INET)
        ss = ssl.wrap_socket(s)
        try:
            self.assertRaises(socket.error, ss.recv, 1)
            self.assertRaises(socket.error, ss.recv_into, bytearray(b'x'))
            self.assertRaises(socket.error, ss.recvfrom, 1)
            self.assertRaises(socket.error, ss.recvfrom_into, bytearray(b'x'), 1)
            self.assertRaises(socket.error, ss.send, b'x')
            self.assertRaises(socket.error, ss.sendto, b'x', ('0.0.0.0', 0))
        finally:
            ss.close()
193
194
class NetworkedTests(unittest.TestCase):
    # Tests that connect to real remote hosts; each one runs inside
    # test_support.transient_internet().

    def test_connect(self):
        with test_support.transient_internet("svn.python.org"):
            s = ssl.wrap_socket(socket.socket(socket.AF_INET),
                                cert_reqs=ssl.CERT_NONE)
            try:
                s.connect(("svn.python.org", 443))
                c = s.getpeercert()
                if c:
                    # Interpolate the certificate; the message used to
                    # contain a literal, unfilled "%s".
                    self.fail("Peer cert %s shouldn't be here!"
                              % pprint.pformat(c))
            finally:
                s.close()

            # this should fail because we have no verification certs
            s = ssl.wrap_socket(socket.socket(socket.AF_INET),
                                cert_reqs=ssl.CERT_REQUIRED)
            try:
                # Assert the failure instead of tolerating it, so an
                # unexpectedly successful handshake is reported.
                self.assertRaises(ssl.SSLError,
                                  s.connect, ("svn.python.org", 443))
            finally:
                s.close()

            # this should succeed because we specify the root cert
            s = ssl.wrap_socket(socket.socket(socket.AF_INET),
                                cert_reqs=ssl.CERT_REQUIRED,
                                ca_certs=SVN_PYTHON_ORG_ROOT_CERT)
            try:
                s.connect(("svn.python.org", 443))
            finally:
                s.close()

    def test_connect_ex(self):
        # Issue #11326: check connect_ex() implementation
        with test_support.transient_internet("svn.python.org"):
            s = ssl.wrap_socket(socket.socket(socket.AF_INET),
                                cert_reqs=ssl.CERT_REQUIRED,
                                ca_certs=SVN_PYTHON_ORG_ROOT_CERT)
            try:
                self.assertEqual(0, s.connect_ex(("svn.python.org", 443)))
                self.assertTrue(s.getpeercert())
            finally:
                s.close()

    def test_non_blocking_connect_ex(self):
        # Issue #11326: non-blocking connect_ex() should allow handshake
        # to proceed after the socket gets ready.
        with test_support.transient_internet("svn.python.org"):
            s = ssl.wrap_socket(socket.socket(socket.AF_INET),
                                cert_reqs=ssl.CERT_REQUIRED,
                                ca_certs=SVN_PYTHON_ORG_ROOT_CERT,
                                do_handshake_on_connect=False)
            try:
                s.setblocking(False)
                rc = s.connect_ex(('svn.python.org', 443))
                # EWOULDBLOCK under Windows, EINPROGRESS elsewhere
                self.assertIn(rc, (0, errno.EINPROGRESS, errno.EWOULDBLOCK))
                # Wait for connect to finish
                select.select([], [s], [], 5.0)
                # Non-blocking handshake
                while True:
                    try:
                        s.do_handshake()
                        break
                    except ssl.SSLError as err:
                        if err.args[0] == ssl.SSL_ERROR_WANT_READ:
                            select.select([s], [], [], 5.0)
                        elif err.args[0] == ssl.SSL_ERROR_WANT_WRITE:
                            select.select([], [s], [], 5.0)
                        else:
                            raise
                # SSL established
                self.assertTrue(s.getpeercert())
            finally:
                s.close()

    @unittest.skipIf(os.name == "nt", "Can't use a socket as a file under Windows")
    def test_makefile_close(self):
        # Issue #5238: creating a file-like object with makefile() shouldn't
        # delay closing the underlying "real socket" (here tested with its
        # file descriptor, hence skipping the test under Windows).
        with test_support.transient_internet("svn.python.org"):
            ss = ssl.wrap_socket(socket.socket(socket.AF_INET))
            ss.connect(("svn.python.org", 443))
            fd = ss.fileno()
            f = ss.makefile()
            f.close()
            # The fd is still open
            os.read(fd, 0)
            # Closing the SSL socket should close the fd too
            ss.close()
            gc.collect()
            with self.assertRaises(OSError) as e:
                os.read(fd, 0)
            self.assertEqual(e.exception.errno, errno.EBADF)

    def test_non_blocking_handshake(self):
        with test_support.transient_internet("svn.python.org"):
            s = socket.socket(socket.AF_INET)
            try:
                s.connect(("svn.python.org", 443))
                s.setblocking(False)
                s = ssl.wrap_socket(s,
                                    cert_reqs=ssl.CERT_NONE,
                                    do_handshake_on_connect=False)
                count = 0
                while True:
                    try:
                        count += 1
                        s.do_handshake()
                        break
                    except ssl.SSLError as err:
                        if err.args[0] == ssl.SSL_ERROR_WANT_READ:
                            select.select([s], [], [])
                        elif err.args[0] == ssl.SSL_ERROR_WANT_WRITE:
                            select.select([], [s], [])
                        else:
                            raise
            finally:
                # Closes whichever object ``s`` currently names (the plain
                # socket or its SSL wrapper), also on failure.
                s.close()
            if test_support.verbose:
                sys.stdout.write("\nNeeded %d calls to do_handshake() to establish session.\n" % count)

    def test_get_server_certificate(self):
        with test_support.transient_internet("svn.python.org"):
            pem = ssl.get_server_certificate(("svn.python.org", 443))
            if not pem:
                self.fail("No server certificate on svn.python.org:443!")

            try:
                pem = ssl.get_server_certificate(("svn.python.org", 443), ca_certs=CERTFILE)
            except ssl.SSLError:
                #should fail
                pass
            else:
                self.fail("Got server certificate %s for svn.python.org!" % pem)

            pem = ssl.get_server_certificate(("svn.python.org", 443), ca_certs=SVN_PYTHON_ORG_ROOT_CERT)
            if not pem:
                self.fail("No server certificate on svn.python.org:443!")
            if test_support.verbose:
                sys.stdout.write("\nVerified certificate for svn.python.org:443 is\n%s\n" % pem)

    def test_algorithms(self):
        # Issue #8484: all algorithms should be available when verifying a
        # certificate.
        # SHA256 was added in OpenSSL 0.9.8
        if ssl.OPENSSL_VERSION_INFO < (0, 9, 8, 0, 15):
            self.skipTest("SHA256 not available on %r" % ssl.OPENSSL_VERSION)
        # NOTE: https://sha256.tbs-internet.com is another possible test host
        remote = ("sha256.tbs-internet.com", 443)
        sha256_cert = os.path.join(os.path.dirname(__file__), "sha256.pem")
        with test_support.transient_internet("sha256.tbs-internet.com"):
            s = ssl.wrap_socket(socket.socket(socket.AF_INET),
                                cert_reqs=ssl.CERT_REQUIRED,
                                ca_certs=sha256_cert,)
            try:
                s.connect(remote)
                if test_support.verbose:
                    sys.stdout.write("\nCipher with %r is %r\n" %
                                     (remote, s.cipher()))
                    sys.stdout.write("Certificate is:\n%s\n" %
                                     pprint.pformat(s.getpeercert()))
            finally:
                s.close()
357
358
359 try:
360 import threading
361 except ImportError:
362 _have_threads = False
363 else:
364 _have_threads = True
365
class ThreadedEchoServer(threading.Thread):
    # Accept loop running in its own thread; every accepted connection is
    # handed to a ConnectionHandler thread that echoes lower-cased data.

    class ConnectionHandler(threading.Thread):

        """A mildly complicated class, because we want it to work both
        with and without the SSL wrapper around the socket connection, so
        that we can test the STARTTLS functionality."""

        def __init__(self, server, connsock):
            self.server = server
            self.running = False
            self.sock = connsock
            self.sock.setblocking(1)
            # Set once the connection has been wrapped with SSL.
            self.sslconn = None
            threading.Thread.__init__(self)
            self.daemon = True

        def show_conn_details(self):
            # Query (and, when verbose and chatty, print) the peer
            # certificate and the negotiated cipher.
            if self.server.certreqs == ssl.CERT_REQUIRED:
                cert = self.sslconn.getpeercert()
                if test_support.verbose and self.server.chatty:
                    sys.stdout.write(" client cert is " + pprint.pformat(cert) + "\n")
                cert_binary = self.sslconn.getpeercert(True)
                if test_support.verbose and self.server.chatty:
                    sys.stdout.write(" cert binary is " + str(len(cert_binary)) + " bytes\n")
            cipher = self.sslconn.cipher()
            if test_support.verbose and self.server.chatty:
                sys.stdout.write(" server: connection cipher is now " + str(cipher) + "\n")

        def wrap_conn(self):
            # Wrap the connection as the server side of SSL.  Returns True
            # on success; on ssl.SSLError closes the connection, stops the
            # server and returns False.
            srv = self.server
            try:
                self.sslconn = ssl.wrap_socket(self.sock, server_side=True,
                                               certfile=srv.certificate,
                                               ssl_version=srv.protocol,
                                               ca_certs=srv.cacerts,
                                               cert_reqs=srv.certreqs,
                                               ciphers=srv.ciphers)
            except ssl.SSLError:
                # XXX Various errors can have happened here, for example
                # a mismatching protocol version, an invalid certificate,
                # or a low-level bug. This should be made more discriminating.
                if self.server.chatty:
                    handle_error("\n server: bad connection attempt from " +
                                 str(self.sock.getpeername()) + ":\n")
                self.close()
                self.running = False
                self.server.stop()
                return False
            return True

        def read(self):
            # Read from the SSL layer when present, else the plain socket.
            if not self.sslconn:
                return self.sock.recv(1024)
            return self.sslconn.read()

        def write(self, bytes):
            # Write through the SSL layer when present, else the plain socket.
            if not self.sslconn:
                return self.sock.send(bytes)
            return self.sslconn.write(bytes)

        def close(self):
            if not self.sslconn:
                self.sock._sock.close()
            else:
                self.sslconn.close()

        def run(self):
            self.running = True
            if not self.server.starttls_server:
                if isinstance(self.sock, ssl.SSLSocket):
                    self.sslconn = self.sock
                elif not self.wrap_conn():
                    return
                self.show_conn_details()
            while self.running:
                try:
                    msg = self.read()
                    if not msg:
                        # eof, so quit this handler
                        self.running = False
                        self.close()
                        continue
                    request = msg.strip()
                    starttls = self.server.starttls_server
                    noisy = test_support.verbose and self.server.connectionchatty
                    if request == 'over':
                        if noisy:
                            sys.stdout.write(" server: client closed connection\n")
                        self.close()
                        return
                    if starttls and request == 'STARTTLS':
                        if noisy:
                            sys.stdout.write(" server: read STARTTLS from client, sending OK...\n")
                        self.write("OK\n")
                        if not self.wrap_conn():
                            return
                    elif starttls and self.sslconn and request == 'ENDTLS':
                        if noisy:
                            sys.stdout.write(" server: read ENDTLS from client, sending OK...\n")
                        self.write("OK\n")
                        self.sslconn.unwrap()
                        self.sslconn = None
                        if noisy:
                            sys.stdout.write(" server: connection is now unencrypted...\n")
                    else:
                        if noisy:
                            ctype = (self.sslconn and "encrypted") or "unencrypted"
                            sys.stdout.write(" server: read %s (%s), sending back %s (%s)...\n"
                                             % (repr(msg), ctype, repr(msg.lower()), ctype))
                        self.write(msg.lower())
                except ssl.SSLError:
                    if self.server.chatty:
                        handle_error("Test server failure:\n")
                    self.close()
                    self.running = False
                    # normally, we'd just stop here, but for the test
                    # harness, we want to stop the server
                    self.server.stop()

    def __init__(self, certificate, ssl_version=None,
                 certreqs=None, cacerts=None,
                 chatty=True, connectionchatty=False, starttls_server=False,
                 wrap_accepting_socket=False, ciphers=None):

        # Defaults: TLSv1 and no certificate requirement.
        self.certificate = certificate
        self.protocol = ssl.PROTOCOL_TLSv1 if ssl_version is None else ssl_version
        self.certreqs = ssl.CERT_NONE if certreqs is None else certreqs
        self.cacerts = cacerts
        self.ciphers = ciphers
        self.chatty = chatty
        self.connectionchatty = connectionchatty
        self.starttls_server = starttls_server
        self.sock = socket.socket()
        self.flag = None
        if wrap_accepting_socket:
            # Wrap the listening socket itself instead of each connection.
            self.sock = ssl.wrap_socket(self.sock, server_side=True,
                                        certfile=self.certificate,
                                        cert_reqs=self.certreqs,
                                        ca_certs=self.cacerts,
                                        ssl_version=self.protocol,
                                        ciphers=self.ciphers)
            if test_support.verbose and self.chatty:
                sys.stdout.write(' server: wrapped server socket as %s\n' % str(self.sock))
        self.port = test_support.bind_port(self.sock)
        self.active = False
        threading.Thread.__init__(self)
        self.daemon = True

    def start(self, flag=None):
        # *flag*, when given, is set by run() once the socket is listening.
        self.flag = flag
        threading.Thread.start(self)

    def run(self):
        self.sock.settimeout(0.05)
        self.sock.listen(5)
        self.active = True
        if self.flag:
            # signal an event
            self.flag.set()
        while self.active:
            try:
                newconn, connaddr = self.sock.accept()
                if test_support.verbose and self.chatty:
                    sys.stdout.write(' server: new connection from '
                                     + str(connaddr) + '\n')
                self.ConnectionHandler(self, newconn).start()
            except socket.timeout:
                # accept() timed out; loop again and re-check self.active
                pass
            except KeyboardInterrupt:
                self.stop()
        self.sock.close()

    def stop(self):
        # Makes the accept loop in run() exit on its next iteration.
        self.active = False
545
class AsyncoreEchoServer(threading.Thread):
    # Thread that drives an asyncore-based SSL echo server.

    class EchoServer(asyncore.dispatcher):
        # Listening dispatcher; creates a ConnectionHandler per connection.

        class ConnectionHandler(asyncore.dispatcher_with_send):
            # Performs the SSL handshake without handshake-on-connect, then
            # echoes received data lower-cased.

            def __init__(self, conn, certfile):
                asyncore.dispatcher_with_send.__init__(self, conn)
                self.socket = ssl.wrap_socket(conn, server_side=True,
                                              certfile=certfile,
                                              do_handshake_on_connect=False)
                # True until do_handshake() has completed.
                self._ssl_accepting = True

            def readable(self):
                # Handle data already pending in the SSL object before
                # reporting readability.
                if isinstance(self.socket, ssl.SSLSocket):
                    while self.socket.pending() > 0:
                        self.handle_read_event()
                return True

            def _do_ssl_handshake(self):
                try:
                    self.socket.do_handshake()
                except ssl.SSLError as err:
                    code = err.args[0]
                    if code in (ssl.SSL_ERROR_WANT_READ,
                                ssl.SSL_ERROR_WANT_WRITE):
                        # Handshake not finished; retried on the next read.
                        return
                    if code == ssl.SSL_ERROR_EOF:
                        return self.handle_close()
                    raise
                except socket.error as err:
                    if err.args[0] == errno.ECONNABORTED:
                        return self.handle_close()
                else:
                    self._ssl_accepting = False

            def handle_read(self):
                if self._ssl_accepting:
                    self._do_ssl_handshake()
                    return
                data = self.recv(1024)
                if data and data.strip() != 'over':
                    self.send(data.lower())

            def handle_close(self):
                self.close()
                if test_support.verbose:
                    sys.stdout.write(" server: closed connection %s\n" % self.socket)

            def handle_error(self):
                # Re-raise the exception currently being handled.
                raise

        def __init__(self, certfile):
            self.certfile = certfile
            asyncore.dispatcher.__init__(self)
            self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
            self.port = test_support.bind_port(self.socket)
            self.listen(5)

        def handle_accept(self):
            sock_obj, addr = self.accept()
            if test_support.verbose:
                sys.stdout.write(" server: new connection from %s:%s\n" %addr)
            self.ConnectionHandler(sock_obj, self.certfile)

        def handle_error(self):
            # Re-raise the exception currently being handled.
            raise

    def __init__(self, certfile):
        self.flag = None
        self.active = False
        self.server = self.EchoServer(certfile)
        self.port = self.server.port
        threading.Thread.__init__(self)
        self.daemon = True

    def __str__(self):
        return "<%s %s>" % (self.__class__.__name__, self.server)

    def start(self, flag=None):
        # *flag*, when given, is set by run() once the thread is active.
        self.flag = flag
        threading.Thread.start(self)

    def run(self):
        self.active = True
        if self.flag:
            self.flag.set()
        while self.active:
            asyncore.loop(0.05)

    def stop(self):
        self.active = False
        self.server.close()
638
class SocketServerHTTPSServer(threading.Thread):
    # Thread that runs an HTTPServer whose accepted sockets are wrapped
    # with SSL.

    class HTTPSServer(HTTPServer):

        def __init__(self, server_address, RequestHandlerClass, certfile):
            HTTPServer.__init__(self, server_address, RequestHandlerClass)
            # we assume the certfile contains both private key and certificate
            self.certfile = certfile
            self.allow_reuse_address = True

        def __str__(self):
            description = (self.__class__.__name__,
                           self.server_name,
                           self.server_port)
            return '<%s %s:%s>' % description

        def get_request(self):
            # override this to wrap socket with SSL
            sock, addr = self.socket.accept()
            return (ssl.wrap_socket(sock, server_side=True,
                                    certfile=self.certfile),
                    addr)

    class RootedHTTPRequestHandler(SimpleHTTPRequestHandler):
        # need to override translate_path to get a known root,
        # instead of using os.curdir, since the test could be
        # run from anywhere

        server_version = "TestHTTPS/1.0"

        root = None

        def translate_path(self, path):
            """Map a /-separated request PATH onto a file name below ``root``.

            The query part is dropped.  For each non-empty component the
            drive and directory parts are stripped; components that occur
            as a substring of ``root`` are skipped, the rest are joined
            onto ``root``.

            """
            # abandon query parameters
            path = urlparse.urlparse(path)[2]
            path = os.path.normpath(urllib.unquote(path))
            components = [part for part in path.split('/') if part]
            path = self.root
            for component in components:
                component = os.path.splitdrive(component)[1]
                component = os.path.split(component)[1]
                if component not in self.root:
                    path = os.path.join(path, component)
            return path

        def log_message(self, format, *args):

            # we override this to suppress logging unless "verbose"

            if not test_support.verbose:
                return
            sys.stdout.write(" server (%s:%d %s):\n [%s] %s\n" %
                             (self.server.server_address,
                              self.server.server_port,
                              self.request.cipher(),
                              self.log_date_time_string(),
                              format%args))


    def __init__(self, certfile):
        self.flag = None
        # Files are served from the directory that holds CERTFILE.
        self.RootedHTTPRequestHandler.root = os.path.split(CERTFILE)[0]
        self.server = self.HTTPSServer(
            (HOST, 0), self.RootedHTTPRequestHandler, certfile)
        self.port = self.server.server_port
        threading.Thread.__init__(self)
        self.daemon = True

    def __str__(self):
        return "<%s %s>" % (self.__class__.__name__, self.server)

    def start(self, flag=None):
        # *flag*, when given, is set by run() before serving starts.
        self.flag = flag
        threading.Thread.start(self)

    def run(self):
        if self.flag:
            self.flag.set()
        self.server.serve_forever(0.05)

    def stop(self):
        self.server.shutdown()
728
729
def bad_cert_test(certfile):
    """
    Launch a server with CERT_REQUIRED, and check that trying to
    connect to it with the given client certificate fails.

    certfile is the path of the client certificate to present.  An
    ssl.SSLError or socket.error raised while wrapping or connecting is
    the expected outcome; AssertionError is raised if the connection
    succeeds.  The client socket is closed and the server is stopped and
    joined in every case.
    """
    server = ThreadedEchoServer(CERTFILE,
                                certreqs=ssl.CERT_REQUIRED,
                                cacerts=CERTFILE, chatty=False)
    flag = threading.Event()
    server.start(flag)
    # wait for it to start
    flag.wait()
    # try to connect
    conn = socket.socket()
    try:
        try:
            try:
                # Rebind ``conn`` so the cleanup below closes the SSL
                # wrapper when wrapping worked, the plain socket otherwise.
                conn = ssl.wrap_socket(conn,
                                       certfile=certfile,
                                       ssl_version=ssl.PROTOCOL_TLSv1)
                conn.connect((HOST, server.port))
            except ssl.SSLError as x:
                if test_support.verbose:
                    sys.stdout.write("\nSSLError is %s\n" % x.args[1])
            except socket.error as x:
                if test_support.verbose:
                    sys.stdout.write("\nsocket.error is %s\n" % x.args[1])
            else:
                raise AssertionError("Use of invalid cert should have failed!")
        finally:
            conn.close()
    finally:
        server.stop()
        server.join()
760
def server_params_test(certfile, protocol, certreqs, cacertsfile,
                       client_certfile, client_protocol=None, indata="FOO\n",
                       ciphers=None, chatty=True, connectionchatty=False,
                       wrap_accepting_socket=False):
    """
    Launch a server, connect a client to it and try various reads
    and writes.

    indata is sent three times (as str, bytearray and memoryview) and each
    reply must equal indata.lower(), otherwise AssertionError is raised.
    client_protocol defaults to protocol.  The client socket is closed and
    the server is stopped and joined whether or not the exchange succeeds.
    """
    server = ThreadedEchoServer(certfile,
                                certreqs=certreqs,
                                ssl_version=protocol,
                                cacerts=cacertsfile,
                                ciphers=ciphers,
                                chatty=chatty,
                                connectionchatty=connectionchatty,
                                wrap_accepting_socket=wrap_accepting_socket)
    flag = threading.Event()
    server.start(flag)
    # wait for it to start
    flag.wait()
    # try to connect
    if client_protocol is None:
        client_protocol = protocol
    try:
        s = ssl.wrap_socket(socket.socket(),
                            certfile=client_certfile,
                            ca_certs=cacertsfile,
                            ciphers=ciphers,
                            cert_reqs=certreqs,
                            ssl_version=client_protocol)
        try:
            s.connect((HOST, server.port))
            for arg in [indata, bytearray(indata), memoryview(indata)]:
                if connectionchatty and test_support.verbose:
                    sys.stdout.write(
                        " client: sending %s...\n" % (repr(arg)))
                s.write(arg)
                outdata = s.read()
                if connectionchatty and test_support.verbose:
                    sys.stdout.write(" client: read %s\n" % repr(outdata))
                if outdata != indata.lower():
                    raise AssertionError(
                        "bad data <<%s>> (%d) received; expected <<%s>> (%d)\n"
                        % (outdata[:20], len(outdata),
                           indata[:20].lower(), len(indata)))
            s.write("over\n")
            if connectionchatty and test_support.verbose:
                sys.stdout.write(" client: closing connection.\n")
        finally:
            # Close the client socket on failure too, not only on success.
            s.close()
    finally:
        server.stop()
        server.join()
815
def try_protocol_combo(server_protocol,
                       client_protocol,
                       expect_success,
                       certsreqs=None):
    """Run server_params_test() for one server/client protocol pairing.

    With expect_success true, any ssl.SSLError or socket.error is
    re-raised.  With it false, an SSLError or an ECONNRESET socket error is
    the expected result and a successful exchange raises AssertionError.
    certsreqs defaults to ssl.CERT_NONE.
    """
    if certsreqs is None:
        certsreqs = ssl.CERT_NONE
    names = {
        ssl.CERT_NONE: "CERT_NONE",
        ssl.CERT_OPTIONAL: "CERT_OPTIONAL",
        ssl.CERT_REQUIRED: "CERT_REQUIRED",
    }
    certtype = names[certsreqs]
    client_name = ssl.get_protocol_name(client_protocol)
    server_name = ssl.get_protocol_name(server_protocol)
    if test_support.verbose:
        if expect_success:
            formatstr = " %s->%s %s\n"
        else:
            formatstr = " {%s->%s} %s\n"
        sys.stdout.write(formatstr % (client_name, server_name, certtype))
    try:
        # NOTE: we must enable "ALL" ciphers, otherwise an SSLv23 client
        # will send an SSLv3 hello (rather than SSLv2) starting from
        # OpenSSL 1.0.0 (see issue #8322).
        server_params_test(CERTFILE, server_protocol, certsreqs,
                           CERTFILE, CERTFILE, client_protocol,
                           ciphers="ALL", chatty=False)
    # Protocol mismatch can result in either an SSLError, or a
    # "Connection reset by peer" error.
    except ssl.SSLError:
        if expect_success:
            raise
    except socket.error as e:
        if expect_success or e.errno != errno.ECONNRESET:
            raise
    else:
        if not expect_success:
            raise AssertionError(
                "Client protocol %s succeeded with server protocol %s!"
                % (client_name, server_name))
854
855
856 class ThreadedTests(unittest.TestCase):
857
def test_rude_shutdown(self):
    """A brutal shutdown of an SSL server should raise an IOError
    in the client when attempting handshake.
    """
    listener_ready = threading.Event()
    listener_gone = threading.Event()

    s = socket.socket()
    port = test_support.bind_port(s, HOST)

    # `listener` runs in a thread. It sits in an accept() until
    # the main thread connects. Then it rudely closes the socket,
    # and sets Event `listener_gone` to let the main thread know
    # the socket is gone.
    def listener():
        s.listen(5)
        listener_ready.set()
        s.accept()
        s.close()
        listener_gone.set()

    def connector():
        listener_ready.wait()
        c = socket.socket()
        try:
            c.connect((HOST, port))
            listener_gone.wait()
            try:
                ssl.wrap_socket(c)
            except IOError:
                pass
            else:
                self.fail('connecting to closed SSL socket should have failed')
        finally:
            # Do not leave the client socket open, whatever the outcome.
            c.close()

    t = threading.Thread(target=listener)
    t.start()
    try:
        connector()
    finally:
        t.join()
897
@skip_if_broken_ubuntu_ssl
def test_echo(self):
    """Basic test of an SSL client connecting to a server"""
    if test_support.verbose:
        sys.stdout.write("\n")
    # TLSv1 on both sides, no certificate requirement, chatty output.
    tls = ssl.PROTOCOL_TLSv1
    server_params_test(CERTFILE, tls, ssl.CERT_NONE,
                       CERTFILE, CERTFILE, tls,
                       chatty=True, connectionchatty=True)
906
def test_getpeercert(self):
    # Connect to a local SSLv23 echo server with CERT_REQUIRED on the
    # client side and check the subject of the certificate it presents.
    if test_support.verbose:
        sys.stdout.write("\n")
    server = ThreadedEchoServer(CERTFILE,
                                certreqs=ssl.CERT_NONE,
                                ssl_version=ssl.PROTOCOL_SSLv23,
                                cacerts=CERTFILE,
                                chatty=False)
    flag = threading.Event()
    server.start(flag)
    # wait for it to start
    flag.wait()
    # try to connect
    try:
        s = ssl.wrap_socket(socket.socket(),
                            certfile=CERTFILE,
                            ca_certs=CERTFILE,
                            cert_reqs=ssl.CERT_REQUIRED,
                            ssl_version=ssl.PROTOCOL_SSLv23)
        try:
            s.connect((HOST, server.port))
            cert = s.getpeercert()
            self.assertTrue(cert, "Can't get peer certificate.")
            cipher = s.cipher()
            if test_support.verbose:
                sys.stdout.write(pprint.pformat(cert) + '\n')
                sys.stdout.write("Connection cipher is " + str(cipher) + '.\n')
            if 'subject' not in cert:
                self.fail("No subject field in certificate: %s." %
                          pprint.pformat(cert))
            if ((('organizationName', 'Python Software Foundation'),)
                not in cert['subject']):
                self.fail(
                    "Missing or invalid 'organizationName' field in certificate subject; "
                    "should be 'Python Software Foundation'.")
        finally:
            # Close the client socket even when an assertion fails.
            s.close()
    finally:
        server.stop()
        server.join()
946
def test_empty_cert(self):
    """Connecting with an empty cert file"""
    # The certificate lives next to this test file.
    here = os.path.dirname(__file__) or os.curdir
    bad_cert_test(os.path.join(here, "nullcert.pem"))
def test_malformed_cert(self):
    """Connecting with a badly formatted certificate (syntax error)"""
    # The certificate lives next to this test file.
    here = os.path.dirname(__file__) or os.curdir
    bad_cert_test(os.path.join(here, "badcert.pem"))
def test_nonexisting_cert(self):
    """Connecting with a non-existing cert file"""
    # The path is built next to this test file.
    here = os.path.dirname(__file__) or os.curdir
    bad_cert_test(os.path.join(here, "wrongcert.pem"))
def test_malformed_key(self):
    """Connecting with a badly formatted key (syntax error)"""
    # The key file lives next to this test file.
    here = os.path.dirname(__file__) or os.curdir
    bad_cert_test(os.path.join(here, "badkey.pem"))
963
@skip_if_broken_ubuntu_ssl
def test_protocol_sslv2(self):
    """Connecting to an SSLv2 server with various client options"""
    if test_support.verbose:
        sys.stdout.write("\n")
    # Without PROTOCOL_SSLv2 the decorator returns this function unchanged,
    # so skip explicitly instead of failing with AttributeError below.
    if not hasattr(ssl, 'PROTOCOL_SSLv2'):
        self.skipTest("PROTOCOL_SSLv2 needed")
    try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True)
    try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_OPTIONAL)
    try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_REQUIRED)
    try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv23, True)
    try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv3, False)
    try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLSv1, False)
975
@skip_if_broken_ubuntu_ssl
def test_protocol_sslv23(self):
    """Connecting to an SSLv23 server with various client options"""
    if test_support.verbose:
        sys.stdout.write("\n")
    # Only exercise an SSLv2 client when the constant exists; otherwise
    # the attribute lookup itself would raise AttributeError.
    if hasattr(ssl, 'PROTOCOL_SSLv2'):
        try:
            try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv2, True)
        except (ssl.SSLError, socket.error) as x:
            # this fails on some older versions of OpenSSL (0.9.7l, for instance)
            if test_support.verbose:
                sys.stdout.write(
                    " SSL2 client to SSL23 server test unexpectedly failed:\n %s\n"
                    % str(x))
    # Same call order as before: all three client protocols with the
    # default requirement (None selects CERT_NONE in try_protocol_combo),
    # then with CERT_OPTIONAL, then with CERT_REQUIRED.
    clients = (ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1)
    for certsreqs in (None, ssl.CERT_OPTIONAL, ssl.CERT_REQUIRED):
        for client_protocol in clients:
            try_protocol_combo(ssl.PROTOCOL_SSLv23, client_protocol, True,
                               certsreqs)
1000
@skip_if_broken_ubuntu_ssl
def test_protocol_sslv3(self):
    """Connecting to an SSLv3 server with various client options"""
    if test_support.verbose:
        sys.stdout.write("\n")
    server_protocol = ssl.PROTOCOL_SSLv3

    # Same protocol on both ends: first with the helper's default
    # certificate requirement, then with each explicit one.
    try_protocol_combo(server_protocol, server_protocol, True)
    for cert_mode in (ssl.CERT_OPTIONAL, ssl.CERT_REQUIRED):
        try_protocol_combo(server_protocol, server_protocol, True, cert_mode)

    # Mismatched client protocols; SSLv2 is tried only when this build
    # of the ssl module exposes it.
    other_clients = []
    if hasattr(ssl, 'PROTOCOL_SSLv2'):
        other_clients.append(ssl.PROTOCOL_SSLv2)
    other_clients.extend([ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1])
    for client_protocol in other_clients:
        try_protocol_combo(server_protocol, client_protocol, False)
1013
@skip_if_broken_ubuntu_ssl
def test_protocol_tlsv1(self):
    """Connecting to a TLSv1 server with various client options"""
    if test_support.verbose:
        sys.stdout.write("\n")
    server_protocol = ssl.PROTOCOL_TLSv1

    # Same protocol on both ends: first with the helper's default
    # certificate requirement, then with each explicit one.
    try_protocol_combo(server_protocol, server_protocol, True)
    for cert_mode in (ssl.CERT_OPTIONAL, ssl.CERT_REQUIRED):
        try_protocol_combo(server_protocol, server_protocol, True, cert_mode)

    # Mismatched client protocols; SSLv2 is tried only when this build
    # of the ssl module exposes it.
    other_clients = []
    if hasattr(ssl, 'PROTOCOL_SSLv2'):
        other_clients.append(ssl.PROTOCOL_SSLv2)
    other_clients.extend([ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv23])
    for client_protocol in other_clients:
        try_protocol_combo(server_protocol, client_protocol, False)
1026
def test_starttls(self):
    """Switching from clear text to encrypted and back again."""

    def say(template, *values):
        # Client progress output, emitted only in verbose runs; every
        # interpolated value is rendered through repr().
        if test_support.verbose:
            sys.stdout.write(template % tuple(map(repr, values)))

    def acknowledged(reply):
        # True when the reply, stripped and lower-cased, begins with "ok".
        return reply.strip().lower().startswith("ok")

    messages = ("msg 1", "MSG 2", "STARTTLS", "MSG 3",
                "msg 4", "ENDTLS", "msg 5", "msg 6")

    server = ThreadedEchoServer(CERTFILE,
                                ssl_version=ssl.PROTOCOL_TLSv1,
                                starttls_server=True,
                                chatty=True,
                                connectionchatty=True)
    ready = threading.Event()
    server.start(ready)
    # block until the server signals that it has started
    ready.wait()
    # 'secure' records whether traffic currently goes through the TLS
    # wrapper (conn) or through the plain socket (s)
    secure = False
    try:
        s = socket.socket()
        s.setblocking(1)
        s.connect((HOST, server.port))
        say("\n")
        for request in messages:
            say(" client: sending %s...\n", request)
            if secure:
                conn.write(request)
                reply = conn.read()
            else:
                s.send(request)
                reply = s.recv(1024)
            if request == "STARTTLS" and acknowledged(reply):
                # STARTTLS accepted: wrap the plain socket
                say(" client: read %s from server, starting TLS...\n", reply)
                conn = ssl.wrap_socket(s, ssl_version=ssl.PROTOCOL_TLSv1)
                secure = True
            elif request == "ENDTLS" and acknowledged(reply):
                # ENDTLS accepted: drop the TLS layer again
                say(" client: read %s from server, ending TLS...\n", reply)
                s = conn.unwrap()
                secure = False
            else:
                say(" client: read %s from server\n", reply)
        say(" client: closing connection.\n")
        if secure:
            conn.write("over\n")
        else:
            s.send("over\n")
        s.close()
    finally:
        server.stop()
        server.join()
1090
def test_socketserver(self):
    """Using a SocketServer to create and manage SSL connections."""
    server = SocketServerHTTPSServer(CERTFILE)
    flag = threading.Event()
    server.start(flag)
    # wait for it to start
    flag.wait()
    # try to connect
    try:
        if test_support.verbose:
            sys.stdout.write('\n')
        # Reference copy of the data, read straight from disk.
        with open(CERTFILE, 'rb') as cert_file:
            d1 = cert_file.read()
        d2 = ''
        # now fetch the same data from the HTTPS server
        url = 'https://127.0.0.1:%d/%s' % (
            server.port, os.path.split(CERTFILE)[1])
        with test_support.check_py3k_warnings():
            response = urllib.urlopen(url)
        # Close the response even if reading the header or body raises,
        # so a failing run does not leak the connection.
        try:
            dlen = response.info().getheader("content-length")
            if dlen and (int(dlen) > 0):
                d2 = response.read(int(dlen))
                if test_support.verbose:
                    sys.stdout.write(
                        " client: read %d bytes from remote server '%s'\n"
                        % (len(d2), server))
        finally:
            response.close()
        self.assertEqual(d1, d2)
    finally:
        server.stop()
        server.join()
1122
def test_wrapped_accept(self):
    """Check the accept() method on SSL sockets."""
    if test_support.verbose:
        sys.stdout.write("\n")
    # Both ends use the same protocol constant; the positional layout of
    # the helper call is kept exactly as it was.
    protocol = ssl.PROTOCOL_SSLv23
    options = dict(chatty=True,
                   connectionchatty=True,
                   wrap_accepting_socket=True)
    server_params_test(CERTFILE, protocol, ssl.CERT_REQUIRED,
                       CERTFILE, CERTFILE, protocol,
                       **options)
1131
def test_asyncore_server(self):
    """Check the example asyncore integration."""
    indata = "TEST MESSAGE of mixed case\n"

    if test_support.verbose:
        sys.stdout.write("\n")
    server = AsyncoreEchoServer(CERTFILE)
    flag = threading.Event()
    server.start(flag)
    # wait for it to start
    flag.wait()
    # try to connect
    try:
        s = ssl.wrap_socket(socket.socket())
        # Close the client socket on every path, so a failed exchange
        # does not leave it open while the server is being shut down.
        try:
            s.connect(('127.0.0.1', server.port))
            if test_support.verbose:
                sys.stdout.write(
                    " client: sending %s...\n" % (repr(indata)))
            s.write(indata)
            outdata = s.read()
            if test_support.verbose:
                sys.stdout.write(" client: read %s\n" % repr(outdata))
            # The reply is expected to be the lower-cased request.
            if outdata != indata.lower():
                self.fail(
                    "bad data <<%s>> (%d) received; expected <<%s>> (%d)\n"
                    % (outdata[:min(len(outdata),20)], len(outdata),
                       indata[:min(len(indata),20)].lower(), len(indata)))
            s.write("over\n")
            if test_support.verbose:
                sys.stdout.write(" client: closing connection.\n")
        finally:
            s.close()
    finally:
        server.stop()
        # wait for server thread to end
        server.join()
1167
def test_recv_send(self):
    """Test recv(), send() and friends.

    Every send-like and recv-like method of the client SSL socket is
    driven once.  Entries flagged expect_success=False may raise a
    ValueError, whose message must start with the method name; for the
    other entries a ValueError is a failure, and the reply must equal
    the lower-cased payload.
    """
    if test_support.verbose:
        sys.stdout.write("\n")

    server = ThreadedEchoServer(CERTFILE,
                                certreqs=ssl.CERT_NONE,
                                ssl_version=ssl.PROTOCOL_TLSv1,
                                cacerts=CERTFILE,
                                chatty=True,
                                connectionchatty=False)
    flag = threading.Event()
    server.start(flag)
    # wait for it to start
    flag.wait()
    # Wrapping and connecting happen inside the try so that a failure
    # there still stops and joins the server thread.
    try:
        # try to connect
        s = ssl.wrap_socket(socket.socket(),
                            server_side=False,
                            certfile=CERTFILE,
                            ca_certs=CERTFILE,
                            cert_reqs=ssl.CERT_NONE,
                            ssl_version=ssl.PROTOCOL_TLSv1)
        # The client socket is closed on every path, not only on success.
        try:
            s.connect((HOST, server.port))

            # helper methods for standardising recv* method signatures
            def _recv_into():
                b = bytearray("\0"*100)
                count = s.recv_into(b)
                return b[:count]

            def _recvfrom_into():
                b = bytearray("\0"*100)
                count, addr = s.recvfrom_into(b)
                return b[:count]

            # (name, method, whether to expect success, *args)
            send_methods = [
                ('send', s.send, True, []),
                ('sendto', s.sendto, False, ["some.address"]),
                ('sendall', s.sendall, True, []),
            ]
            recv_methods = [
                ('recv', s.recv, True, []),
                ('recvfrom', s.recvfrom, False, ["some.address"]),
                ('recv_into', _recv_into, True, []),
                ('recvfrom_into', _recvfrom_into, False, []),
            ]
            data_prefix = u"PREFIX_"

            for meth_name, send_meth, expect_success, args in send_methods:
                indata = data_prefix + meth_name
                try:
                    send_meth(indata.encode('ASCII', 'strict'), *args)
                    outdata = s.read()
                    outdata = outdata.decode('ASCII', 'strict')
                    if outdata != indata.lower():
                        self.fail(
                            "While sending with <<%s>> bad data "
                            "<<%r>> (%d) received; "
                            "expected <<%r>> (%d)\n" % (
                                meth_name, outdata[:20], len(outdata),
                                indata[:20], len(indata)
                            )
                        )
                except ValueError as e:
                    if expect_success:
                        self.fail(
                            "Failed to send with method <<%s>>; "
                            "expected to succeed.\n" % (meth_name,)
                        )
                    if not str(e).startswith(meth_name):
                        self.fail(
                            "Method <<%s>> failed with unexpected "
                            "exception message: %s\n" % (
                                meth_name, e
                            )
                        )

            for meth_name, recv_meth, expect_success, args in recv_methods:
                indata = data_prefix + meth_name
                try:
                    s.send(indata.encode('ASCII', 'strict'))
                    outdata = recv_meth(*args)
                    outdata = outdata.decode('ASCII', 'strict')
                    if outdata != indata.lower():
                        self.fail(
                            "While receiving with <<%s>> bad data "
                            "<<%r>> (%d) received; "
                            "expected <<%r>> (%d)\n" % (
                                meth_name, outdata[:20], len(outdata),
                                indata[:20], len(indata)
                            )
                        )
                except ValueError as e:
                    if expect_success:
                        self.fail(
                            "Failed to receive with method <<%s>>; "
                            "expected to succeed.\n" % (meth_name,)
                        )
                    if not str(e).startswith(meth_name):
                        self.fail(
                            "Method <<%s>> failed with unexpected "
                            "exception message: %s\n" % (
                                meth_name, e
                            )
                        )
                    # consume data
                    s.read()

            s.write("over\n".encode("ASCII", "strict"))
        finally:
            s.close()
    finally:
        server.stop()
        server.join()
1282
def test_handshake_timeout(self):
    # Issue #5103: SSL handshake must respect the socket timeout
    #
    # A plain TCP listener accepts connections but never speaks SSL, so
    # each client-side handshake below has to end with a "timed out"
    # SSLError instead of blocking.
    server = socket.socket(socket.AF_INET)
    host = "127.0.0.1"
    port = test_support.bind_port(server)
    started = threading.Event()
    # Read by serve() through the closure; set to True in the outer
    # finally to make the accept loop exit.
    finish = False

    def serve():
        server.listen(5)
        started.set()
        conns = []
        while not finish:
            r, w, e = select.select([server], [], [], 0.1)
            if server in r:
                # Let the socket hang around rather than having
                # it closed by garbage collection.
                conns.append(server.accept()[0])
        # The clients are done once the loop exits; close the accepted
        # sockets explicitly instead of leaving them to the collector.
        for sock in conns:
            sock.close()

    t = threading.Thread(target=serve)
    t.start()
    started.wait()

    try:
        # Case 1: connect first, then wrap the connected socket.
        # The socket is created before the try so that 'c' is always
        # bound when the finally clause runs.
        c = socket.socket(socket.AF_INET)
        try:
            c.settimeout(0.2)
            c.connect((host, port))
            # Will attempt handshake and time out
            self.assertRaisesRegexp(ssl.SSLError, "timed out",
                                    ssl.wrap_socket, c)
        finally:
            c.close()
        # Case 2: wrap first, then connect through the SSL socket.
        c = socket.socket(socket.AF_INET)
        try:
            c.settimeout(0.2)
            c = ssl.wrap_socket(c)
            # Will attempt handshake and time out
            self.assertRaisesRegexp(ssl.SSLError, "timed out",
                                    c.connect, (host, port))
        finally:
            c.close()
    finally:
        finish = True
        t.join()
        server.close()
1329
1330
def test_main(verbose=False):
    """Locate the certificate files, pick the test classes and run them.

    Sets the module globals CERTFILE and SVN_PYTHON_ORG_ROOT_CERT, and
    raises test_support.TestFailed when either file is missing.  The
    'verbose' argument is accepted but not used here.
    """
    global CERTFILE, SVN_PYTHON_ORG_ROOT_CERT
    data_dir = os.path.dirname(__file__) or os.curdir
    CERTFILE = os.path.join(data_dir, "keycert.pem")
    SVN_PYTHON_ORG_ROOT_CERT = os.path.join(
        data_dir, "https_svn_python_org_root.pem")

    for required_file in (CERTFILE, SVN_PYTHON_ORG_ROOT_CERT):
        if not os.path.exists(required_file):
            raise test_support.TestFailed("Can't read certificate files!")

    selected = [BasicTests, BasicSocketTests]

    if test_support.is_resource_enabled('network'):
        selected.append(NetworkedTests)

    if _have_threads:
        thread_info = test_support.threading_setup()
        if thread_info and test_support.is_resource_enabled('network'):
            selected.append(ThreadedTests)

    try:
        test_support.run_unittest(*selected)
    finally:
        # thread_info is bound only when threads are available, hence the
        # same condition as above.
        if _have_threads:
            test_support.threading_cleanup(*thread_info)
1358
# Run the whole suite through test_main() when this file is executed as a
# script rather than imported.
if __name__ == "__main__":
    test_main()