]>
git.proxmox.com Git - mirror_edk2.git/blob - AppPkg/Applications/Python/Python-2.7.2/Lib/test/test_ssl.py
1 # Test the support for SSL and sockets
5 from test
import test_support
14 import urllib
, urlparse
20 from BaseHTTPServer
import HTTPServer
21 from SimpleHTTPServer
import SimpleHTTPRequestHandler
23 ssl
= test_support
.import_module("ssl")
25 HOST
= test_support
.HOST
27 SVN_PYTHON_ORG_ROOT_CERT
= None
29 def handle_error(prefix
):
30 exc_format
= ' '.join(traceback
.format_exception(*sys
.exc_info()))
31 if test_support
.verbose
:
32 sys
.stdout
.write(prefix
+ exc_format
)
35 class BasicTests(unittest
.TestCase
):
37 def test_sslwrap_simple(self
):
38 # A crude test for the legacy API
40 ssl
.sslwrap_simple(socket
.socket(socket
.AF_INET
))
42 if e
.errno
== 32: # broken pipe when ssl_sock.do_handshake(), this test doesn't care about that
47 ssl
.sslwrap_simple(socket
.socket(socket
.AF_INET
)._sock
)
49 if e
.errno
== 32: # broken pipe when ssl_sock.do_handshake(), this test doesn't care about that
54 # Issue #9415: Ubuntu hijacks their OpenSSL and forcefully disables SSLv2
55 def skip_if_broken_ubuntu_ssl(func
):
56 if hasattr(ssl
, 'PROTOCOL_SSLv2'):
57 # We need to access the lower-level wrapper in order to create an
58 # implicit SSL context without trying to connect or listen.
62 # The returned function won't get executed, just ignore the error
64 @functools.wraps(func
)
65 def f(*args
, **kwargs
):
67 s
= socket
.socket(socket
.AF_INET
)
68 _ssl
.sslwrap(s
._sock
, 0, None, None,
69 ssl
.CERT_NONE
, ssl
.PROTOCOL_SSLv2
, None, None)
70 except ssl
.SSLError
as e
:
71 if (ssl
.OPENSSL_VERSION_INFO
== (0, 9, 8, 15, 15) and
72 platform
.linux_distribution() == ('debian', 'squeeze/sid', '')
73 and 'Invalid SSL protocol variant specified' in str(e
)):
74 raise unittest
.SkipTest("Patched Ubuntu OpenSSL breaks behaviour")
75 return func(*args
, **kwargs
)
81 class BasicSocketTests(unittest
.TestCase
):
83 def test_constants(self
):
92 def test_random(self
):
94 if test_support
.verbose
:
95 sys
.stdout
.write("\n RAND_status is %d (%s)\n"
96 % (v
, (v
and "sufficient randomness") or
97 "insufficient randomness"))
103 print "didn't raise TypeError"
104 ssl
.RAND_add("this is a random string", 75.0)
106 def test_parse_cert(self
):
107 # note that this uses an 'unofficial' function in _ssl.c,
108 # provided solely for this test, to exercise the certificate
110 p
= ssl
._ssl
._test
_decode
_cert
(CERTFILE
, False)
111 if test_support
.verbose
:
112 sys
.stdout
.write("\n" + pprint
.pformat(p
) + "\n")
114 def test_DER_to_PEM(self
):
115 with
open(SVN_PYTHON_ORG_ROOT_CERT
, 'r') as f
:
117 d1
= ssl
.PEM_cert_to_DER_cert(pem
)
118 p2
= ssl
.DER_cert_to_PEM_cert(d1
)
119 d2
= ssl
.PEM_cert_to_DER_cert(p2
)
120 self
.assertEqual(d1
, d2
)
121 if not p2
.startswith(ssl
.PEM_HEADER
+ '\n'):
122 self
.fail("DER-to-PEM didn't include correct header:\n%r\n" % p2
)
123 if not p2
.endswith('\n' + ssl
.PEM_FOOTER
+ '\n'):
124 self
.fail("DER-to-PEM didn't include correct footer:\n%r\n" % p2
)
126 def test_openssl_version(self
):
127 n
= ssl
.OPENSSL_VERSION_NUMBER
128 t
= ssl
.OPENSSL_VERSION_INFO
129 s
= ssl
.OPENSSL_VERSION
130 self
.assertIsInstance(n
, (int, long))
131 self
.assertIsInstance(t
, tuple)
132 self
.assertIsInstance(s
, str)
133 # Some sanity checks follow
135 self
.assertGreaterEqual(n
, 0x900000)
137 self
.assertLess(n
, 0x20000000)
138 major
, minor
, fix
, patch
, status
= t
139 self
.assertGreaterEqual(major
, 0)
140 self
.assertLess(major
, 2)
141 self
.assertGreaterEqual(minor
, 0)
142 self
.assertLess(minor
, 256)
143 self
.assertGreaterEqual(fix
, 0)
144 self
.assertLess(fix
, 256)
145 self
.assertGreaterEqual(patch
, 0)
146 self
.assertLessEqual(patch
, 26)
147 self
.assertGreaterEqual(status
, 0)
148 self
.assertLessEqual(status
, 15)
149 # Version string as returned by OpenSSL, the format might change
150 self
.assertTrue(s
.startswith("OpenSSL {:d}.{:d}.{:d}".format(major
, minor
, fix
)),
153 def test_ciphers(self
):
154 if not test_support
.is_resource_enabled('network'):
156 remote
= ("svn.python.org", 443)
157 with test_support
.transient_internet(remote
[0]):
158 s
= ssl
.wrap_socket(socket
.socket(socket
.AF_INET
),
159 cert_reqs
=ssl
.CERT_NONE
, ciphers
="ALL")
161 s
= ssl
.wrap_socket(socket
.socket(socket
.AF_INET
),
162 cert_reqs
=ssl
.CERT_NONE
, ciphers
="DEFAULT")
164 # Error checking occurs when connecting, because the SSL context
165 # isn't created before.
166 s
= ssl
.wrap_socket(socket
.socket(socket
.AF_INET
),
167 cert_reqs
=ssl
.CERT_NONE
, ciphers
="^$:,;?*'dorothyx")
168 with self
.assertRaisesRegexp(ssl
.SSLError
, "No cipher can be selected"):
171 @test_support.cpython_only
172 def test_refcycle(self
):
173 # Issue #7943: an SSL object doesn't create reference cycles with
175 s
= socket
.socket(socket
.AF_INET
)
176 ss
= ssl
.wrap_socket(s
)
179 self
.assertEqual(wr(), None)
181 def test_wrapped_unconnected(self
):
182 # The _delegate_methods in socket.py are correctly delegated to by an
183 # unconnected SSLSocket, so they will raise a socket.error rather than
184 # something unexpected like TypeError.
185 s
= socket
.socket(socket
.AF_INET
)
186 ss
= ssl
.wrap_socket(s
)
187 self
.assertRaises(socket
.error
, ss
.recv
, 1)
188 self
.assertRaises(socket
.error
, ss
.recv_into
, bytearray(b
'x'))
189 self
.assertRaises(socket
.error
, ss
.recvfrom
, 1)
190 self
.assertRaises(socket
.error
, ss
.recvfrom_into
, bytearray(b
'x'), 1)
191 self
.assertRaises(socket
.error
, ss
.send
, b
'x')
192 self
.assertRaises(socket
.error
, ss
.sendto
, b
'x', ('0.0.0.0', 0))
195 class NetworkedTests(unittest
.TestCase
):
197 def test_connect(self
):
198 with test_support
.transient_internet("svn.python.org"):
199 s
= ssl
.wrap_socket(socket
.socket(socket
.AF_INET
),
200 cert_reqs
=ssl
.CERT_NONE
)
201 s
.connect(("svn.python.org", 443))
204 self
.fail("Peer cert %s shouldn't be here!")
207 # this should fail because we have no verification certs
208 s
= ssl
.wrap_socket(socket
.socket(socket
.AF_INET
),
209 cert_reqs
=ssl
.CERT_REQUIRED
)
211 s
.connect(("svn.python.org", 443))
217 # this should succeed because we specify the root cert
218 s
= ssl
.wrap_socket(socket
.socket(socket
.AF_INET
),
219 cert_reqs
=ssl
.CERT_REQUIRED
,
220 ca_certs
=SVN_PYTHON_ORG_ROOT_CERT
)
222 s
.connect(("svn.python.org", 443))
226 def test_connect_ex(self
):
227 # Issue #11326: check connect_ex() implementation
228 with test_support
.transient_internet("svn.python.org"):
229 s
= ssl
.wrap_socket(socket
.socket(socket
.AF_INET
),
230 cert_reqs
=ssl
.CERT_REQUIRED
,
231 ca_certs
=SVN_PYTHON_ORG_ROOT_CERT
)
233 self
.assertEqual(0, s
.connect_ex(("svn.python.org", 443)))
234 self
.assertTrue(s
.getpeercert())
238 def test_non_blocking_connect_ex(self
):
239 # Issue #11326: non-blocking connect_ex() should allow handshake
240 # to proceed after the socket gets ready.
241 with test_support
.transient_internet("svn.python.org"):
242 s
= ssl
.wrap_socket(socket
.socket(socket
.AF_INET
),
243 cert_reqs
=ssl
.CERT_REQUIRED
,
244 ca_certs
=SVN_PYTHON_ORG_ROOT_CERT
,
245 do_handshake_on_connect
=False)
248 rc
= s
.connect_ex(('svn.python.org', 443))
249 # EWOULDBLOCK under Windows, EINPROGRESS elsewhere
250 self
.assertIn(rc
, (0, errno
.EINPROGRESS
, errno
.EWOULDBLOCK
))
251 # Wait for connect to finish
252 select
.select([], [s
], [], 5.0)
253 # Non-blocking handshake
258 except ssl
.SSLError
as err
:
259 if err
.args
[0] == ssl
.SSL_ERROR_WANT_READ
:
260 select
.select([s
], [], [], 5.0)
261 elif err
.args
[0] == ssl
.SSL_ERROR_WANT_WRITE
:
262 select
.select([], [s
], [], 5.0)
266 self
.assertTrue(s
.getpeercert())
270 @unittest.skipIf(os
.name
== "nt", "Can't use a socket as a file under Windows")
271 def test_makefile_close(self
):
272 # Issue #5238: creating a file-like object with makefile() shouldn't
273 # delay closing the underlying "real socket" (here tested with its
274 # file descriptor, hence skipping the test under Windows).
275 with test_support
.transient_internet("svn.python.org"):
276 ss
= ssl
.wrap_socket(socket
.socket(socket
.AF_INET
))
277 ss
.connect(("svn.python.org", 443))
281 # The fd is still open
283 # Closing the SSL socket should close the fd too
286 with self
.assertRaises(OSError) as e
:
288 self
.assertEqual(e
.exception
.errno
, errno
.EBADF
)
290 def test_non_blocking_handshake(self
):
291 with test_support
.transient_internet("svn.python.org"):
292 s
= socket
.socket(socket
.AF_INET
)
293 s
.connect(("svn.python.org", 443))
295 s
= ssl
.wrap_socket(s
,
296 cert_reqs
=ssl
.CERT_NONE
,
297 do_handshake_on_connect
=False)
304 except ssl
.SSLError
, err
:
305 if err
.args
[0] == ssl
.SSL_ERROR_WANT_READ
:
306 select
.select([s
], [], [])
307 elif err
.args
[0] == ssl
.SSL_ERROR_WANT_WRITE
:
308 select
.select([], [s
], [])
312 if test_support
.verbose
:
313 sys
.stdout
.write("\nNeeded %d calls to do_handshake() to establish session.\n" % count
)
315 def test_get_server_certificate(self
):
316 with test_support
.transient_internet("svn.python.org"):
317 pem
= ssl
.get_server_certificate(("svn.python.org", 443))
319 self
.fail("No server certificate on svn.python.org:443!")
322 pem
= ssl
.get_server_certificate(("svn.python.org", 443), ca_certs
=CERTFILE
)
327 self
.fail("Got server certificate %s for svn.python.org!" % pem
)
329 pem
= ssl
.get_server_certificate(("svn.python.org", 443), ca_certs
=SVN_PYTHON_ORG_ROOT_CERT
)
331 self
.fail("No server certificate on svn.python.org:443!")
332 if test_support
.verbose
:
333 sys
.stdout
.write("\nVerified certificate for svn.python.org:443 is\n%s\n" % pem
)
335 def test_algorithms(self
):
336 # Issue #8484: all algorithms should be available when verifying a
338 # SHA256 was added in OpenSSL 0.9.8
339 if ssl
.OPENSSL_VERSION_INFO
< (0, 9, 8, 0, 15):
340 self
.skipTest("SHA256 not available on %r" % ssl
.OPENSSL_VERSION
)
341 # NOTE: https://sha256.tbs-internet.com is another possible test host
342 remote
= ("sha256.tbs-internet.com", 443)
343 sha256_cert
= os
.path
.join(os
.path
.dirname(__file__
), "sha256.pem")
344 with test_support
.transient_internet("sha256.tbs-internet.com"):
345 s
= ssl
.wrap_socket(socket
.socket(socket
.AF_INET
),
346 cert_reqs
=ssl
.CERT_REQUIRED
,
347 ca_certs
=sha256_cert
,)
350 if test_support
.verbose
:
351 sys
.stdout
.write("\nCipher with %r is %r\n" %
352 (remote
, s
.cipher()))
353 sys
.stdout
.write("Certificate is:\n%s\n" %
354 pprint
.pformat(s
.getpeercert()))
362 _have_threads
= False
366 class ThreadedEchoServer(threading
.Thread
):
368 class ConnectionHandler(threading
.Thread
):
370 """A mildly complicated class, because we want it to work both
371 with and without the SSL wrapper around the socket connection, so
372 that we can test the STARTTLS functionality."""
374 def __init__(self
, server
, connsock
):
378 self
.sock
.setblocking(1)
380 threading
.Thread
.__init
__(self
)
383 def show_conn_details(self
):
384 if self
.server
.certreqs
== ssl
.CERT_REQUIRED
:
385 cert
= self
.sslconn
.getpeercert()
386 if test_support
.verbose
and self
.server
.chatty
:
387 sys
.stdout
.write(" client cert is " + pprint
.pformat(cert
) + "\n")
388 cert_binary
= self
.sslconn
.getpeercert(True)
389 if test_support
.verbose
and self
.server
.chatty
:
390 sys
.stdout
.write(" cert binary is " + str(len(cert_binary
)) + " bytes\n")
391 cipher
= self
.sslconn
.cipher()
392 if test_support
.verbose
and self
.server
.chatty
:
393 sys
.stdout
.write(" server: connection cipher is now " + str(cipher
) + "\n")
397 self
.sslconn
= ssl
.wrap_socket(self
.sock
, server_side
=True,
398 certfile
=self
.server
.certificate
,
399 ssl_version
=self
.server
.protocol
,
400 ca_certs
=self
.server
.cacerts
,
401 cert_reqs
=self
.server
.certreqs
,
402 ciphers
=self
.server
.ciphers
)
404 # XXX Various errors can have happened here, for example
405 # a mismatching protocol version, an invalid certificate,
406 # or a low-level bug. This should be made more discriminating.
407 if self
.server
.chatty
:
408 handle_error("\n server: bad connection attempt from " +
409 str(self
.sock
.getpeername()) + ":\n")
419 return self
.sslconn
.read()
421 return self
.sock
.recv(1024)
423 def write(self
, bytes
):
425 return self
.sslconn
.write(bytes
)
427 return self
.sock
.send(bytes
)
433 self
.sock
._sock
.close()
437 if not self
.server
.starttls_server
:
438 if isinstance(self
.sock
, ssl
.SSLSocket
):
439 self
.sslconn
= self
.sock
440 elif not self
.wrap_conn():
442 self
.show_conn_details()
447 # eof, so quit this handler
450 elif msg
.strip() == 'over':
451 if test_support
.verbose
and self
.server
.connectionchatty
:
452 sys
.stdout
.write(" server: client closed connection\n")
455 elif self
.server
.starttls_server
and msg
.strip() == 'STARTTLS':
456 if test_support
.verbose
and self
.server
.connectionchatty
:
457 sys
.stdout
.write(" server: read STARTTLS from client, sending OK...\n")
459 if not self
.wrap_conn():
461 elif self
.server
.starttls_server
and self
.sslconn
and msg
.strip() == 'ENDTLS':
462 if test_support
.verbose
and self
.server
.connectionchatty
:
463 sys
.stdout
.write(" server: read ENDTLS from client, sending OK...\n")
465 self
.sslconn
.unwrap()
467 if test_support
.verbose
and self
.server
.connectionchatty
:
468 sys
.stdout
.write(" server: connection is now unencrypted...\n")
470 if (test_support
.verbose
and
471 self
.server
.connectionchatty
):
472 ctype
= (self
.sslconn
and "encrypted") or "unencrypted"
473 sys
.stdout
.write(" server: read %s (%s), sending back %s (%s)...\n"
474 % (repr(msg
), ctype
, repr(msg
.lower()), ctype
))
475 self
.write(msg
.lower())
477 if self
.server
.chatty
:
478 handle_error("Test server failure:\n")
481 # normally, we'd just stop here, but for the test
482 # harness, we want to stop the server
485 def __init__(self
, certificate
, ssl_version
=None,
486 certreqs
=None, cacerts
=None,
487 chatty
=True, connectionchatty
=False, starttls_server
=False,
488 wrap_accepting_socket
=False, ciphers
=None):
490 if ssl_version
is None:
491 ssl_version
= ssl
.PROTOCOL_TLSv1
493 certreqs
= ssl
.CERT_NONE
494 self
.certificate
= certificate
495 self
.protocol
= ssl_version
496 self
.certreqs
= certreqs
497 self
.cacerts
= cacerts
498 self
.ciphers
= ciphers
500 self
.connectionchatty
= connectionchatty
501 self
.starttls_server
= starttls_server
502 self
.sock
= socket
.socket()
504 if wrap_accepting_socket
:
505 self
.sock
= ssl
.wrap_socket(self
.sock
, server_side
=True,
506 certfile
=self
.certificate
,
507 cert_reqs
= self
.certreqs
,
508 ca_certs
= self
.cacerts
,
509 ssl_version
= self
.protocol
,
510 ciphers
= self
.ciphers
)
511 if test_support
.verbose
and self
.chatty
:
512 sys
.stdout
.write(' server: wrapped server socket as %s\n' % str(self
.sock
))
513 self
.port
= test_support
.bind_port(self
.sock
)
515 threading
.Thread
.__init
__(self
)
518 def start(self
, flag
=None):
520 threading
.Thread
.start(self
)
523 self
.sock
.settimeout(0.05)
531 newconn
, connaddr
= self
.sock
.accept()
532 if test_support
.verbose
and self
.chatty
:
533 sys
.stdout
.write(' server: new connection from '
534 + str(connaddr
) + '\n')
535 handler
= self
.ConnectionHandler(self
, newconn
)
537 except socket
.timeout
:
539 except KeyboardInterrupt:
546 class AsyncoreEchoServer(threading
.Thread
):
548 class EchoServer(asyncore
.dispatcher
):
550 class ConnectionHandler(asyncore
.dispatcher_with_send
):
552 def __init__(self
, conn
, certfile
):
553 asyncore
.dispatcher_with_send
.__init
__(self
, conn
)
554 self
.socket
= ssl
.wrap_socket(conn
, server_side
=True,
556 do_handshake_on_connect
=False)
557 self
._ssl
_accepting
= True
560 if isinstance(self
.socket
, ssl
.SSLSocket
):
561 while self
.socket
.pending() > 0:
562 self
.handle_read_event()
565 def _do_ssl_handshake(self
):
567 self
.socket
.do_handshake()
568 except ssl
.SSLError
, err
:
569 if err
.args
[0] in (ssl
.SSL_ERROR_WANT_READ
,
570 ssl
.SSL_ERROR_WANT_WRITE
):
572 elif err
.args
[0] == ssl
.SSL_ERROR_EOF
:
573 return self
.handle_close()
575 except socket
.error
, err
:
576 if err
.args
[0] == errno
.ECONNABORTED
:
577 return self
.handle_close()
579 self
._ssl
_accepting
= False
581 def handle_read(self
):
582 if self
._ssl
_accepting
:
583 self
._do
_ssl
_handshake
()
585 data
= self
.recv(1024)
586 if data
and data
.strip() != 'over':
587 self
.send(data
.lower())
589 def handle_close(self
):
591 if test_support
.verbose
:
592 sys
.stdout
.write(" server: closed connection %s\n" % self
.socket
)
594 def handle_error(self
):
597 def __init__(self
, certfile
):
598 self
.certfile
= certfile
599 asyncore
.dispatcher
.__init
__(self
)
600 self
.create_socket(socket
.AF_INET
, socket
.SOCK_STREAM
)
601 self
.port
= test_support
.bind_port(self
.socket
)
604 def handle_accept(self
):
605 sock_obj
, addr
= self
.accept()
606 if test_support
.verbose
:
607 sys
.stdout
.write(" server: new connection from %s:%s\n" %addr
)
608 self
.ConnectionHandler(sock_obj
, self
.certfile
)
610 def handle_error(self
):
613 def __init__(self
, certfile
):
616 self
.server
= self
.EchoServer(certfile
)
617 self
.port
= self
.server
.port
618 threading
.Thread
.__init
__(self
)
622 return "<%s %s>" % (self
.__class
__.__name
__, self
.server
)
624 def start(self
, flag
=None):
626 threading
.Thread
.start(self
)
639 class SocketServerHTTPSServer(threading
.Thread
):
641 class HTTPSServer(HTTPServer
):
643 def __init__(self
, server_address
, RequestHandlerClass
, certfile
):
644 HTTPServer
.__init
__(self
, server_address
, RequestHandlerClass
)
645 # we assume the certfile contains both private key and certificate
646 self
.certfile
= certfile
647 self
.allow_reuse_address
= True
650 return ('<%s %s:%s>' %
651 (self
.__class
__.__name
__,
655 def get_request(self
):
656 # override this to wrap socket with SSL
657 sock
, addr
= self
.socket
.accept()
658 sslconn
= ssl
.wrap_socket(sock
, server_side
=True,
659 certfile
=self
.certfile
)
662 class RootedHTTPRequestHandler(SimpleHTTPRequestHandler
):
663 # need to override translate_path to get a known root,
664 # instead of using os.curdir, since the test could be
667 server_version
= "TestHTTPS/1.0"
671 def translate_path(self
, path
):
672 """Translate a /-separated PATH to the local filename syntax.
674 Components that mean special things to the local file system
675 (e.g. drive or directory names) are ignored. (XXX They should
676 probably be diagnosed.)
679 # abandon query parameters
680 path
= urlparse
.urlparse(path
)[2]
681 path
= os
.path
.normpath(urllib
.unquote(path
))
682 words
= path
.split('/')
683 words
= filter(None, words
)
686 drive
, word
= os
.path
.splitdrive(word
)
687 head
, word
= os
.path
.split(word
)
688 if word
in self
.root
: continue
689 path
= os
.path
.join(path
, word
)
692 def log_message(self
, format
, *args
):
694 # we override this to suppress logging unless "verbose"
696 if test_support
.verbose
:
697 sys
.stdout
.write(" server (%s:%d %s):\n [%s] %s\n" %
698 (self
.server
.server_address
,
699 self
.server
.server_port
,
700 self
.request
.cipher(),
701 self
.log_date_time_string(),
705 def __init__(self
, certfile
):
707 self
.RootedHTTPRequestHandler
.root
= os
.path
.split(CERTFILE
)[0]
708 self
.server
= self
.HTTPSServer(
709 (HOST
, 0), self
.RootedHTTPRequestHandler
, certfile
)
710 self
.port
= self
.server
.server_port
711 threading
.Thread
.__init
__(self
)
715 return "<%s %s>" % (self
.__class
__.__name
__, self
.server
)
717 def start(self
, flag
=None):
719 threading
.Thread
.start(self
)
724 self
.server
.serve_forever(0.05)
727 self
.server
.shutdown()
730 def bad_cert_test(certfile
):
732 Launch a server with CERT_REQUIRED, and check that trying to
733 connect to it with the given client certificate fails.
735 server
= ThreadedEchoServer(CERTFILE
,
736 certreqs
=ssl
.CERT_REQUIRED
,
737 cacerts
=CERTFILE
, chatty
=False)
738 flag
= threading
.Event()
740 # wait for it to start
745 s
= ssl
.wrap_socket(socket
.socket(),
747 ssl_version
=ssl
.PROTOCOL_TLSv1
)
748 s
.connect((HOST
, server
.port
))
749 except ssl
.SSLError
, x
:
750 if test_support
.verbose
:
751 sys
.stdout
.write("\nSSLError is %s\n" % x
[1])
752 except socket
.error
, x
:
753 if test_support
.verbose
:
754 sys
.stdout
.write("\nsocket.error is %s\n" % x
[1])
756 raise AssertionError("Use of invalid cert should have failed!")
761 def server_params_test(certfile
, protocol
, certreqs
, cacertsfile
,
762 client_certfile
, client_protocol
=None, indata
="FOO\n",
763 ciphers
=None, chatty
=True, connectionchatty
=False,
764 wrap_accepting_socket
=False):
766 Launch a server, connect a client to it and try various reads
769 server
= ThreadedEchoServer(certfile
,
771 ssl_version
=protocol
,
775 connectionchatty
=connectionchatty
,
776 wrap_accepting_socket
=wrap_accepting_socket
)
777 flag
= threading
.Event()
779 # wait for it to start
782 if client_protocol
is None:
783 client_protocol
= protocol
785 s
= ssl
.wrap_socket(socket
.socket(),
786 certfile
=client_certfile
,
787 ca_certs
=cacertsfile
,
790 ssl_version
=client_protocol
)
791 s
.connect((HOST
, server
.port
))
792 for arg
in [indata
, bytearray(indata
), memoryview(indata
)]:
794 if test_support
.verbose
:
796 " client: sending %s...\n" % (repr(arg
)))
800 if test_support
.verbose
:
801 sys
.stdout
.write(" client: read %s\n" % repr(outdata
))
802 if outdata
!= indata
.lower():
803 raise AssertionError(
804 "bad data <<%s>> (%d) received; expected <<%s>> (%d)\n"
805 % (outdata
[:min(len(outdata
),20)], len(outdata
),
806 indata
[:min(len(indata
),20)].lower(), len(indata
)))
809 if test_support
.verbose
:
810 sys
.stdout
.write(" client: closing connection.\n")
816 def try_protocol_combo(server_protocol
,
820 if certsreqs
is None:
821 certsreqs
= ssl
.CERT_NONE
823 ssl
.CERT_NONE
: "CERT_NONE",
824 ssl
.CERT_OPTIONAL
: "CERT_OPTIONAL",
825 ssl
.CERT_REQUIRED
: "CERT_REQUIRED",
827 if test_support
.verbose
:
828 formatstr
= (expect_success
and " %s->%s %s\n") or " {%s->%s} %s\n"
829 sys
.stdout
.write(formatstr
%
830 (ssl
.get_protocol_name(client_protocol
),
831 ssl
.get_protocol_name(server_protocol
),
834 # NOTE: we must enable "ALL" ciphers, otherwise an SSLv23 client
835 # will send an SSLv3 hello (rather than SSLv2) starting from
836 # OpenSSL 1.0.0 (see issue #8322).
837 server_params_test(CERTFILE
, server_protocol
, certsreqs
,
838 CERTFILE
, CERTFILE
, client_protocol
,
839 ciphers
="ALL", chatty
=False)
840 # Protocol mismatch can result in either an SSLError, or a
841 # "Connection reset by peer" error.
845 except socket
.error
as e
:
846 if expect_success
or e
.errno
!= errno
.ECONNRESET
:
849 if not expect_success
:
850 raise AssertionError(
851 "Client protocol %s succeeded with server protocol %s!"
852 % (ssl
.get_protocol_name(client_protocol
),
853 ssl
.get_protocol_name(server_protocol
)))
856 class ThreadedTests(unittest
.TestCase
):
858 def test_rude_shutdown(self
):
859 """A brutal shutdown of an SSL server should raise an IOError
860 in the client when attempting handshake.
862 listener_ready
= threading
.Event()
863 listener_gone
= threading
.Event()
866 port
= test_support
.bind_port(s
, HOST
)
868 # `listener` runs in a thread. It sits in an accept() until
869 # the main thread connects. Then it rudely closes the socket,
870 # and sets Event `listener_gone` to let the main thread know
871 # the socket is gone.
880 listener_ready
.wait()
882 c
.connect((HOST
, port
))
885 ssl_sock
= ssl
.wrap_socket(c
)
889 self
.fail('connecting to closed SSL socket should have failed')
891 t
= threading
.Thread(target
=listener
)
898 @skip_if_broken_ubuntu_ssl
900 """Basic test of an SSL client connecting to a server"""
901 if test_support
.verbose
:
902 sys
.stdout
.write("\n")
903 server_params_test(CERTFILE
, ssl
.PROTOCOL_TLSv1
, ssl
.CERT_NONE
,
904 CERTFILE
, CERTFILE
, ssl
.PROTOCOL_TLSv1
,
905 chatty
=True, connectionchatty
=True)
907 def test_getpeercert(self
):
908 if test_support
.verbose
:
909 sys
.stdout
.write("\n")
911 server
= ThreadedEchoServer(CERTFILE
,
912 certreqs
=ssl
.CERT_NONE
,
913 ssl_version
=ssl
.PROTOCOL_SSLv23
,
916 flag
= threading
.Event()
918 # wait for it to start
922 s
= ssl
.wrap_socket(socket
.socket(),
925 cert_reqs
=ssl
.CERT_REQUIRED
,
926 ssl_version
=ssl
.PROTOCOL_SSLv23
)
927 s
.connect((HOST
, server
.port
))
928 cert
= s
.getpeercert()
929 self
.assertTrue(cert
, "Can't get peer certificate.")
931 if test_support
.verbose
:
932 sys
.stdout
.write(pprint
.pformat(cert
) + '\n')
933 sys
.stdout
.write("Connection cipher is " + str(cipher
) + '.\n')
934 if 'subject' not in cert
:
935 self
.fail("No subject field in certificate: %s." %
936 pprint
.pformat(cert
))
937 if ((('organizationName', 'Python Software Foundation'),)
938 not in cert
['subject']):
940 "Missing or invalid 'organizationName' field in certificate subject; "
941 "should be 'Python Software Foundation'.")
947 def test_empty_cert(self
):
948 """Connecting with an empty cert file"""
949 bad_cert_test(os
.path
.join(os
.path
.dirname(__file__
) or os
.curdir
,
951 def test_malformed_cert(self
):
952 """Connecting with a badly formatted certificate (syntax error)"""
953 bad_cert_test(os
.path
.join(os
.path
.dirname(__file__
) or os
.curdir
,
955 def test_nonexisting_cert(self
):
956 """Connecting with a non-existing cert file"""
957 bad_cert_test(os
.path
.join(os
.path
.dirname(__file__
) or os
.curdir
,
959 def test_malformed_key(self
):
960 """Connecting with a badly formatted key (syntax error)"""
961 bad_cert_test(os
.path
.join(os
.path
.dirname(__file__
) or os
.curdir
,
964 @skip_if_broken_ubuntu_ssl
965 def test_protocol_sslv2(self
):
966 """Connecting to an SSLv2 server with various client options"""
967 if test_support
.verbose
:
968 sys
.stdout
.write("\n")
969 try_protocol_combo(ssl
.PROTOCOL_SSLv2
, ssl
.PROTOCOL_SSLv2
, True)
970 try_protocol_combo(ssl
.PROTOCOL_SSLv2
, ssl
.PROTOCOL_SSLv2
, True, ssl
.CERT_OPTIONAL
)
971 try_protocol_combo(ssl
.PROTOCOL_SSLv2
, ssl
.PROTOCOL_SSLv2
, True, ssl
.CERT_REQUIRED
)
972 try_protocol_combo(ssl
.PROTOCOL_SSLv2
, ssl
.PROTOCOL_SSLv23
, True)
973 try_protocol_combo(ssl
.PROTOCOL_SSLv2
, ssl
.PROTOCOL_SSLv3
, False)
974 try_protocol_combo(ssl
.PROTOCOL_SSLv2
, ssl
.PROTOCOL_TLSv1
, False)
976 @skip_if_broken_ubuntu_ssl
977 def test_protocol_sslv23(self
):
978 """Connecting to an SSLv23 server with various client options"""
979 if test_support
.verbose
:
980 sys
.stdout
.write("\n")
982 try_protocol_combo(ssl
.PROTOCOL_SSLv23
, ssl
.PROTOCOL_SSLv2
, True)
983 except (ssl
.SSLError
, socket
.error
), x
:
984 # this fails on some older versions of OpenSSL (0.9.7l, for instance)
985 if test_support
.verbose
:
987 " SSL2 client to SSL23 server test unexpectedly failed:\n %s\n"
989 try_protocol_combo(ssl
.PROTOCOL_SSLv23
, ssl
.PROTOCOL_SSLv3
, True)
990 try_protocol_combo(ssl
.PROTOCOL_SSLv23
, ssl
.PROTOCOL_SSLv23
, True)
991 try_protocol_combo(ssl
.PROTOCOL_SSLv23
, ssl
.PROTOCOL_TLSv1
, True)
993 try_protocol_combo(ssl
.PROTOCOL_SSLv23
, ssl
.PROTOCOL_SSLv3
, True, ssl
.CERT_OPTIONAL
)
994 try_protocol_combo(ssl
.PROTOCOL_SSLv23
, ssl
.PROTOCOL_SSLv23
, True, ssl
.CERT_OPTIONAL
)
995 try_protocol_combo(ssl
.PROTOCOL_SSLv23
, ssl
.PROTOCOL_TLSv1
, True, ssl
.CERT_OPTIONAL
)
997 try_protocol_combo(ssl
.PROTOCOL_SSLv23
, ssl
.PROTOCOL_SSLv3
, True, ssl
.CERT_REQUIRED
)
998 try_protocol_combo(ssl
.PROTOCOL_SSLv23
, ssl
.PROTOCOL_SSLv23
, True, ssl
.CERT_REQUIRED
)
999 try_protocol_combo(ssl
.PROTOCOL_SSLv23
, ssl
.PROTOCOL_TLSv1
, True, ssl
.CERT_REQUIRED
)
1001 @skip_if_broken_ubuntu_ssl
1002 def test_protocol_sslv3(self
):
1003 """Connecting to an SSLv3 server with various client options"""
1004 if test_support
.verbose
:
1005 sys
.stdout
.write("\n")
1006 try_protocol_combo(ssl
.PROTOCOL_SSLv3
, ssl
.PROTOCOL_SSLv3
, True)
1007 try_protocol_combo(ssl
.PROTOCOL_SSLv3
, ssl
.PROTOCOL_SSLv3
, True, ssl
.CERT_OPTIONAL
)
1008 try_protocol_combo(ssl
.PROTOCOL_SSLv3
, ssl
.PROTOCOL_SSLv3
, True, ssl
.CERT_REQUIRED
)
1009 if hasattr(ssl
, 'PROTOCOL_SSLv2'):
1010 try_protocol_combo(ssl
.PROTOCOL_SSLv3
, ssl
.PROTOCOL_SSLv2
, False)
1011 try_protocol_combo(ssl
.PROTOCOL_SSLv3
, ssl
.PROTOCOL_SSLv23
, False)
1012 try_protocol_combo(ssl
.PROTOCOL_SSLv3
, ssl
.PROTOCOL_TLSv1
, False)
1014 @skip_if_broken_ubuntu_ssl
1015 def test_protocol_tlsv1(self
):
1016 """Connecting to a TLSv1 server with various client options"""
1017 if test_support
.verbose
:
1018 sys
.stdout
.write("\n")
1019 try_protocol_combo(ssl
.PROTOCOL_TLSv1
, ssl
.PROTOCOL_TLSv1
, True)
1020 try_protocol_combo(ssl
.PROTOCOL_TLSv1
, ssl
.PROTOCOL_TLSv1
, True, ssl
.CERT_OPTIONAL
)
1021 try_protocol_combo(ssl
.PROTOCOL_TLSv1
, ssl
.PROTOCOL_TLSv1
, True, ssl
.CERT_REQUIRED
)
1022 if hasattr(ssl
, 'PROTOCOL_SSLv2'):
1023 try_protocol_combo(ssl
.PROTOCOL_TLSv1
, ssl
.PROTOCOL_SSLv2
, False)
1024 try_protocol_combo(ssl
.PROTOCOL_TLSv1
, ssl
.PROTOCOL_SSLv3
, False)
1025 try_protocol_combo(ssl
.PROTOCOL_TLSv1
, ssl
.PROTOCOL_SSLv23
, False)
1027 def test_starttls(self
):
1028 """Switching from clear text to encrypted and back again."""
1029 msgs
= ("msg 1", "MSG 2", "STARTTLS", "MSG 3", "msg 4", "ENDTLS", "msg 5", "msg 6")
1031 server
= ThreadedEchoServer(CERTFILE
,
1032 ssl_version
=ssl
.PROTOCOL_TLSv1
,
1033 starttls_server
=True,
1035 connectionchatty
=True)
1036 flag
= threading
.Event()
1038 # wait for it to start
1045 s
.connect((HOST
, server
.port
))
1046 if test_support
.verbose
:
1047 sys
.stdout
.write("\n")
1049 if test_support
.verbose
:
1051 " client: sending %s...\n" % repr(indata
))
1054 outdata
= conn
.read()
1057 outdata
= s
.recv(1024)
1058 if (indata
== "STARTTLS" and
1059 outdata
.strip().lower().startswith("ok")):
1060 # STARTTLS ok, switch to secure mode
1061 if test_support
.verbose
:
1063 " client: read %s from server, starting TLS...\n"
1065 conn
= ssl
.wrap_socket(s
, ssl_version
=ssl
.PROTOCOL_TLSv1
)
1067 elif (indata
== "ENDTLS" and
1068 outdata
.strip().lower().startswith("ok")):
1069 # ENDTLS ok, switch back to clear text
1070 if test_support
.verbose
:
1072 " client: read %s from server, ending TLS...\n"
1077 if test_support
.verbose
:
1079 " client: read %s from server\n" % repr(outdata
))
1080 if test_support
.verbose
:
1081 sys
.stdout
.write(" client: closing connection.\n")
1083 conn
.write("over\n")
1091 def test_socketserver(self
):
1092 """Using a SocketServer to create and manage SSL connections."""
1093 server
= SocketServerHTTPSServer(CERTFILE
)
1094 flag
= threading
.Event()
1096 # wait for it to start
1100 if test_support
.verbose
:
1101 sys
.stdout
.write('\n')
1102 with
open(CERTFILE
, 'rb') as f
:
1105 # now fetch the same data from the HTTPS server
1106 url
= 'https://127.0.0.1:%d/%s' % (
1107 server
.port
, os
.path
.split(CERTFILE
)[1])
1108 with test_support
.check_py3k_warnings():
1109 f
= urllib
.urlopen(url
)
1110 dlen
= f
.info().getheader("content-length")
1111 if dlen
and (int(dlen
) > 0):
1112 d2
= f
.read(int(dlen
))
1113 if test_support
.verbose
:
1115 " client: read %d bytes from remote server '%s'\n"
1116 % (len(d2
), server
))
1118 self
.assertEqual(d1
, d2
)
1123 def test_wrapped_accept(self
):
1124 """Check the accept() method on SSL sockets."""
1125 if test_support
.verbose
:
1126 sys
.stdout
.write("\n")
1127 server_params_test(CERTFILE
, ssl
.PROTOCOL_SSLv23
, ssl
.CERT_REQUIRED
,
1128 CERTFILE
, CERTFILE
, ssl
.PROTOCOL_SSLv23
,
1129 chatty
=True, connectionchatty
=True,
1130 wrap_accepting_socket
=True)
1132 def test_asyncore_server(self
):
1133 """Check the example asyncore integration."""
1134 indata
= "TEST MESSAGE of mixed case\n"
1136 if test_support
.verbose
:
1137 sys
.stdout
.write("\n")
1138 server
= AsyncoreEchoServer(CERTFILE
)
1139 flag
= threading
.Event()
1141 # wait for it to start
1145 s
= ssl
.wrap_socket(socket
.socket())
1146 s
.connect(('127.0.0.1', server
.port
))
1147 if test_support
.verbose
:
1149 " client: sending %s...\n" % (repr(indata
)))
1152 if test_support
.verbose
:
1153 sys
.stdout
.write(" client: read %s\n" % repr(outdata
))
1154 if outdata
!= indata
.lower():
1156 "bad data <<%s>> (%d) received; expected <<%s>> (%d)\n"
1157 % (outdata
[:min(len(outdata
),20)], len(outdata
),
1158 indata
[:min(len(indata
),20)].lower(), len(indata
)))
1160 if test_support
.verbose
:
1161 sys
.stdout
.write(" client: closing connection.\n")
1165 # wait for server thread to end
1168 def test_recv_send(self
):
1169 """Test recv(), send() and friends."""
1170 if test_support
.verbose
:
1171 sys
.stdout
.write("\n")
1173 server
= ThreadedEchoServer(CERTFILE
,
1174 certreqs
=ssl
.CERT_NONE
,
1175 ssl_version
=ssl
.PROTOCOL_TLSv1
,
1178 connectionchatty
=False)
1179 flag
= threading
.Event()
1181 # wait for it to start
1184 s
= ssl
.wrap_socket(socket
.socket(),
1188 cert_reqs
=ssl
.CERT_NONE
,
1189 ssl_version
=ssl
.PROTOCOL_TLSv1
)
1190 s
.connect((HOST
, server
.port
))
1192 # helper methods for standardising recv* method signatures
1194 b
= bytearray("\0"*100)
1195 count
= s
.recv_into(b
)
1198 def _recvfrom_into():
1199 b
= bytearray("\0"*100)
1200 count
, addr
= s
.recvfrom_into(b
)
1203 # (name, method, whether to expect success, *args)
1205 ('send', s
.send
, True, []),
1206 ('sendto', s
.sendto
, False, ["some.address"]),
1207 ('sendall', s
.sendall
, True, []),
1210 ('recv', s
.recv
, True, []),
1211 ('recvfrom', s
.recvfrom
, False, ["some.address"]),
1212 ('recv_into', _recv_into
, True, []),
1213 ('recvfrom_into', _recvfrom_into
, False, []),
1215 data_prefix
= u
"PREFIX_"
1217 for meth_name
, send_meth
, expect_success
, args
in send_methods
:
1218 indata
= data_prefix
+ meth_name
1220 send_meth(indata
.encode('ASCII', 'strict'), *args
)
1222 outdata
= outdata
.decode('ASCII', 'strict')
1223 if outdata
!= indata
.lower():
1225 "While sending with <<%s>> bad data "
1226 "<<%r>> (%d) received; "
1227 "expected <<%r>> (%d)\n" % (
1228 meth_name
, outdata
[:20], len(outdata
),
1229 indata
[:20], len(indata
)
1232 except ValueError as e
:
1235 "Failed to send with method <<%s>>; "
1236 "expected to succeed.\n" % (meth_name
,)
1238 if not str(e
).startswith(meth_name
):
1240 "Method <<%s>> failed with unexpected "
1241 "exception message: %s\n" % (
1246 for meth_name
, recv_meth
, expect_success
, args
in recv_methods
:
1247 indata
= data_prefix
+ meth_name
1249 s
.send(indata
.encode('ASCII', 'strict'))
1250 outdata
= recv_meth(*args
)
1251 outdata
= outdata
.decode('ASCII', 'strict')
1252 if outdata
!= indata
.lower():
1254 "While receiving with <<%s>> bad data "
1255 "<<%r>> (%d) received; "
1256 "expected <<%r>> (%d)\n" % (
1257 meth_name
, outdata
[:20], len(outdata
),
1258 indata
[:20], len(indata
)
1261 except ValueError as e
:
1264 "Failed to receive with method <<%s>>; "
1265 "expected to succeed.\n" % (meth_name
,)
1267 if not str(e
).startswith(meth_name
):
1269 "Method <<%s>> failed with unexpected "
1270 "exception message: %s\n" % (
1277 s
.write("over\n".encode("ASCII", "strict"))
1283 def test_handshake_timeout(self
):
1284 # Issue #5103: SSL handshake must respect the socket timeout
1285 server
= socket
.socket(socket
.AF_INET
)
1287 port
= test_support
.bind_port(server
)
1288 started
= threading
.Event()
1296 r
, w
, e
= select
.select([server
], [], [], 0.1)
1298 # Let the socket hang around rather than having
1299 # it closed by garbage collection.
1300 conns
.append(server
.accept()[0])
1302 t
= threading
.Thread(target
=serve
)
1308 c
= socket
.socket(socket
.AF_INET
)
1310 c
.connect((host
, port
))
1311 # Will attempt handshake and time out
1312 self
.assertRaisesRegexp(ssl
.SSLError
, "timed out",
1317 c
= socket
.socket(socket
.AF_INET
)
1319 c
= ssl
.wrap_socket(c
)
1320 # Will attempt handshake and time out
1321 self
.assertRaisesRegexp(ssl
.SSLError
, "timed out",
1322 c
.connect
, (host
, port
))
1331 def test_main(verbose
=False):
1332 global CERTFILE
, SVN_PYTHON_ORG_ROOT_CERT
1333 CERTFILE
= os
.path
.join(os
.path
.dirname(__file__
) or os
.curdir
,
1335 SVN_PYTHON_ORG_ROOT_CERT
= os
.path
.join(
1336 os
.path
.dirname(__file__
) or os
.curdir
,
1337 "https_svn_python_org_root.pem")
1339 if (not os
.path
.exists(CERTFILE
) or
1340 not os
.path
.exists(SVN_PYTHON_ORG_ROOT_CERT
)):
1341 raise test_support
.TestFailed("Can't read certificate files!")
1343 tests
= [BasicTests
, BasicSocketTests
]
1345 if test_support
.is_resource_enabled('network'):
1346 tests
.append(NetworkedTests
)
1349 thread_info
= test_support
.threading_setup()
1350 if thread_info
and test_support
.is_resource_enabled('network'):
1351 tests
.append(ThreadedTests
)
1354 test_support
.run_unittest(*tests
)
1357 test_support
.threading_cleanup(*thread_info
)
1359 if __name__
== "__main__":