Rietveld Code Review Tool
Help | Bug tracker | Discussion group | Source code | Sign in
(85503)

Delta Between Two Patch Sets: Lib/test/test_ssl.py

Issue 16510: Using appropriate checks in tests
Left Patch Set: Created 6 years, 9 months ago
Right Patch Set: Created 5 years, 6 months ago
Left:
Right:
Use n/p to move between diff chunks; N/P to move between comments. Please Sign in to add in-line comments.
Jump to:
Left: Side by side diff | Download
Right: Side by side diff | Download
« no previous file with change/comment | « Lib/test/test_socket.py ('k') | Lib/test/test_strptime.py » ('j') | no next file with change/comment »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
LEFTRIGHT
1 # Test the support for SSL and sockets 1 # Test the support for SSL and sockets
2 2
3 import sys 3 import sys
4 import unittest 4 import unittest
5 from test import support 5 from test import support
6 import socket 6 import socket
7 import select 7 import select
8 import time 8 import time
9 import datetime
9 import gc 10 import gc
10 import os 11 import os
11 import errno 12 import errno
12 import pprint 13 import pprint
13 import tempfile 14 import tempfile
14 import urllib.request 15 import urllib.request
15 import traceback 16 import traceback
16 import asyncore 17 import asyncore
17 import weakref 18 import weakref
18 import platform 19 import platform
19 import functools 20 import functools
21 from unittest import mock
20 22
21 ssl = support.import_module("ssl") 23 ssl = support.import_module("ssl")
22 24
23 PROTOCOLS = [ 25 PROTOCOLS = sorted(ssl._PROTOCOL_NAMES)
24 ssl.PROTOCOL_SSLv3,
25 ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1
26 ]
27 if hasattr(ssl, 'PROTOCOL_SSLv2'):
28 PROTOCOLS.append(ssl.PROTOCOL_SSLv2)
29
30 HOST = support.HOST 26 HOST = support.HOST
31 27
32 data_file = lambda name: os.path.join(os.path.dirname(__file__), name) 28 def data_file(*name):
29 return os.path.join(os.path.dirname(__file__), *name)
33 30
34 # The custom key and certificate files used in test_ssl are generated 31 # The custom key and certificate files used in test_ssl are generated
35 # using Lib/test/make_ssl_certs.py. 32 # using Lib/test/make_ssl_certs.py.
36 # Other certificates are simply fetched from the Internet servers they 33 # Other certificates are simply fetched from the Internet servers they
37 # are meant to authenticate. 34 # are meant to authenticate.
38 35
39 CERTFILE = data_file("keycert.pem") 36 CERTFILE = data_file("keycert.pem")
40 BYTES_CERTFILE = os.fsencode(CERTFILE) 37 BYTES_CERTFILE = os.fsencode(CERTFILE)
41 ONLYCERT = data_file("ssl_cert.pem") 38 ONLYCERT = data_file("ssl_cert.pem")
42 ONLYKEY = data_file("ssl_key.pem") 39 ONLYKEY = data_file("ssl_key.pem")
43 BYTES_ONLYCERT = os.fsencode(ONLYCERT) 40 BYTES_ONLYCERT = os.fsencode(ONLYCERT)
44 BYTES_ONLYKEY = os.fsencode(ONLYKEY) 41 BYTES_ONLYKEY = os.fsencode(ONLYKEY)
45 CERTFILE_PROTECTED = data_file("keycert.passwd.pem") 42 CERTFILE_PROTECTED = data_file("keycert.passwd.pem")
46 ONLYKEY_PROTECTED = data_file("ssl_key.passwd.pem") 43 ONLYKEY_PROTECTED = data_file("ssl_key.passwd.pem")
47 KEY_PASSWORD = "somepass" 44 KEY_PASSWORD = "somepass"
48 CAPATH = data_file("capath") 45 CAPATH = data_file("capath")
49 BYTES_CAPATH = os.fsencode(CAPATH) 46 BYTES_CAPATH = os.fsencode(CAPATH)
47 CAFILE_NEURONIO = data_file("capath", "4e1295a3.0")
48 CAFILE_CACERT = data_file("capath", "5ed36f99.0")
49
50
51 # empty CRL
52 CRLFILE = data_file("revocation.crl")
53
54 # Two keys and certs signed by the same CA (for SNI tests)
55 SIGNED_CERTFILE = data_file("keycert3.pem")
56 SIGNED_CERTFILE2 = data_file("keycert4.pem")
57 SIGNING_CA = data_file("pycacert.pem")
50 58
51 SVN_PYTHON_ORG_ROOT_CERT = data_file("https_svn_python_org_root.pem") 59 SVN_PYTHON_ORG_ROOT_CERT = data_file("https_svn_python_org_root.pem")
52 60
53 EMPTYCERT = data_file("nullcert.pem") 61 EMPTYCERT = data_file("nullcert.pem")
54 BADCERT = data_file("badcert.pem") 62 BADCERT = data_file("badcert.pem")
55 WRONGCERT = data_file("XXXnonexisting.pem") 63 WRONGCERT = data_file("XXXnonexisting.pem")
56 BADKEY = data_file("badkey.pem") 64 BADKEY = data_file("badkey.pem")
57 NOKIACERT = data_file("nokia.pem") 65 NOKIACERT = data_file("nokia.pem")
66 NULLBYTECERT = data_file("nullbytecert.pem")
58 67
59 DHFILE = data_file("dh512.pem") 68 DHFILE = data_file("dh512.pem")
60 BYTES_DHFILE = os.fsencode(DHFILE) 69 BYTES_DHFILE = os.fsencode(DHFILE)
70
61 71
62 def handle_error(prefix): 72 def handle_error(prefix):
63 exc_format = ' '.join(traceback.format_exception(*sys.exc_info())) 73 exc_format = ' '.join(traceback.format_exception(*sys.exc_info()))
64 if support.verbose: 74 if support.verbose:
65 sys.stdout.write(prefix + exc_format) 75 sys.stdout.write(prefix + exc_format)
66 76
67 def can_clear_options(): 77 def can_clear_options():
68 # 0.9.8m or higher 78 # 0.9.8m or higher
69 return ssl._OPENSSL_API_VERSION >= (0, 9, 8, 13, 15) 79 return ssl._OPENSSL_API_VERSION >= (0, 9, 8, 13, 15)
70 80
71 def no_sslv2_implies_sslv3_hello(): 81 def no_sslv2_implies_sslv3_hello():
72 # 0.9.7h or higher 82 # 0.9.7h or higher
73 return ssl.OPENSSL_VERSION_INFO >= (0, 9, 7, 8, 15) 83 return ssl.OPENSSL_VERSION_INFO >= (0, 9, 7, 8, 15)
74 84
85 def have_verify_flags():
86 # 0.9.8 or higher
87 return ssl.OPENSSL_VERSION_INFO >= (0, 9, 8, 0, 15)
88
89 def asn1time(cert_time):
90 # Some versions of OpenSSL ignore seconds, see #18207
91 # 0.9.8.i
92 if ssl._OPENSSL_API_VERSION == (0, 9, 8, 9, 15):
93 fmt = "%b %d %H:%M:%S %Y GMT"
94 dt = datetime.datetime.strptime(cert_time, fmt)
95 dt = dt.replace(second=0)
96 cert_time = dt.strftime(fmt)
97 # %d adds leading zero but ASN1_TIME_print() uses leading space
98 if cert_time[4] == "0":
99 cert_time = cert_time[:4] + " " + cert_time[5:]
100
101 return cert_time
75 102
76 # Issue #9415: Ubuntu hijacks their OpenSSL and forcefully disables SSLv2 103 # Issue #9415: Ubuntu hijacks their OpenSSL and forcefully disables SSLv2
77 def skip_if_broken_ubuntu_ssl(func): 104 def skip_if_broken_ubuntu_ssl(func):
78 if hasattr(ssl, 'PROTOCOL_SSLv2'): 105 if hasattr(ssl, 'PROTOCOL_SSLv2'):
79 @functools.wraps(func) 106 @functools.wraps(func)
80 def f(*args, **kwargs): 107 def f(*args, **kwargs):
81 try: 108 try:
82 ssl.SSLContext(ssl.PROTOCOL_SSLv2) 109 ssl.SSLContext(ssl.PROTOCOL_SSLv2)
83 except ssl.SSLError: 110 except ssl.SSLError:
84 if (ssl.OPENSSL_VERSION_INFO == (0, 9, 8, 15, 15) and 111 if (ssl.OPENSSL_VERSION_INFO == (0, 9, 8, 15, 15) and
85 platform.linux_distribution() == ('debian', 'squeeze/sid', ' ')): 112 platform.linux_distribution() == ('debian', 'squeeze/sid', ' ')):
86 raise unittest.SkipTest("Patched Ubuntu OpenSSL breaks behav iour") 113 raise unittest.SkipTest("Patched Ubuntu OpenSSL breaks behav iour")
87 return func(*args, **kwargs) 114 return func(*args, **kwargs)
88 return f 115 return f
89 else: 116 else:
90 return func 117 return func
91 118
119 needs_sni = unittest.skipUnless(ssl.HAS_SNI, "SNI support needed for this test")
120
92 121
93 class BasicSocketTests(unittest.TestCase): 122 class BasicSocketTests(unittest.TestCase):
94 123
95 def test_constants(self): 124 def test_constants(self):
96 #ssl.PROTOCOL_SSLv2
97 ssl.PROTOCOL_SSLv23
98 ssl.PROTOCOL_SSLv3
99 ssl.PROTOCOL_TLSv1
100 ssl.CERT_NONE 125 ssl.CERT_NONE
101 ssl.CERT_OPTIONAL 126 ssl.CERT_OPTIONAL
102 ssl.CERT_REQUIRED 127 ssl.CERT_REQUIRED
103 ssl.OP_CIPHER_SERVER_PREFERENCE 128 ssl.OP_CIPHER_SERVER_PREFERENCE
104 ssl.OP_SINGLE_DH_USE 129 ssl.OP_SINGLE_DH_USE
105 if ssl.HAS_ECDH: 130 if ssl.HAS_ECDH:
106 ssl.OP_SINGLE_ECDH_USE 131 ssl.OP_SINGLE_ECDH_USE
107 if ssl.OPENSSL_VERSION_INFO >= (1, 0): 132 if ssl.OPENSSL_VERSION_INFO >= (1, 0):
108 ssl.OP_NO_COMPRESSION 133 ssl.OP_NO_COMPRESSION
109 self.assertIn(ssl.HAS_SNI, {True, False}) 134 self.assertIn(ssl.HAS_SNI, {True, False})
110 self.assertIn(ssl.HAS_ECDH, {True, False}) 135 self.assertIn(ssl.HAS_ECDH, {True, False})
111 136
112 def test_random(self): 137 def test_random(self):
113 v = ssl.RAND_status() 138 v = ssl.RAND_status()
114 if support.verbose: 139 if support.verbose:
115 sys.stdout.write("\n RAND_status is %d (%s)\n" 140 sys.stdout.write("\n RAND_status is %d (%s)\n"
116 % (v, (v and "sufficient randomness") or 141 % (v, (v and "sufficient randomness") or
117 "insufficient randomness")) 142 "insufficient randomness"))
118 143
119 data, is_cryptographic = ssl.RAND_pseudo_bytes(16) 144 data, is_cryptographic = ssl.RAND_pseudo_bytes(16)
120 self.assertEqual(len(data), 16) 145 self.assertEqual(len(data), 16)
121 self.assertEqual(is_cryptographic, v == 1) 146 self.assertEqual(is_cryptographic, v == 1)
122 if v: 147 if v:
123 data = ssl.RAND_bytes(16) 148 data = ssl.RAND_bytes(16)
124 self.assertEqual(len(data), 16) 149 self.assertEqual(len(data), 16)
125 else: 150 else:
126 self.assertRaises(ssl.SSLError, ssl.RAND_bytes, 16) 151 self.assertRaises(ssl.SSLError, ssl.RAND_bytes, 16)
127 152
153 # negative num is invalid
154 self.assertRaises(ValueError, ssl.RAND_bytes, -5)
155 self.assertRaises(ValueError, ssl.RAND_pseudo_bytes, -5)
156
128 self.assertRaises(TypeError, ssl.RAND_egd, 1) 157 self.assertRaises(TypeError, ssl.RAND_egd, 1)
129 self.assertRaises(TypeError, ssl.RAND_egd, 'foo', 1) 158 self.assertRaises(TypeError, ssl.RAND_egd, 'foo', 1)
130 ssl.RAND_add("this is a random string", 75.0) 159 ssl.RAND_add("this is a random string", 75.0)
160
161 @unittest.skipUnless(os.name == 'posix', 'requires posix')
162 def test_random_fork(self):
163 status = ssl.RAND_status()
164 if not status:
165 self.fail("OpenSSL's PRNG has insufficient randomness")
166
167 rfd, wfd = os.pipe()
168 pid = os.fork()
169 if pid == 0:
170 try:
171 os.close(rfd)
172 child_random = ssl.RAND_pseudo_bytes(16)[0]
173 self.assertEqual(len(child_random), 16)
174 os.write(wfd, child_random)
175 os.close(wfd)
176 except BaseException:
177 os._exit(1)
178 else:
179 os._exit(0)
180 else:
181 os.close(wfd)
182 self.addCleanup(os.close, rfd)
183 _, status = os.waitpid(pid, 0)
184 self.assertEqual(status, 0)
185
186 child_random = os.read(rfd, 16)
187 self.assertEqual(len(child_random), 16)
188 parent_random = ssl.RAND_pseudo_bytes(16)[0]
189 self.assertEqual(len(parent_random), 16)
190
191 self.assertNotEqual(child_random, parent_random)
131 192
132 def test_parse_cert(self): 193 def test_parse_cert(self):
133 # note that this uses an 'unofficial' function in _ssl.c, 194 # note that this uses an 'unofficial' function in _ssl.c,
134 # provided solely for this test, to exercise the certificate 195 # provided solely for this test, to exercise the certificate
135 # parsing code 196 # parsing code
136 p = ssl._ssl._test_decode_cert(CERTFILE) 197 p = ssl._ssl._test_decode_cert(CERTFILE)
137 if support.verbose: 198 if support.verbose:
138 sys.stdout.write("\n" + pprint.pformat(p) + "\n") 199 sys.stdout.write("\n" + pprint.pformat(p) + "\n")
139 self.assertEqual(p['issuer'], 200 self.assertEqual(p['issuer'],
140 ((('countryName', 'XY'),), 201 ((('countryName', 'XY'),),
141 (('localityName', 'Castle Anthrax'),), 202 (('localityName', 'Castle Anthrax'),),
142 (('organizationName', 'Python Software Foundation'),), 203 (('organizationName', 'Python Software Foundation'),),
143 (('commonName', 'localhost'),)) 204 (('commonName', 'localhost'),))
144 ) 205 )
145 self.assertEqual(p['notAfter'], 'Oct 5 23:01:56 2020 GMT') 206 # Note the next three asserts will fail if the keys are regenerated
146 self.assertEqual(p['notBefore'], 'Oct 8 23:01:56 2010 GMT') 207 self.assertEqual(p['notAfter'], asn1time('Oct 5 23:01:56 2020 GMT'))
208 self.assertEqual(p['notBefore'], asn1time('Oct 8 23:01:56 2010 GMT'))
147 self.assertEqual(p['serialNumber'], 'D7C7381919AFC24E') 209 self.assertEqual(p['serialNumber'], 'D7C7381919AFC24E')
148 self.assertEqual(p['subject'], 210 self.assertEqual(p['subject'],
149 ((('countryName', 'XY'),), 211 ((('countryName', 'XY'),),
150 (('localityName', 'Castle Anthrax'),), 212 (('localityName', 'Castle Anthrax'),),
151 (('organizationName', 'Python Software Foundation'),), 213 (('organizationName', 'Python Software Foundation'),),
152 (('commonName', 'localhost'),)) 214 (('commonName', 'localhost'),))
153 ) 215 )
154 self.assertEqual(p['subjectAltName'], (('DNS', 'localhost'),)) 216 self.assertEqual(p['subjectAltName'], (('DNS', 'localhost'),))
155 # Issue #13034: the subjectAltName in some certificates 217 # Issue #13034: the subjectAltName in some certificates
156 # (notably projects.developer.nokia.com:443) wasn't parsed 218 # (notably projects.developer.nokia.com:443) wasn't parsed
157 p = ssl._ssl._test_decode_cert(NOKIACERT) 219 p = ssl._ssl._test_decode_cert(NOKIACERT)
158 if support.verbose: 220 if support.verbose:
159 sys.stdout.write("\n" + pprint.pformat(p) + "\n") 221 sys.stdout.write("\n" + pprint.pformat(p) + "\n")
160 self.assertEqual(p['subjectAltName'], 222 self.assertEqual(p['subjectAltName'],
161 (('DNS', 'projects.developer.nokia.com'), 223 (('DNS', 'projects.developer.nokia.com'),
162 ('DNS', 'projects.forum.nokia.com')) 224 ('DNS', 'projects.forum.nokia.com'))
163 ) 225 )
226 # extra OCSP and AIA fields
227 self.assertEqual(p['OCSP'], ('http://ocsp.verisign.com',))
228 self.assertEqual(p['caIssuers'],
229 ('http://SVRIntl-G3-aia.verisign.com/SVRIntlG3.cer',))
230 self.assertEqual(p['crlDistributionPoints'],
231 ('http://SVRIntl-G3-crl.verisign.com/SVRIntlG3.crl',))
232
233 def test_parse_cert_CVE_2013_4238(self):
234 p = ssl._ssl._test_decode_cert(NULLBYTECERT)
235 if support.verbose:
236 sys.stdout.write("\n" + pprint.pformat(p) + "\n")
237 subject = ((('countryName', 'US'),),
238 (('stateOrProvinceName', 'Oregon'),),
239 (('localityName', 'Beaverton'),),
240 (('organizationName', 'Python Software Foundation'),),
241 (('organizationalUnitName', 'Python Core Development'),),
242 (('commonName', 'null.python.org\x00example.org'),),
243 (('emailAddress', 'python-dev@python.org'),))
244 self.assertEqual(p['subject'], subject)
245 self.assertEqual(p['issuer'], subject)
246 if ssl._OPENSSL_API_VERSION >= (0, 9, 8):
247 san = (('DNS', 'altnull.python.org\x00example.com'),
248 ('email', 'null@python.org\x00user@example.org'),
249 ('URI', 'http://null.python.org\x00http://example.org'),
250 ('IP Address', '192.0.2.1'),
251 ('IP Address', '2001:DB8:0:0:0:0:0:1\n'))
252 else:
253 # OpenSSL 0.9.7 doesn't support IPv6 addresses in subjectAltName
254 san = (('DNS', 'altnull.python.org\x00example.com'),
255 ('email', 'null@python.org\x00user@example.org'),
256 ('URI', 'http://null.python.org\x00http://example.org'),
257 ('IP Address', '192.0.2.1'),
258 ('IP Address', '<invalid>'))
259
260 self.assertEqual(p['subjectAltName'], san)
164 261
165 def test_DER_to_PEM(self): 262 def test_DER_to_PEM(self):
166 with open(SVN_PYTHON_ORG_ROOT_CERT, 'r') as f: 263 with open(SVN_PYTHON_ORG_ROOT_CERT, 'r') as f:
167 pem = f.read() 264 pem = f.read()
168 d1 = ssl.PEM_cert_to_DER_cert(pem) 265 d1 = ssl.PEM_cert_to_DER_cert(pem)
169 p2 = ssl.DER_cert_to_PEM_cert(d1) 266 p2 = ssl.DER_cert_to_PEM_cert(d1)
170 d2 = ssl.PEM_cert_to_DER_cert(p2) 267 d2 = ssl.PEM_cert_to_DER_cert(p2)
171 self.assertEqual(d1, d2) 268 self.assertEqual(d1, d2)
172 if not p2.startswith(ssl.PEM_HEADER + '\n'): 269 if not p2.startswith(ssl.PEM_HEADER + '\n'):
173 self.fail("DER-to-PEM didn't include correct header:\n%r\n" % p2) 270 self.fail("DER-to-PEM didn't include correct header:\n%r\n" % p2)
(...skipping 27 matching lines...) Expand all
201 self.assertTrue(s.startswith("OpenSSL {:d}.{:d}.{:d}".format(major, mino r, fix)), 298 self.assertTrue(s.startswith("OpenSSL {:d}.{:d}.{:d}".format(major, mino r, fix)),
202 (s, t)) 299 (s, t))
203 300
204 @support.cpython_only 301 @support.cpython_only
205 def test_refcycle(self): 302 def test_refcycle(self):
206 # Issue #7943: an SSL object doesn't create reference cycles with 303 # Issue #7943: an SSL object doesn't create reference cycles with
207 # itself. 304 # itself.
208 s = socket.socket(socket.AF_INET) 305 s = socket.socket(socket.AF_INET)
209 ss = ssl.wrap_socket(s) 306 ss = ssl.wrap_socket(s)
210 wr = weakref.ref(ss) 307 wr = weakref.ref(ss)
211 del ss 308 with support.check_warnings(("", ResourceWarning)):
212 self.assertEqual(wr(), None) 309 del ss
310 self.assertEqual(wr(), None)
213 311
214 def test_wrapped_unconnected(self): 312 def test_wrapped_unconnected(self):
215 # Methods on an unconnected SSLSocket propagate the original 313 # Methods on an unconnected SSLSocket propagate the original
216 # socket.error raise by the underlying socket object. 314 # OSError raise by the underlying socket object.
217 s = socket.socket(socket.AF_INET) 315 s = socket.socket(socket.AF_INET)
218 ss = ssl.wrap_socket(s) 316 with ssl.wrap_socket(s) as ss:
219 self.assertRaises(socket.error, ss.recv, 1) 317 self.assertRaises(OSError, ss.recv, 1)
220 self.assertRaises(socket.error, ss.recv_into, bytearray(b'x')) 318 self.assertRaises(OSError, ss.recv_into, bytearray(b'x'))
221 self.assertRaises(socket.error, ss.recvfrom, 1) 319 self.assertRaises(OSError, ss.recvfrom, 1)
222 self.assertRaises(socket.error, ss.recvfrom_into, bytearray(b'x'), 1) 320 self.assertRaises(OSError, ss.recvfrom_into, bytearray(b'x'), 1)
223 self.assertRaises(socket.error, ss.send, b'x') 321 self.assertRaises(OSError, ss.send, b'x')
224 self.assertRaises(socket.error, ss.sendto, b'x', ('0.0.0.0', 0)) 322 self.assertRaises(OSError, ss.sendto, b'x', ('0.0.0.0', 0))
225 323
226 def test_timeout(self): 324 def test_timeout(self):
227 # Issue #8524: when creating an SSL socket, the timeout of the 325 # Issue #8524: when creating an SSL socket, the timeout of the
228 # original socket should be retained. 326 # original socket should be retained.
229 for timeout in (None, 0.0, 5.0): 327 for timeout in (None, 0.0, 5.0):
230 s = socket.socket(socket.AF_INET) 328 s = socket.socket(socket.AF_INET)
231 s.settimeout(timeout) 329 s.settimeout(timeout)
232 ss = ssl.wrap_socket(s) 330 with ssl.wrap_socket(s) as ss:
233 self.assertEqual(timeout, ss.gettimeout()) 331 self.assertEqual(timeout, ss.gettimeout())
234 332
235 def test_errors(self): 333 def test_errors(self):
236 sock = socket.socket() 334 sock = socket.socket()
237 self.assertRaisesRegex(ValueError, 335 self.assertRaisesRegex(ValueError,
238 "certfile must be specified", 336 "certfile must be specified",
239 ssl.wrap_socket, sock, keyfile=CERTFILE) 337 ssl.wrap_socket, sock, keyfile=CERTFILE)
240 self.assertRaisesRegex(ValueError, 338 self.assertRaisesRegex(ValueError,
241 "certfile must be specified for server-side operations", 339 "certfile must be specified for server-side operations",
242 ssl.wrap_socket, sock, server_side=True) 340 ssl.wrap_socket, sock, server_side=True)
243 self.assertRaisesRegex(ValueError, 341 self.assertRaisesRegex(ValueError,
244 "certfile must be specified for server-side operations", 342 "certfile must be specified for server-side operations",
245 ssl.wrap_socket, sock, server_side=True, certfile="") 343 ssl.wrap_socket, sock, server_side=True, certfile="")
246 s = ssl.wrap_socket(sock, server_side=True, certfile=CERTFILE) 344 with ssl.wrap_socket(sock, server_side=True, certfile=CERTFILE) as s:
247 self.assertRaisesRegex(ValueError, "can't connect in server-side mode", 345 self.assertRaisesRegex(ValueError, "can't connect in server-side mod e",
248 s.connect, (HOST, 8080)) 346 s.connect, (HOST, 8080))
249 with self.assertRaises(IOError) as cm: 347 with self.assertRaises(OSError) as cm:
250 with socket.socket() as sock: 348 with socket.socket() as sock:
251 ssl.wrap_socket(sock, certfile=WRONGCERT) 349 ssl.wrap_socket(sock, certfile=WRONGCERT)
252 self.assertEqual(cm.exception.errno, errno.ENOENT) 350 self.assertEqual(cm.exception.errno, errno.ENOENT)
253 with self.assertRaises(IOError) as cm: 351 with self.assertRaises(OSError) as cm:
254 with socket.socket() as sock: 352 with socket.socket() as sock:
255 ssl.wrap_socket(sock, certfile=CERTFILE, keyfile=WRONGCERT) 353 ssl.wrap_socket(sock, certfile=CERTFILE, keyfile=WRONGCERT)
256 self.assertEqual(cm.exception.errno, errno.ENOENT) 354 self.assertEqual(cm.exception.errno, errno.ENOENT)
257 with self.assertRaises(IOError) as cm: 355 with self.assertRaises(OSError) as cm:
258 with socket.socket() as sock: 356 with socket.socket() as sock:
259 ssl.wrap_socket(sock, certfile=WRONGCERT, keyfile=WRONGCERT) 357 ssl.wrap_socket(sock, certfile=WRONGCERT, keyfile=WRONGCERT)
260 self.assertEqual(cm.exception.errno, errno.ENOENT) 358 self.assertEqual(cm.exception.errno, errno.ENOENT)
261 359
262 def test_match_hostname(self): 360 def test_match_hostname(self):
263 def ok(cert, hostname): 361 def ok(cert, hostname):
264 ssl.match_hostname(cert, hostname) 362 ssl.match_hostname(cert, hostname)
265 def fail(cert, hostname): 363 def fail(cert, hostname):
266 self.assertRaises(ssl.CertificateError, 364 self.assertRaises(ssl.CertificateError,
267 ssl.match_hostname, cert, hostname) 365 ssl.match_hostname, cert, hostname)
268 366
269 cert = {'subject': ((('commonName', 'example.com'),),)} 367 cert = {'subject': ((('commonName', 'example.com'),),)}
270 ok(cert, 'example.com') 368 ok(cert, 'example.com')
271 ok(cert, 'ExAmple.cOm') 369 ok(cert, 'ExAmple.cOm')
272 fail(cert, 'www.example.com') 370 fail(cert, 'www.example.com')
273 fail(cert, '.example.com') 371 fail(cert, '.example.com')
274 fail(cert, 'example.org') 372 fail(cert, 'example.org')
275 fail(cert, 'exampleXcom') 373 fail(cert, 'exampleXcom')
276 374
277 cert = {'subject': ((('commonName', '*.a.com'),),)} 375 cert = {'subject': ((('commonName', '*.a.com'),),)}
278 ok(cert, 'foo.a.com') 376 ok(cert, 'foo.a.com')
279 fail(cert, 'bar.foo.a.com') 377 fail(cert, 'bar.foo.a.com')
280 fail(cert, 'a.com') 378 fail(cert, 'a.com')
281 fail(cert, 'Xa.com') 379 fail(cert, 'Xa.com')
282 fail(cert, '.a.com') 380 fail(cert, '.a.com')
283 381
284 cert = {'subject': ((('commonName', 'a.*.com'),),)} 382 # only match one left-most wildcard
285 ok(cert, 'a.foo.com')
286 fail(cert, 'a..com')
287 fail(cert, 'a.com')
288
289 cert = {'subject': ((('commonName', 'f*.com'),),)} 383 cert = {'subject': ((('commonName', 'f*.com'),),)}
290 ok(cert, 'foo.com') 384 ok(cert, 'foo.com')
291 ok(cert, 'f.com') 385 ok(cert, 'f.com')
292 fail(cert, 'bar.com') 386 fail(cert, 'bar.com')
293 fail(cert, 'foo.a.com') 387 fail(cert, 'foo.a.com')
294 fail(cert, 'bar.foo.com') 388 fail(cert, 'bar.foo.com')
389
390 # NULL bytes are bad, CVE-2013-4073
391 cert = {'subject': ((('commonName',
392 'null.python.org\x00example.org'),),)}
393 ok(cert, 'null.python.org\x00example.org') # or raise an error?
394 fail(cert, 'example.org')
395 fail(cert, 'null.python.org')
396
397 # error cases with wildcards
398 cert = {'subject': ((('commonName', '*.*.a.com'),),)}
399 fail(cert, 'bar.foo.a.com')
400 fail(cert, 'a.com')
401 fail(cert, 'Xa.com')
402 fail(cert, '.a.com')
403
404 cert = {'subject': ((('commonName', 'a.*.com'),),)}
405 fail(cert, 'a.foo.com')
406 fail(cert, 'a..com')
407 fail(cert, 'a.com')
408
409 # wildcard doesn't match IDNA prefix 'xn--'
410 idna = 'püthon.python.org'.encode("idna").decode("ascii")
411 cert = {'subject': ((('commonName', idna),),)}
412 ok(cert, idna)
413 cert = {'subject': ((('commonName', 'x*.python.org'),),)}
414 fail(cert, idna)
415 cert = {'subject': ((('commonName', 'xn--p*.python.org'),),)}
416 fail(cert, idna)
417
418 # wildcard in first fragment and IDNA A-labels in sequent fragments
419 # are supported.
420 idna = 'www*.pythön.org'.encode("idna").decode("ascii")
421 cert = {'subject': ((('commonName', idna),),)}
422 ok(cert, 'www.pythön.org'.encode("idna").decode("ascii"))
423 ok(cert, 'www1.pythön.org'.encode("idna").decode("ascii"))
424 fail(cert, 'ftp.pythön.org'.encode("idna").decode("ascii"))
425 fail(cert, 'pythön.org'.encode("idna").decode("ascii"))
295 426
296 # Slightly fake real-world example 427 # Slightly fake real-world example
297 cert = {'notAfter': 'Jun 26 21:41:46 2011 GMT', 428 cert = {'notAfter': 'Jun 26 21:41:46 2011 GMT',
298 'subject': ((('commonName', 'linuxfrz.org'),),), 429 'subject': ((('commonName', 'linuxfrz.org'),),),
299 'subjectAltName': (('DNS', 'linuxfr.org'), 430 'subjectAltName': (('DNS', 'linuxfr.org'),
300 ('DNS', 'linuxfr.com'), 431 ('DNS', 'linuxfr.com'),
301 ('othername', '<unsupported>'))} 432 ('othername', '<unsupported>'))}
302 ok(cert, 'linuxfr.org') 433 ok(cert, 'linuxfr.org')
303 ok(cert, 'linuxfr.com') 434 ok(cert, 'linuxfr.com')
304 # Not a "DNS" entry 435 # Not a "DNS" entry
(...skipping 36 matching lines...) Expand 10 before | Expand all | Expand 10 after
341 (('stateOrProvinceName', 'California'),), 472 (('stateOrProvinceName', 'California'),),
342 (('localityName', 'Mountain View'),), 473 (('localityName', 'Mountain View'),),
343 (('organizationName', 'Google Inc'),)), 474 (('organizationName', 'Google Inc'),)),
344 'subjectAltName': (('othername', 'blabla'),)} 475 'subjectAltName': (('othername', 'blabla'),)}
345 fail(cert, 'google.com') 476 fail(cert, 'google.com')
346 477
347 # Empty cert / no cert 478 # Empty cert / no cert
348 self.assertRaises(ValueError, ssl.match_hostname, None, 'example.com') 479 self.assertRaises(ValueError, ssl.match_hostname, None, 'example.com')
349 self.assertRaises(ValueError, ssl.match_hostname, {}, 'example.com') 480 self.assertRaises(ValueError, ssl.match_hostname, {}, 'example.com')
350 481
482 # Issue #17980: avoid denials of service by refusing more than one
483 # wildcard per fragment.
484 cert = {'subject': ((('commonName', 'a*b.com'),),)}
485 ok(cert, 'axxb.com')
486 cert = {'subject': ((('commonName', 'a*b.co*'),),)}
487 fail(cert, 'axxb.com')
488 cert = {'subject': ((('commonName', 'a*b*.com'),),)}
489 with self.assertRaises(ssl.CertificateError) as cm:
490 ssl.match_hostname(cert, 'axxbxxc.com')
491 self.assertIn("too many wildcards", str(cm.exception))
492
351 def test_server_side(self): 493 def test_server_side(self):
352 # server_hostname doesn't work for server sockets 494 # server_hostname doesn't work for server sockets
353 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23) 495 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
354 with socket.socket() as sock: 496 with socket.socket() as sock:
355 self.assertRaises(ValueError, ctx.wrap_socket, sock, True, 497 self.assertRaises(ValueError, ctx.wrap_socket, sock, True,
356 server_hostname="some.hostname") 498 server_hostname="some.hostname")
357 499
358 def test_unknown_channel_binding(self): 500 def test_unknown_channel_binding(self):
359 # should raise ValueError for unknown type 501 # should raise ValueError for unknown type
360 s = socket.socket(socket.AF_INET) 502 s = socket.socket(socket.AF_INET)
361 ss = ssl.wrap_socket(s) 503 with ssl.wrap_socket(s) as ss:
362 with self.assertRaises(ValueError): 504 with self.assertRaises(ValueError):
363 ss.get_channel_binding("unknown-type") 505 ss.get_channel_binding("unknown-type")
364 506
365 @unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES, 507 @unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
366 "'tls-unique' channel binding not available") 508 "'tls-unique' channel binding not available")
367 def test_tls_unique_channel_binding(self): 509 def test_tls_unique_channel_binding(self):
368 # unconnected should return None for known type 510 # unconnected should return None for known type
369 s = socket.socket(socket.AF_INET) 511 s = socket.socket(socket.AF_INET)
370 ss = ssl.wrap_socket(s) 512 with ssl.wrap_socket(s) as ss:
371 self.assertIsNone(ss.get_channel_binding("tls-unique")) 513 self.assertIsNone(ss.get_channel_binding("tls-unique"))
372 # the same for server-side 514 # the same for server-side
373 s = socket.socket(socket.AF_INET) 515 s = socket.socket(socket.AF_INET)
374 ss = ssl.wrap_socket(s, server_side=True, certfile=CERTFILE) 516 with ssl.wrap_socket(s, server_side=True, certfile=CERTFILE) as ss:
375 self.assertIsNone(ss.get_channel_binding("tls-unique")) 517 self.assertIsNone(ss.get_channel_binding("tls-unique"))
518
519 def test_dealloc_warn(self):
520 ss = ssl.wrap_socket(socket.socket(socket.AF_INET))
521 r = repr(ss)
522 with self.assertWarns(ResourceWarning) as cm:
523 ss = None
524 support.gc_collect()
525 self.assertIn(r, str(cm.warning.args[0]))
526
527 def test_get_default_verify_paths(self):
528 paths = ssl.get_default_verify_paths()
529 self.assertEqual(len(paths), 6)
530 self.assertIsInstance(paths, ssl.DefaultVerifyPaths)
531
532 with support.EnvironmentVarGuard() as env:
533 env["SSL_CERT_DIR"] = CAPATH
534 env["SSL_CERT_FILE"] = CERTFILE
535 paths = ssl.get_default_verify_paths()
536 self.assertEqual(paths.cafile, CERTFILE)
537 self.assertEqual(paths.capath, CAPATH)
538
539 @unittest.skipUnless(sys.platform == "win32", "Windows specific")
540 def test_enum_certificates(self):
541 self.assertTrue(ssl.enum_certificates("CA"))
542 self.assertTrue(ssl.enum_certificates("ROOT"))
543
544 self.assertRaises(TypeError, ssl.enum_certificates)
545 self.assertRaises(WindowsError, ssl.enum_certificates, "")
546
547 trust_oids = set()
548 for storename in ("CA", "ROOT"):
549 store = ssl.enum_certificates(storename)
550 self.assertIsInstance(store, list)
551 for element in store:
552 self.assertIsInstance(element, tuple)
553 self.assertEqual(len(element), 3)
554 cert, enc, trust = element
555 self.assertIsInstance(cert, bytes)
556 self.assertIn(enc, {"x509_asn", "pkcs_7_asn"})
557 self.assertIsInstance(trust, (set, bool))
558 if isinstance(trust, set):
559 trust_oids.update(trust)
560
561 serverAuth = "1.3.6.1.5.5.7.3.1"
562 self.assertIn(serverAuth, trust_oids)
563
564 @unittest.skipUnless(sys.platform == "win32", "Windows specific")
565 def test_enum_crls(self):
566 self.assertTrue(ssl.enum_crls("CA"))
567 self.assertRaises(TypeError, ssl.enum_crls)
568 self.assertRaises(WindowsError, ssl.enum_crls, "")
569
570 crls = ssl.enum_crls("CA")
571 self.assertIsInstance(crls, list)
572 for element in crls:
573 self.assertIsInstance(element, tuple)
574 self.assertEqual(len(element), 2)
575 self.assertIsInstance(element[0], bytes)
576 self.assertIn(element[1], {"x509_asn", "pkcs_7_asn"})
577
578
579 def test_asn1object(self):
580 expected = (129, 'serverAuth', 'TLS Web Server Authentication',
581 '1.3.6.1.5.5.7.3.1')
582
583 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1')
584 self.assertEqual(val, expected)
585 self.assertEqual(val.nid, 129)
586 self.assertEqual(val.shortname, 'serverAuth')
587 self.assertEqual(val.longname, 'TLS Web Server Authentication')
588 self.assertEqual(val.oid, '1.3.6.1.5.5.7.3.1')
589 self.assertIsInstance(val, ssl._ASN1Object)
590 self.assertRaises(ValueError, ssl._ASN1Object, 'serverAuth')
591
592 val = ssl._ASN1Object.fromnid(129)
593 self.assertEqual(val, expected)
594 self.assertIsInstance(val, ssl._ASN1Object)
595 self.assertRaises(ValueError, ssl._ASN1Object.fromnid, -1)
596 with self.assertRaisesRegex(ValueError, "unknown NID 100000"):
597 ssl._ASN1Object.fromnid(100000)
598 for i in range(1000):
599 try:
600 obj = ssl._ASN1Object.fromnid(i)
601 except ValueError:
602 pass
603 else:
604 self.assertIsInstance(obj.nid, int)
605 self.assertIsInstance(obj.shortname, str)
606 self.assertIsInstance(obj.longname, str)
607 self.assertIsInstance(obj.oid, (str, type(None)))
608
609 val = ssl._ASN1Object.fromname('TLS Web Server Authentication')
610 self.assertEqual(val, expected)
611 self.assertIsInstance(val, ssl._ASN1Object)
612 self.assertEqual(ssl._ASN1Object.fromname('serverAuth'), expected)
613 self.assertEqual(ssl._ASN1Object.fromname('1.3.6.1.5.5.7.3.1'),
614 expected)
615 with self.assertRaisesRegex(ValueError, "unknown object 'serverauth'"):
616 ssl._ASN1Object.fromname('serverauth')
617
618 def test_purpose_enum(self):
619 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1')
620 self.assertIsInstance(ssl.Purpose.SERVER_AUTH, ssl._ASN1Object)
621 self.assertEqual(ssl.Purpose.SERVER_AUTH, val)
622 self.assertEqual(ssl.Purpose.SERVER_AUTH.nid, 129)
623 self.assertEqual(ssl.Purpose.SERVER_AUTH.shortname, 'serverAuth')
624 self.assertEqual(ssl.Purpose.SERVER_AUTH.oid,
625 '1.3.6.1.5.5.7.3.1')
626
627 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.2')
628 self.assertIsInstance(ssl.Purpose.CLIENT_AUTH, ssl._ASN1Object)
629 self.assertEqual(ssl.Purpose.CLIENT_AUTH, val)
630 self.assertEqual(ssl.Purpose.CLIENT_AUTH.nid, 130)
631 self.assertEqual(ssl.Purpose.CLIENT_AUTH.shortname, 'clientAuth')
632 self.assertEqual(ssl.Purpose.CLIENT_AUTH.oid,
633 '1.3.6.1.5.5.7.3.2')
634
635 def test_unsupported_dtls(self):
636 s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
637 self.addCleanup(s.close)
638 with self.assertRaises(NotImplementedError) as cx:
639 ssl.wrap_socket(s, cert_reqs=ssl.CERT_NONE)
640 self.assertEqual(str(cx.exception), "only stream sockets are supported")
641 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
642 with self.assertRaises(NotImplementedError) as cx:
643 ctx.wrap_socket(s)
644 self.assertEqual(str(cx.exception), "only stream sockets are supported")
645
376 646
377 class ContextTests(unittest.TestCase): 647 class ContextTests(unittest.TestCase):
378 648
379 @skip_if_broken_ubuntu_ssl 649 @skip_if_broken_ubuntu_ssl
380 def test_constructor(self): 650 def test_constructor(self):
381 if hasattr(ssl, 'PROTOCOL_SSLv2'): 651 for protocol in PROTOCOLS:
382 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv2) 652 ssl.SSLContext(protocol)
383 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
384 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv3)
385 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
386 self.assertRaises(TypeError, ssl.SSLContext) 653 self.assertRaises(TypeError, ssl.SSLContext)
387 self.assertRaises(ValueError, ssl.SSLContext, -1) 654 self.assertRaises(ValueError, ssl.SSLContext, -1)
388 self.assertRaises(ValueError, ssl.SSLContext, 42) 655 self.assertRaises(ValueError, ssl.SSLContext, 42)
389 656
390 @skip_if_broken_ubuntu_ssl 657 @skip_if_broken_ubuntu_ssl
391 def test_protocol(self): 658 def test_protocol(self):
392 for proto in PROTOCOLS: 659 for proto in PROTOCOLS:
393 ctx = ssl.SSLContext(proto) 660 ctx = ssl.SSLContext(proto)
394 self.assertEqual(ctx.protocol, proto) 661 self.assertEqual(ctx.protocol, proto)
395 662
396 def test_ciphers(self): 663 def test_ciphers(self):
397 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) 664 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
398 ctx.set_ciphers("ALL") 665 ctx.set_ciphers("ALL")
399 ctx.set_ciphers("DEFAULT") 666 ctx.set_ciphers("DEFAULT")
400 with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected"): 667 with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected"):
401 ctx.set_ciphers("^$:,;?*'dorothyx") 668 ctx.set_ciphers("^$:,;?*'dorothyx")
402 669
403 @skip_if_broken_ubuntu_ssl 670 @skip_if_broken_ubuntu_ssl
404 def test_options(self): 671 def test_options(self):
405 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) 672 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
406 # OP_ALL is the default value 673 # OP_ALL | OP_NO_SSLv2 is the default value
407 self.assertEqual(ssl.OP_ALL, ctx.options)
408 ctx.options |= ssl.OP_NO_SSLv2
409 self.assertEqual(ssl.OP_ALL | ssl.OP_NO_SSLv2, 674 self.assertEqual(ssl.OP_ALL | ssl.OP_NO_SSLv2,
410 ctx.options) 675 ctx.options)
411 ctx.options |= ssl.OP_NO_SSLv3 676 ctx.options |= ssl.OP_NO_SSLv3
412 self.assertEqual(ssl.OP_ALL | ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3, 677 self.assertEqual(ssl.OP_ALL | ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3,
413 ctx.options) 678 ctx.options)
414 if can_clear_options(): 679 if can_clear_options():
415 ctx.options = (ctx.options & ~ssl.OP_NO_SSLv2) | ssl.OP_NO_TLSv1 680 ctx.options = (ctx.options & ~ssl.OP_NO_SSLv2) | ssl.OP_NO_TLSv1
416 self.assertEqual(ssl.OP_ALL | ssl.OP_NO_TLSv1 | ssl.OP_NO_SSLv3, 681 self.assertEqual(ssl.OP_ALL | ssl.OP_NO_TLSv1 | ssl.OP_NO_SSLv3,
417 ctx.options) 682 ctx.options)
418 ctx.options = 0 683 ctx.options = 0
419 self.assertEqual(0, ctx.options) 684 self.assertEqual(0, ctx.options)
420 else: 685 else:
421 with self.assertRaises(ValueError): 686 with self.assertRaises(ValueError):
422 ctx.options = 0 687 ctx.options = 0
423 688
424 def test_verify(self): 689 def test_verify_mode(self):
425 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) 690 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
426 # Default value 691 # Default value
427 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE) 692 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
428 ctx.verify_mode = ssl.CERT_OPTIONAL 693 ctx.verify_mode = ssl.CERT_OPTIONAL
429 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL) 694 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
430 ctx.verify_mode = ssl.CERT_REQUIRED 695 ctx.verify_mode = ssl.CERT_REQUIRED
431 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED) 696 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
432 ctx.verify_mode = ssl.CERT_NONE 697 ctx.verify_mode = ssl.CERT_NONE
433 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE) 698 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
434 with self.assertRaises(TypeError): 699 with self.assertRaises(TypeError):
435 ctx.verify_mode = None 700 ctx.verify_mode = None
436 with self.assertRaises(ValueError): 701 with self.assertRaises(ValueError):
437 ctx.verify_mode = 42 702 ctx.verify_mode = 42
438 703
704 @unittest.skipUnless(have_verify_flags(),
705 "verify_flags need OpenSSL > 0.9.8")
706 def test_verify_flags(self):
707 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
708 # default value by OpenSSL
709 self.assertEqual(ctx.verify_flags, ssl.VERIFY_DEFAULT)
710 ctx.verify_flags = ssl.VERIFY_CRL_CHECK_LEAF
711 self.assertEqual(ctx.verify_flags, ssl.VERIFY_CRL_CHECK_LEAF)
712 ctx.verify_flags = ssl.VERIFY_CRL_CHECK_CHAIN
713 self.assertEqual(ctx.verify_flags, ssl.VERIFY_CRL_CHECK_CHAIN)
714 ctx.verify_flags = ssl.VERIFY_DEFAULT
715 self.assertEqual(ctx.verify_flags, ssl.VERIFY_DEFAULT)
716 # supports any value
717 ctx.verify_flags = ssl.VERIFY_CRL_CHECK_LEAF | ssl.VERIFY_X509_STRICT
718 self.assertEqual(ctx.verify_flags,
719 ssl.VERIFY_CRL_CHECK_LEAF | ssl.VERIFY_X509_STRICT)
720 with self.assertRaises(TypeError):
721 ctx.verify_flags = None
722
439 def test_load_cert_chain(self): 723 def test_load_cert_chain(self):
440 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) 724 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
441 # Combined key and cert in a single file 725 # Combined key and cert in a single file
442 ctx.load_cert_chain(CERTFILE) 726 ctx.load_cert_chain(CERTFILE)
443 ctx.load_cert_chain(CERTFILE, keyfile=CERTFILE) 727 ctx.load_cert_chain(CERTFILE, keyfile=CERTFILE)
444 self.assertRaises(TypeError, ctx.load_cert_chain, keyfile=CERTFILE) 728 self.assertRaises(TypeError, ctx.load_cert_chain, keyfile=CERTFILE)
445 with self.assertRaises(IOError) as cm: 729 with self.assertRaises(OSError) as cm:
446 ctx.load_cert_chain(WRONGCERT) 730 ctx.load_cert_chain(WRONGCERT)
447 self.assertEqual(cm.exception.errno, errno.ENOENT) 731 self.assertEqual(cm.exception.errno, errno.ENOENT)
448 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"): 732 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
449 ctx.load_cert_chain(BADCERT) 733 ctx.load_cert_chain(BADCERT)
450 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"): 734 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
451 ctx.load_cert_chain(EMPTYCERT) 735 ctx.load_cert_chain(EMPTYCERT)
452 # Separate key and cert 736 # Separate key and cert
453 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) 737 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
454 ctx.load_cert_chain(ONLYCERT, ONLYKEY) 738 ctx.load_cert_chain(ONLYCERT, ONLYKEY)
455 ctx.load_cert_chain(certfile=ONLYCERT, keyfile=ONLYKEY) 739 ctx.load_cert_chain(certfile=ONLYCERT, keyfile=ONLYKEY)
(...skipping 63 matching lines...) Expand 10 before | Expand all | Expand 10 after
519 # Make sure the password function isn't called if it isn't needed 803 # Make sure the password function isn't called if it isn't needed
520 ctx.load_cert_chain(CERTFILE, password=getpass_exception) 804 ctx.load_cert_chain(CERTFILE, password=getpass_exception)
521 805
522 def test_load_verify_locations(self): 806 def test_load_verify_locations(self):
523 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) 807 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
524 ctx.load_verify_locations(CERTFILE) 808 ctx.load_verify_locations(CERTFILE)
525 ctx.load_verify_locations(cafile=CERTFILE, capath=None) 809 ctx.load_verify_locations(cafile=CERTFILE, capath=None)
526 ctx.load_verify_locations(BYTES_CERTFILE) 810 ctx.load_verify_locations(BYTES_CERTFILE)
527 ctx.load_verify_locations(cafile=BYTES_CERTFILE, capath=None) 811 ctx.load_verify_locations(cafile=BYTES_CERTFILE, capath=None)
528 self.assertRaises(TypeError, ctx.load_verify_locations) 812 self.assertRaises(TypeError, ctx.load_verify_locations)
529 self.assertRaises(TypeError, ctx.load_verify_locations, None, None) 813 self.assertRaises(TypeError, ctx.load_verify_locations, None, None, None )
530 with self.assertRaises(IOError) as cm: 814 with self.assertRaises(OSError) as cm:
531 ctx.load_verify_locations(WRONGCERT) 815 ctx.load_verify_locations(WRONGCERT)
532 self.assertEqual(cm.exception.errno, errno.ENOENT) 816 self.assertEqual(cm.exception.errno, errno.ENOENT)
533 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"): 817 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
534 ctx.load_verify_locations(BADCERT) 818 ctx.load_verify_locations(BADCERT)
535 ctx.load_verify_locations(CERTFILE, CAPATH) 819 ctx.load_verify_locations(CERTFILE, CAPATH)
536 ctx.load_verify_locations(CERTFILE, capath=BYTES_CAPATH) 820 ctx.load_verify_locations(CERTFILE, capath=BYTES_CAPATH)
537 821
538 # Issue #10989: crash if the second argument type is invalid 822 # Issue #10989: crash if the second argument type is invalid
539 self.assertRaises(TypeError, ctx.load_verify_locations, None, True) 823 self.assertRaises(TypeError, ctx.load_verify_locations, None, True)
824
825 def test_load_verify_cadata(self):
826 # test cadata
827 with open(CAFILE_CACERT) as f:
828 cacert_pem = f.read()
829 cacert_der = ssl.PEM_cert_to_DER_cert(cacert_pem)
830 with open(CAFILE_NEURONIO) as f:
831 neuronio_pem = f.read()
832 neuronio_der = ssl.PEM_cert_to_DER_cert(neuronio_pem)
833
834 # test PEM
835 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
836 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 0)
837 ctx.load_verify_locations(cadata=cacert_pem)
838 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 1)
839 ctx.load_verify_locations(cadata=neuronio_pem)
840 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
841 # cert already in hash table
842 ctx.load_verify_locations(cadata=neuronio_pem)
843 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
844
845 # combined
846 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
847 combined = "\n".join((cacert_pem, neuronio_pem))
848 ctx.load_verify_locations(cadata=combined)
849 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
850
851 # with junk around the certs
852 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
853 combined = ["head", cacert_pem, "other", neuronio_pem, "again",
854 neuronio_pem, "tail"]
855 ctx.load_verify_locations(cadata="\n".join(combined))
856 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
857
858 # test DER
859 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
860 ctx.load_verify_locations(cadata=cacert_der)
861 ctx.load_verify_locations(cadata=neuronio_der)
862 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
863 # cert already in hash table
864 ctx.load_verify_locations(cadata=cacert_der)
865 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
866
867 # combined
868 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
869 combined = b"".join((cacert_der, neuronio_der))
870 ctx.load_verify_locations(cadata=combined)
871 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
872
873 # error cases
874 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
875 self.assertRaises(TypeError, ctx.load_verify_locations, cadata=object)
876
877 with self.assertRaisesRegex(ssl.SSLError, "no start line"):
878 ctx.load_verify_locations(cadata="broken")
879 with self.assertRaisesRegex(ssl.SSLError, "not enough data"):
880 ctx.load_verify_locations(cadata=b"broken")
881
540 882
541 def test_load_dh_params(self): 883 def test_load_dh_params(self):
542 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) 884 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
543 ctx.load_dh_params(DHFILE) 885 ctx.load_dh_params(DHFILE)
544 if os.name != 'nt': 886 if os.name != 'nt':
545 ctx.load_dh_params(BYTES_DHFILE) 887 ctx.load_dh_params(BYTES_DHFILE)
546 self.assertRaises(TypeError, ctx.load_dh_params) 888 self.assertRaises(TypeError, ctx.load_dh_params)
547 self.assertRaises(TypeError, ctx.load_dh_params, None) 889 self.assertRaises(TypeError, ctx.load_dh_params, None)
548 with self.assertRaises(FileNotFoundError) as cm: 890 with self.assertRaises(FileNotFoundError) as cm:
549 ctx.load_dh_params(WRONGCERT) 891 ctx.load_dh_params(WRONGCERT)
(...skipping 28 matching lines...) Expand all
578 @unittest.skipUnless(ssl.HAS_ECDH, "ECDH disabled on this OpenSSL build") 920 @unittest.skipUnless(ssl.HAS_ECDH, "ECDH disabled on this OpenSSL build")
579 def test_set_ecdh_curve(self): 921 def test_set_ecdh_curve(self):
580 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) 922 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
581 ctx.set_ecdh_curve("prime256v1") 923 ctx.set_ecdh_curve("prime256v1")
582 ctx.set_ecdh_curve(b"prime256v1") 924 ctx.set_ecdh_curve(b"prime256v1")
583 self.assertRaises(TypeError, ctx.set_ecdh_curve) 925 self.assertRaises(TypeError, ctx.set_ecdh_curve)
584 self.assertRaises(TypeError, ctx.set_ecdh_curve, None) 926 self.assertRaises(TypeError, ctx.set_ecdh_curve, None)
585 self.assertRaises(ValueError, ctx.set_ecdh_curve, "foo") 927 self.assertRaises(ValueError, ctx.set_ecdh_curve, "foo")
586 self.assertRaises(ValueError, ctx.set_ecdh_curve, b"foo") 928 self.assertRaises(ValueError, ctx.set_ecdh_curve, b"foo")
587 929
930 @needs_sni
931 def test_sni_callback(self):
932 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
933
934 # set_servername_callback expects a callable, or None
935 self.assertRaises(TypeError, ctx.set_servername_callback)
936 self.assertRaises(TypeError, ctx.set_servername_callback, 4)
937 self.assertRaises(TypeError, ctx.set_servername_callback, "")
938 self.assertRaises(TypeError, ctx.set_servername_callback, ctx)
939
940 def dummycallback(sock, servername, ctx):
941 pass
942 ctx.set_servername_callback(None)
943 ctx.set_servername_callback(dummycallback)
944
945 @needs_sni
946 def test_sni_callback_refcycle(self):
947 # Reference cycles through the servername callback are detected
948 # and cleared.
949 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
950 def dummycallback(sock, servername, ctx, cycle=ctx):
951 pass
952 ctx.set_servername_callback(dummycallback)
953 wr = weakref.ref(ctx)
954 del ctx, dummycallback
955 gc.collect()
956 self.assertIs(wr(), None)
957
958 def test_cert_store_stats(self):
959 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
960 self.assertEqual(ctx.cert_store_stats(),
961 {'x509_ca': 0, 'crl': 0, 'x509': 0})
962 ctx.load_cert_chain(CERTFILE)
963 self.assertEqual(ctx.cert_store_stats(),
964 {'x509_ca': 0, 'crl': 0, 'x509': 0})
965 ctx.load_verify_locations(CERTFILE)
966 self.assertEqual(ctx.cert_store_stats(),
967 {'x509_ca': 0, 'crl': 0, 'x509': 1})
968 ctx.load_verify_locations(SVN_PYTHON_ORG_ROOT_CERT)
969 self.assertEqual(ctx.cert_store_stats(),
970 {'x509_ca': 1, 'crl': 0, 'x509': 2})
971
972 def test_get_ca_certs(self):
973 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
974 self.assertEqual(ctx.get_ca_certs(), [])
975 # CERTFILE is not flagged as X509v3 Basic Constraints: CA:TRUE
976 ctx.load_verify_locations(CERTFILE)
977 self.assertEqual(ctx.get_ca_certs(), [])
978 # but SVN_PYTHON_ORG_ROOT_CERT is a CA cert
979 ctx.load_verify_locations(SVN_PYTHON_ORG_ROOT_CERT)
980 self.assertEqual(ctx.get_ca_certs(),
981 [{'issuer': ((('organizationName', 'Root CA'),),
982 (('organizationalUnitName', 'http://www.cacert.org'),),
983 (('commonName', 'CA Cert Signing Authority'),),
984 (('emailAddress', 'support@cacert.org'),)),
985 'notAfter': asn1time('Mar 29 12:29:49 2033 GMT'),
986 'notBefore': asn1time('Mar 30 12:29:49 2003 GMT'),
987 'serialNumber': '00',
988 'crlDistributionPoints': ('https://www.cacert.org/revoke.crl',),
989 'subject': ((('organizationName', 'Root CA'),),
990 (('organizationalUnitName', 'http://www.cacert.org'),) ,
991 (('commonName', 'CA Cert Signing Authority'),),
992 (('emailAddress', 'support@cacert.org'),)),
993 'version': 3}])
994
995 with open(SVN_PYTHON_ORG_ROOT_CERT) as f:
996 pem = f.read()
997 der = ssl.PEM_cert_to_DER_cert(pem)
998 self.assertEqual(ctx.get_ca_certs(True), [der])
999
1000 def test_load_default_certs(self):
1001 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1002 ctx.load_default_certs()
1003
1004 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1005 ctx.load_default_certs(ssl.Purpose.SERVER_AUTH)
1006 ctx.load_default_certs()
1007
1008 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1009 ctx.load_default_certs(ssl.Purpose.CLIENT_AUTH)
1010
1011 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1012 self.assertRaises(TypeError, ctx.load_default_certs, None)
1013 self.assertRaises(TypeError, ctx.load_default_certs, 'SERVER_AUTH')
1014
1015 def test_create_default_context(self):
1016 ctx = ssl.create_default_context()
1017 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLSv1)
1018 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1019 self.assertTrue(ctx.check_hostname)
1020 self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
1021
1022 with open(SIGNING_CA) as f:
1023 cadata = f.read()
1024 ctx = ssl.create_default_context(cafile=SIGNING_CA, capath=CAPATH,
1025 cadata=cadata)
1026 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLSv1)
1027 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1028 self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
1029
1030 ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
1031 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLSv1)
1032 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1033 self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
1034
1035 def test__create_stdlib_context(self):
1036 ctx = ssl._create_stdlib_context()
1037 self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23)
1038 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1039 self.assertFalse(ctx.check_hostname)
1040 self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
1041
1042 ctx = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1)
1043 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLSv1)
1044 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1045 self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
1046
1047 ctx = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1,
1048 cert_reqs=ssl.CERT_REQUIRED,
1049 check_hostname=True)
1050 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLSv1)
1051 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1052 self.assertTrue(ctx.check_hostname)
1053 self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
1054
1055 ctx = ssl._create_stdlib_context(purpose=ssl.Purpose.CLIENT_AUTH)
1056 self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23)
1057 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1058 self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
1059
1060 def test_check_hostname(self):
1061 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1062 self.assertFalse(ctx.check_hostname)
1063
1064 # Requires CERT_REQUIRED or CERT_OPTIONAL
1065 with self.assertRaises(ValueError):
1066 ctx.check_hostname = True
1067 ctx.verify_mode = ssl.CERT_REQUIRED
1068 self.assertFalse(ctx.check_hostname)
1069 ctx.check_hostname = True
1070 self.assertTrue(ctx.check_hostname)
1071
1072 ctx.verify_mode = ssl.CERT_OPTIONAL
1073 ctx.check_hostname = True
1074 self.assertTrue(ctx.check_hostname)
1075
1076 # Cannot set CERT_NONE with check_hostname enabled
1077 with self.assertRaises(ValueError):
1078 ctx.verify_mode = ssl.CERT_NONE
1079 ctx.check_hostname = False
1080 self.assertFalse(ctx.check_hostname)
1081
588 1082
589 class SSLErrorTests(unittest.TestCase): 1083 class SSLErrorTests(unittest.TestCase):
590 1084
591 def test_str(self): 1085 def test_str(self):
592 # The str() of a SSLError doesn't include the errno 1086 # The str() of a SSLError doesn't include the errno
593 e = ssl.SSLError(1, "foo") 1087 e = ssl.SSLError(1, "foo")
594 self.assertEqual(str(e), "foo") 1088 self.assertEqual(str(e), "foo")
595 self.assertEqual(e.errno, 1) 1089 self.assertEqual(e.errno, 1)
596 # Same for a subclass 1090 # Same for a subclass
597 e = ssl.SSLZeroReturnError(1, "foo") 1091 e = ssl.SSLZeroReturnError(1, "foo")
(...skipping 10 matching lines...) Expand all
608 s = str(cm.exception) 1102 s = str(cm.exception)
609 self.assertTrue(s.startswith("[PEM: NO_START_LINE] no start line"), s) 1103 self.assertTrue(s.startswith("[PEM: NO_START_LINE] no start line"), s)
610 1104
611 def test_subclass(self): 1105 def test_subclass(self):
612 # Check that the appropriate SSLError subclass is raised 1106 # Check that the appropriate SSLError subclass is raised
613 # (this only tests one of them) 1107 # (this only tests one of them)
614 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) 1108 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
615 with socket.socket() as s: 1109 with socket.socket() as s:
616 s.bind(("127.0.0.1", 0)) 1110 s.bind(("127.0.0.1", 0))
617 s.listen(5) 1111 s.listen(5)
618 with socket.socket() as c: 1112 c = socket.socket()
619 c.connect(s.getsockname()) 1113 c.connect(s.getsockname())
620 c.setblocking(False) 1114 c.setblocking(False)
621 c = ctx.wrap_socket(c, False, do_handshake_on_connect=False) 1115 with ctx.wrap_socket(c, False, do_handshake_on_connect=False) as c:
622 with self.assertRaises(ssl.SSLWantReadError) as cm: 1116 with self.assertRaises(ssl.SSLWantReadError) as cm:
623 c.do_handshake() 1117 c.do_handshake()
624 s = str(cm.exception) 1118 s = str(cm.exception)
625 self.assertTrue(s.startswith("The operation did not complete (re ad)"), s) 1119 self.assertTrue(s.startswith("The operation did not complete (re ad)"), s)
626 # For compatibility 1120 # For compatibility
627 self.assertEqual(cm.exception.errno, ssl.SSL_ERROR_WANT_READ) 1121 self.assertEqual(cm.exception.errno, ssl.SSL_ERROR_WANT_READ)
628 1122
629 1123
630 class NetworkedTests(unittest.TestCase): 1124 class NetworkedTests(unittest.TestCase):
631 1125
(...skipping 72 matching lines...) Expand 10 before | Expand all | Expand 10 after
704 s = ssl.wrap_socket(socket.socket(socket.AF_INET), 1198 s = ssl.wrap_socket(socket.socket(socket.AF_INET),
705 cert_reqs=ssl.CERT_REQUIRED, 1199 cert_reqs=ssl.CERT_REQUIRED,
706 ca_certs=SVN_PYTHON_ORG_ROOT_CERT, 1200 ca_certs=SVN_PYTHON_ORG_ROOT_CERT,
707 do_handshake_on_connect=False) 1201 do_handshake_on_connect=False)
708 try: 1202 try:
709 s.settimeout(0.0000001) 1203 s.settimeout(0.0000001)
710 rc = s.connect_ex(('svn.python.org', 443)) 1204 rc = s.connect_ex(('svn.python.org', 443))
711 if rc == 0: 1205 if rc == 0:
712 self.skipTest("svn.python.org responded too quickly") 1206 self.skipTest("svn.python.org responded too quickly")
713 self.assertIn(rc, (errno.EAGAIN, errno.EWOULDBLOCK)) 1207 self.assertIn(rc, (errno.EAGAIN, errno.EWOULDBLOCK))
1208 finally:
1209 s.close()
1210
1211 def test_connect_ex_error(self):
1212 with support.transient_internet("svn.python.org"):
1213 s = ssl.wrap_socket(socket.socket(socket.AF_INET),
1214 cert_reqs=ssl.CERT_REQUIRED,
1215 ca_certs=SVN_PYTHON_ORG_ROOT_CERT)
1216 try:
1217 rc = s.connect_ex(("svn.python.org", 444))
1218 # Issue #19919: Windows machines or VMs hosted on Windows
1219 # machines sometimes return EWOULDBLOCK.
1220 self.assertIn(rc, (errno.ECONNREFUSED, errno.EWOULDBLOCK))
714 finally: 1221 finally:
715 s.close() 1222 s.close()
716 1223
717 def test_connect_with_context(self): 1224 def test_connect_with_context(self):
718 with support.transient_internet("svn.python.org"): 1225 with support.transient_internet("svn.python.org"):
719 # Same as test_connect, but with a separately created context 1226 # Same as test_connect, but with a separately created context
720 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23) 1227 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
721 s = ctx.wrap_socket(socket.socket(socket.AF_INET)) 1228 s = ctx.wrap_socket(socket.socket(socket.AF_INET))
722 s.connect(("svn.python.org", 443)) 1229 s.connect(("svn.python.org", 443))
723 try: 1230 try:
(...skipping 46 matching lines...) Expand 10 before | Expand all | Expand 10 after
770 ctx.verify_mode = ssl.CERT_REQUIRED 1277 ctx.verify_mode = ssl.CERT_REQUIRED
771 ctx.load_verify_locations(capath=BYTES_CAPATH) 1278 ctx.load_verify_locations(capath=BYTES_CAPATH)
772 s = ctx.wrap_socket(socket.socket(socket.AF_INET)) 1279 s = ctx.wrap_socket(socket.socket(socket.AF_INET))
773 s.connect(("svn.python.org", 443)) 1280 s.connect(("svn.python.org", 443))
774 try: 1281 try:
775 cert = s.getpeercert() 1282 cert = s.getpeercert()
776 self.assertTrue(cert) 1283 self.assertTrue(cert)
777 finally: 1284 finally:
778 s.close() 1285 s.close()
779 1286
1287 def test_connect_cadata(self):
1288 with open(CAFILE_CACERT) as f:
1289 pem = f.read()
1290 der = ssl.PEM_cert_to_DER_cert(pem)
1291 with support.transient_internet("svn.python.org"):
1292 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1293 ctx.verify_mode = ssl.CERT_REQUIRED
1294 ctx.load_verify_locations(cadata=pem)
1295 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1296 s.connect(("svn.python.org", 443))
1297 cert = s.getpeercert()
1298 self.assertTrue(cert)
1299
1300 # same with DER
1301 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1302 ctx.verify_mode = ssl.CERT_REQUIRED
1303 ctx.load_verify_locations(cadata=der)
1304 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1305 s.connect(("svn.python.org", 443))
1306 cert = s.getpeercert()
1307 self.assertTrue(cert)
1308
780 @unittest.skipIf(os.name == "nt", "Can't use a socket as a file under Window s") 1309 @unittest.skipIf(os.name == "nt", "Can't use a socket as a file under Window s")
781 def test_makefile_close(self): 1310 def test_makefile_close(self):
782 # Issue #5238: creating a file-like object with makefile() shouldn't 1311 # Issue #5238: creating a file-like object with makefile() shouldn't
783 # delay closing the underlying "real socket" (here tested with its 1312 # delay closing the underlying "real socket" (here tested with its
784 # file descriptor, hence skipping the test under Windows). 1313 # file descriptor, hence skipping the test under Windows).
785 with support.transient_internet("svn.python.org"): 1314 with support.transient_internet("svn.python.org"):
786 ss = ssl.wrap_socket(socket.socket(socket.AF_INET)) 1315 ss = ssl.wrap_socket(socket.socket(socket.AF_INET))
787 ss.connect(("svn.python.org", 443)) 1316 ss.connect(("svn.python.org", 443))
788 fd = ss.fileno() 1317 fd = ss.fileno()
789 f = ss.makefile() 1318 f = ss.makefile()
(...skipping 51 matching lines...) Expand 10 before | Expand all | Expand 10 after
841 if support.verbose: 1370 if support.verbose:
842 sys.stdout.write("\nVerified certificate for %s:%s is\n%s\n" % (host, port ,pem)) 1371 sys.stdout.write("\nVerified certificate for %s:%s is\n%s\n" % (host, port ,pem))
843 1372
844 _test_get_server_certificate('svn.python.org', 443, SVN_PYTHON_ORG_ROOT_ CERT) 1373 _test_get_server_certificate('svn.python.org', 443, SVN_PYTHON_ORG_ROOT_ CERT)
845 if support.IPV6_ENABLED: 1374 if support.IPV6_ENABLED:
846 _test_get_server_certificate('ipv6.google.com', 443) 1375 _test_get_server_certificate('ipv6.google.com', 443)
847 1376
848 def test_ciphers(self): 1377 def test_ciphers(self):
849 remote = ("svn.python.org", 443) 1378 remote = ("svn.python.org", 443)
850 with support.transient_internet(remote[0]): 1379 with support.transient_internet(remote[0]):
851 s = ssl.wrap_socket(socket.socket(socket.AF_INET), 1380 with ssl.wrap_socket(socket.socket(socket.AF_INET),
852 cert_reqs=ssl.CERT_NONE, ciphers="ALL") 1381 cert_reqs=ssl.CERT_NONE, ciphers="ALL") as s:
853 s.connect(remote) 1382 s.connect(remote)
854 s = ssl.wrap_socket(socket.socket(socket.AF_INET), 1383 with ssl.wrap_socket(socket.socket(socket.AF_INET),
855 cert_reqs=ssl.CERT_NONE, ciphers="DEFAULT") 1384 cert_reqs=ssl.CERT_NONE, ciphers="DEFAULT") as s:
856 s.connect(remote) 1385 s.connect(remote)
857 # Error checking can happen at instantiation or when connecting 1386 # Error checking can happen at instantiation or when connecting
858 with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected "): 1387 with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected "):
859 with socket.socket(socket.AF_INET) as sock: 1388 with socket.socket(socket.AF_INET) as sock:
860 s = ssl.wrap_socket(sock, 1389 s = ssl.wrap_socket(sock,
861 cert_reqs=ssl.CERT_NONE, ciphers="^$:,;? *'dorothyx") 1390 cert_reqs=ssl.CERT_NONE, ciphers="^$:,;? *'dorothyx")
862 s.connect(remote) 1391 s.connect(remote)
863 1392
864 def test_algorithms(self): 1393 def test_algorithms(self):
865 # Issue #8484: all algorithms should be available when verifying a 1394 # Issue #8484: all algorithms should be available when verifying a
866 # certificate. 1395 # certificate.
(...skipping 15 matching lines...) Expand all
882 try: 1411 try:
883 s.connect(remote) 1412 s.connect(remote)
884 if support.verbose: 1413 if support.verbose:
885 sys.stdout.write("\nCipher with %r is %r\n" % 1414 sys.stdout.write("\nCipher with %r is %r\n" %
886 (remote, s.cipher())) 1415 (remote, s.cipher()))
887 sys.stdout.write("Certificate is:\n%s\n" % 1416 sys.stdout.write("Certificate is:\n%s\n" %
888 pprint.pformat(s.getpeercert())) 1417 pprint.pformat(s.getpeercert()))
889 finally: 1418 finally:
890 s.close() 1419 s.close()
891 1420
1421 def test_get_ca_certs_capath(self):
1422 # capath certs are loaded on request
1423 with support.transient_internet("svn.python.org"):
1424 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1425 ctx.verify_mode = ssl.CERT_REQUIRED
1426 ctx.load_verify_locations(capath=CAPATH)
1427 self.assertEqual(ctx.get_ca_certs(), [])
1428 s = ctx.wrap_socket(socket.socket(socket.AF_INET))
1429 s.connect(("svn.python.org", 443))
1430 try:
1431 cert = s.getpeercert()
1432 self.assertTrue(cert)
1433 finally:
1434 s.close()
1435 self.assertEqual(len(ctx.get_ca_certs()), 1)
1436
1437 @needs_sni
1438 def test_context_setget(self):
1439 # Check that the context of a connected socket can be replaced.
1440 with support.transient_internet("svn.python.org"):
1441 ctx1 = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1442 ctx2 = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1443 s = socket.socket(socket.AF_INET)
1444 with ctx1.wrap_socket(s) as ss:
1445 ss.connect(("svn.python.org", 443))
1446 self.assertIs(ss.context, ctx1)
1447 self.assertIs(ss._sslobj.context, ctx1)
1448 ss.context = ctx2
1449 self.assertIs(ss.context, ctx2)
1450 self.assertIs(ss._sslobj.context, ctx2)
892 1451
893 try: 1452 try:
894 import threading 1453 import threading
895 except ImportError: 1454 except ImportError:
896 _have_threads = False 1455 _have_threads = False
897 else: 1456 else:
898 _have_threads = True 1457 _have_threads = True
899 1458
900 from test.ssl_servers import make_https_server 1459 from test.ssl_servers import make_https_server
901 1460
(...skipping 13 matching lines...) Expand all
915 self.sock.setblocking(1) 1474 self.sock.setblocking(1)
916 self.sslconn = None 1475 self.sslconn = None
917 threading.Thread.__init__(self) 1476 threading.Thread.__init__(self)
918 self.daemon = True 1477 self.daemon = True
919 1478
920 def wrap_conn(self): 1479 def wrap_conn(self):
921 try: 1480 try:
922 self.sslconn = self.server.context.wrap_socket( 1481 self.sslconn = self.server.context.wrap_socket(
923 self.sock, server_side=True) 1482 self.sock, server_side=True)
924 self.server.selected_protocols.append(self.sslconn.selected_ npn_protocol()) 1483 self.server.selected_protocols.append(self.sslconn.selected_ npn_protocol())
925 except ssl.SSLError as e: 1484 except (ssl.SSLError, ConnectionResetError) as e:
1485 # We treat ConnectionResetError as though it were an
1486 # SSLError - OpenSSL on Ubuntu abruptly closes the
1487 # connection when asked to use an unsupported protocol.
1488 #
926 # XXX Various errors can have happened here, for example 1489 # XXX Various errors can have happened here, for example
927 # a mismatching protocol version, an invalid certificate, 1490 # a mismatching protocol version, an invalid certificate,
928 # or a low-level bug. This should be made more discriminatin g. 1491 # or a low-level bug. This should be made more discriminatin g.
929 self.server.conn_errors.append(e) 1492 self.server.conn_errors.append(e)
930 if self.server.chatty: 1493 if self.server.chatty:
931 handle_error("\n server: bad connection attempt from " + repr(self.addr) + ":\n") 1494 handle_error("\n server: bad connection attempt from " + repr(self.addr) + ":\n")
932 self.running = False 1495 self.running = False
933 self.server.stop() 1496 self.server.stop()
934 self.close() 1497 self.close()
935 return False 1498 return False
(...skipping 69 matching lines...) Expand 10 before | Expand all | Expand 10 after
1005 sys.stdout.write(" server: read CB tls-unique fr om client, sending our CB data...\n") 1568 sys.stdout.write(" server: read CB tls-unique fr om client, sending our CB data...\n")
1006 data = self.sslconn.get_channel_binding("tls-unique" ) 1569 data = self.sslconn.get_channel_binding("tls-unique" )
1007 self.write(repr(data).encode("us-ascii") + b"\n") 1570 self.write(repr(data).encode("us-ascii") + b"\n")
1008 else: 1571 else:
1009 if (support.verbose and 1572 if (support.verbose and
1010 self.server.connectionchatty): 1573 self.server.connectionchatty):
1011 ctype = (self.sslconn and "encrypted") or "unenc rypted" 1574 ctype = (self.sslconn and "encrypted") or "unenc rypted"
1012 sys.stdout.write(" server: read %r (%s), sending back %r (%s)...\n" 1575 sys.stdout.write(" server: read %r (%s), sending back %r (%s)...\n"
1013 % (msg, ctype, msg.lower(), cty pe)) 1576 % (msg, ctype, msg.lower(), cty pe))
1014 self.write(msg.lower()) 1577 self.write(msg.lower())
1015 except socket.error: 1578 except OSError:
1016 if self.server.chatty: 1579 if self.server.chatty:
1017 handle_error("Test server failure:\n") 1580 handle_error("Test server failure:\n")
1018 self.close() 1581 self.close()
1019 self.running = False 1582 self.running = False
1020 # normally, we'd just stop here, but for the test 1583 # normally, we'd just stop here, but for the test
1021 # harness, we want to stop the server 1584 # harness, we want to stop the server
1022 self.server.stop() 1585 self.server.stop()
1023 1586
1024 def __init__(self, certificate=None, ssl_version=None, 1587 def __init__(self, certificate=None, ssl_version=None,
1025 certreqs=None, cacerts=None, 1588 certreqs=None, cacerts=None,
(...skipping 89 matching lines...) Expand 10 before | Expand all | Expand 10 after
1115 1678
1116 def _do_ssl_handshake(self): 1679 def _do_ssl_handshake(self):
1117 try: 1680 try:
1118 self.socket.do_handshake() 1681 self.socket.do_handshake()
1119 except (ssl.SSLWantReadError, ssl.SSLWantWriteError): 1682 except (ssl.SSLWantReadError, ssl.SSLWantWriteError):
1120 return 1683 return
1121 except ssl.SSLEOFError: 1684 except ssl.SSLEOFError:
1122 return self.handle_close() 1685 return self.handle_close()
1123 except ssl.SSLError: 1686 except ssl.SSLError:
1124 raise 1687 raise
1125 except socket.error as err: 1688 except OSError as err:
1126 if err.args[0] == errno.ECONNABORTED: 1689 if err.args[0] == errno.ECONNABORTED:
1127 return self.handle_close() 1690 return self.handle_close()
1128 else: 1691 else:
1129 self._ssl_accepting = False 1692 self._ssl_accepting = False
1130 1693
1131 def handle_read(self): 1694 def handle_read(self):
1132 if self._ssl_accepting: 1695 if self._ssl_accepting:
1133 self._do_ssl_handshake() 1696 self._do_ssl_handshake()
1134 else: 1697 else:
1135 data = self.recv(1024) 1698 data = self.recv(1024)
(...skipping 83 matching lines...) Expand 10 before | Expand all | Expand 10 after
1219 with server: 1782 with server:
1220 try: 1783 try:
1221 with socket.socket() as sock: 1784 with socket.socket() as sock:
1222 s = ssl.wrap_socket(sock, 1785 s = ssl.wrap_socket(sock,
1223 certfile=certfile, 1786 certfile=certfile,
1224 ssl_version=ssl.PROTOCOL_TLSv1) 1787 ssl_version=ssl.PROTOCOL_TLSv1)
1225 s.connect((HOST, server.port)) 1788 s.connect((HOST, server.port))
1226 except ssl.SSLError as x: 1789 except ssl.SSLError as x:
1227 if support.verbose: 1790 if support.verbose:
1228 sys.stdout.write("\nSSLError is %s\n" % x.args[1]) 1791 sys.stdout.write("\nSSLError is %s\n" % x.args[1])
1229 except socket.error as x: 1792 except OSError as x:
1230 if support.verbose: 1793 if support.verbose:
1231 sys.stdout.write("\nsocket.error is %s\n" % x.args[1]) 1794 sys.stdout.write("\nOSError is %s\n" % x.args[1])
1232 except IOError as x: 1795 except OSError as x:
1233 if x.errno != errno.ENOENT: 1796 if x.errno != errno.ENOENT:
1234 raise 1797 raise
1235 if support.verbose: 1798 if support.verbose:
1236 sys.stdout.write("\IOError is %s\n" % str(x)) 1799 sys.stdout.write("\OSError is %s\n" % str(x))
1237 else: 1800 else:
1238 raise AssertionError("Use of invalid cert should have failed!") 1801 raise AssertionError("Use of invalid cert should have failed!")
1239 1802
1240 def server_params_test(client_context, server_context, indata=b"FOO\n", 1803 def server_params_test(client_context, server_context, indata=b"FOO\n",
1241 chatty=True, connectionchatty=False): 1804 chatty=True, connectionchatty=False, sni_name=None):
1242 """ 1805 """
1243 Launch a server, connect a client to it and try various reads 1806 Launch a server, connect a client to it and try various reads
1244 and writes. 1807 and writes.
1245 """ 1808 """
1246 stats = {} 1809 stats = {}
1247 server = ThreadedEchoServer(context=server_context, 1810 server = ThreadedEchoServer(context=server_context,
1248 chatty=chatty, 1811 chatty=chatty,
1249 connectionchatty=False) 1812 connectionchatty=False)
1250 with server: 1813 with server:
1251 with client_context.wrap_socket(socket.socket()) as s: 1814 with client_context.wrap_socket(socket.socket(),
1815 server_hostname=sni_name) as s:
1252 s.connect((HOST, server.port)) 1816 s.connect((HOST, server.port))
1253 for arg in [indata, bytearray(indata), memoryview(indata)]: 1817 for arg in [indata, bytearray(indata), memoryview(indata)]:
1254 if connectionchatty: 1818 if connectionchatty:
1255 if support.verbose: 1819 if support.verbose:
1256 sys.stdout.write( 1820 sys.stdout.write(
1257 " client: sending %r...\n" % indata) 1821 " client: sending %r...\n" % indata)
1258 s.write(arg) 1822 s.write(arg)
1259 outdata = s.read() 1823 outdata = s.read()
1260 if connectionchatty: 1824 if connectionchatty:
1261 if support.verbose: 1825 if support.verbose:
1262 sys.stdout.write(" client: read %r\n" % outdata) 1826 sys.stdout.write(" client: read %r\n" % outdata)
1263 if outdata != indata.lower(): 1827 if outdata != indata.lower():
1264 raise AssertionError( 1828 raise AssertionError(
1265 "bad data <<%r>> (%d) received; expected <<%r>> (%d) \n" 1829 "bad data <<%r>> (%d) received; expected <<%r>> (%d) \n"
1266 % (outdata[:20], len(outdata), 1830 % (outdata[:20], len(outdata),
1267 indata[:20].lower(), len(indata))) 1831 indata[:20].lower(), len(indata)))
1268 s.write(b"over\n") 1832 s.write(b"over\n")
1269 if connectionchatty: 1833 if connectionchatty:
1270 if support.verbose: 1834 if support.verbose:
1271 sys.stdout.write(" client: closing connection.\n") 1835 sys.stdout.write(" client: closing connection.\n")
1272 stats.update({ 1836 stats.update({
1273 'compression': s.compression(), 1837 'compression': s.compression(),
1274 'cipher': s.cipher(), 1838 'cipher': s.cipher(),
1839 'peercert': s.getpeercert(),
1275 'client_npn_protocol': s.selected_npn_protocol() 1840 'client_npn_protocol': s.selected_npn_protocol()
1276 }) 1841 })
1277 s.close() 1842 s.close()
1278 stats['server_npn_protocols'] = server.selected_protocols 1843 stats['server_npn_protocols'] = server.selected_protocols
1279 return stats 1844 return stats
1280 1845
1281 def try_protocol_combo(server_protocol, client_protocol, expect_success, 1846 def try_protocol_combo(server_protocol, client_protocol, expect_success,
1282 certsreqs=None, server_options=0, client_options=0): 1847 certsreqs=None, server_options=0, client_options=0):
1283 if certsreqs is None: 1848 if certsreqs is None:
1284 certsreqs = ssl.CERT_NONE 1849 certsreqs = ssl.CERT_NONE
1285 certtype = { 1850 certtype = {
1286 ssl.CERT_NONE: "CERT_NONE", 1851 ssl.CERT_NONE: "CERT_NONE",
1287 ssl.CERT_OPTIONAL: "CERT_OPTIONAL", 1852 ssl.CERT_OPTIONAL: "CERT_OPTIONAL",
1288 ssl.CERT_REQUIRED: "CERT_REQUIRED", 1853 ssl.CERT_REQUIRED: "CERT_REQUIRED",
1289 }[certsreqs] 1854 }[certsreqs]
1290 if support.verbose: 1855 if support.verbose:
1291 formatstr = (expect_success and " %s->%s %s\n") or " {%s->%s} %s\n" 1856 formatstr = (expect_success and " %s->%s %s\n") or " {%s->%s} %s\n"
1292 sys.stdout.write(formatstr % 1857 sys.stdout.write(formatstr %
1293 (ssl.get_protocol_name(client_protocol), 1858 (ssl.get_protocol_name(client_protocol),
1294 ssl.get_protocol_name(server_protocol), 1859 ssl.get_protocol_name(server_protocol),
1295 certtype)) 1860 certtype))
1296 client_context = ssl.SSLContext(client_protocol) 1861 client_context = ssl.SSLContext(client_protocol)
1297 client_context.options = ssl.OP_ALL | client_options 1862 client_context.options |= client_options
1298 server_context = ssl.SSLContext(server_protocol) 1863 server_context = ssl.SSLContext(server_protocol)
1299 server_context.options = ssl.OP_ALL | server_options 1864 server_context.options |= server_options
1865
1866 # NOTE: we must enable "ALL" ciphers on the client, otherwise an
1867 # SSLv23 client will send an SSLv3 hello (rather than SSLv2)
1868 # starting from OpenSSL 1.0.0 (see issue #8322).
1869 if client_context.protocol == ssl.PROTOCOL_SSLv23:
1870 client_context.set_ciphers("ALL")
1871
1300 for ctx in (client_context, server_context): 1872 for ctx in (client_context, server_context):
1301 ctx.verify_mode = certsreqs 1873 ctx.verify_mode = certsreqs
1302 # NOTE: we must enable "ALL" ciphers, otherwise an SSLv23 client
1303 # will send an SSLv3 hello (rather than SSLv2) starting from
1304 # OpenSSL 1.0.0 (see issue #8322).
1305 ctx.set_ciphers("ALL")
1306 ctx.load_cert_chain(CERTFILE) 1874 ctx.load_cert_chain(CERTFILE)
1307 ctx.load_verify_locations(CERTFILE) 1875 ctx.load_verify_locations(CERTFILE)
1308 try: 1876 try:
1309 server_params_test(client_context, server_context, 1877 server_params_test(client_context, server_context,
1310 chatty=False, connectionchatty=False) 1878 chatty=False, connectionchatty=False)
1311 # Protocol mismatch can result in either an SSLError, or a 1879 # Protocol mismatch can result in either an SSLError, or a
1312 # "Connection reset by peer" error. 1880 # "Connection reset by peer" error.
1313 except ssl.SSLError: 1881 except ssl.SSLError:
1314 if expect_success: 1882 if expect_success:
1315 raise 1883 raise
1316 except socket.error as e: 1884 except OSError as e:
1317 if expect_success or e.errno != errno.ECONNRESET: 1885 if expect_success or e.errno != errno.ECONNRESET:
1318 raise 1886 raise
1319 else: 1887 else:
1320 if not expect_success: 1888 if not expect_success:
1321 raise AssertionError( 1889 raise AssertionError(
1322 "Client protocol %s succeeded with server protocol %s!" 1890 "Client protocol %s succeeded with server protocol %s!"
1323 % (ssl.get_protocol_name(client_protocol), 1891 % (ssl.get_protocol_name(client_protocol),
1324 ssl.get_protocol_name(server_protocol))) 1892 ssl.get_protocol_name(server_protocol)))
1325 1893
1326 1894
1327 class ThreadedTests(unittest.TestCase): 1895 class ThreadedTests(unittest.TestCase):
1328 1896
1329 @skip_if_broken_ubuntu_ssl 1897 @skip_if_broken_ubuntu_ssl
1330 def test_echo(self): 1898 def test_echo(self):
1331 """Basic test of an SSL client connecting to a server""" 1899 """Basic test of an SSL client connecting to a server"""
1332 if support.verbose: 1900 if support.verbose:
1333 sys.stdout.write("\n") 1901 sys.stdout.write("\n")
1334 for protocol in PROTOCOLS: 1902 for protocol in PROTOCOLS:
1335 context = ssl.SSLContext(protocol) 1903 with self.subTest(protocol=ssl._PROTOCOL_NAMES[protocol]):
1336 context.load_cert_chain(CERTFILE) 1904 context = ssl.SSLContext(protocol)
1337 server_params_test(context, context, 1905 context.load_cert_chain(CERTFILE)
1338 chatty=True, connectionchatty=True) 1906 server_params_test(context, context,
1907 chatty=True, connectionchatty=True)
1339 1908
1340 def test_getpeercert(self): 1909 def test_getpeercert(self):
1341 if support.verbose: 1910 if support.verbose:
1342 sys.stdout.write("\n") 1911 sys.stdout.write("\n")
1343 context = ssl.SSLContext(ssl.PROTOCOL_SSLv23) 1912 context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1344 context.verify_mode = ssl.CERT_REQUIRED 1913 context.verify_mode = ssl.CERT_REQUIRED
1345 context.load_verify_locations(CERTFILE) 1914 context.load_verify_locations(CERTFILE)
1346 context.load_cert_chain(CERTFILE) 1915 context.load_cert_chain(CERTFILE)
1347 server = ThreadedEchoServer(context=context, chatty=False) 1916 server = ThreadedEchoServer(context=context, chatty=False)
1348 with server: 1917 with server:
1349 s = context.wrap_socket(socket.socket()) 1918 s = context.wrap_socket(socket.socket(),
1919 do_handshake_on_connect=False)
1350 s.connect((HOST, server.port)) 1920 s.connect((HOST, server.port))
1921 # getpeercert() raise ValueError while the handshake isn't
1922 # done.
1923 with self.assertRaises(ValueError):
1924 s.getpeercert()
1925 s.do_handshake()
1351 cert = s.getpeercert() 1926 cert = s.getpeercert()
1352 self.assertTrue(cert, "Can't get peer certificate.") 1927 self.assertTrue(cert, "Can't get peer certificate.")
1353 cipher = s.cipher() 1928 cipher = s.cipher()
1354 if support.verbose: 1929 if support.verbose:
1355 sys.stdout.write(pprint.pformat(cert) + '\n') 1930 sys.stdout.write(pprint.pformat(cert) + '\n')
1356 sys.stdout.write("Connection cipher is " + str(cipher) + '.\ n') 1931 sys.stdout.write("Connection cipher is " + str(cipher) + '.\ n')
1357 if 'subject' not in cert: 1932 if 'subject' not in cert:
1358 self.fail("No subject field in certificate: %s." % 1933 self.fail("No subject field in certificate: %s." %
1359 pprint.pformat(cert)) 1934 pprint.pformat(cert))
1360 if ((('organizationName', 'Python Software Foundation'),) 1935 if ((('organizationName', 'Python Software Foundation'),)
1361 not in cert['subject']): 1936 not in cert['subject']):
1362 self.fail( 1937 self.fail(
1363 "Missing or invalid 'organizationName' field in certific ate subject; " 1938 "Missing or invalid 'organizationName' field in certific ate subject; "
1364 "should be 'Python Software Foundation'.") 1939 "should be 'Python Software Foundation'.")
1365 self.assertIn('notBefore', cert) 1940 self.assertIn('notBefore', cert)
1366 self.assertIn('notAfter', cert) 1941 self.assertIn('notAfter', cert)
1367 before = ssl.cert_time_to_seconds(cert['notBefore']) 1942 before = ssl.cert_time_to_seconds(cert['notBefore'])
1368 after = ssl.cert_time_to_seconds(cert['notAfter']) 1943 after = ssl.cert_time_to_seconds(cert['notAfter'])
1369 self.assertLess(before, after) 1944 self.assertLess(before, after)
1370 s.close() 1945 s.close()
1371 1946
1947 @unittest.skipUnless(have_verify_flags(),
1948 "verify_flags need OpenSSL > 0.9.8")
1949 def test_crl_check(self):
1950 if support.verbose:
1951 sys.stdout.write("\n")
1952
1953 server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1954 server_context.load_cert_chain(SIGNED_CERTFILE)
1955
1956 context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1957 context.verify_mode = ssl.CERT_REQUIRED
1958 context.load_verify_locations(SIGNING_CA)
1959 self.assertEqual(context.verify_flags, ssl.VERIFY_DEFAULT)
1960
1961 # VERIFY_DEFAULT should pass
1962 server = ThreadedEchoServer(context=server_context, chatty=True)
1963 with server:
1964 with context.wrap_socket(socket.socket()) as s:
1965 s.connect((HOST, server.port))
1966 cert = s.getpeercert()
1967 self.assertTrue(cert, "Can't get peer certificate.")
1968
1969 # VERIFY_CRL_CHECK_LEAF without a loaded CRL file fails
1970 context.verify_flags |= ssl.VERIFY_CRL_CHECK_LEAF
1971
1972 server = ThreadedEchoServer(context=server_context, chatty=True)
1973 with server:
1974 with context.wrap_socket(socket.socket()) as s:
1975 with self.assertRaisesRegex(ssl.SSLError,
1976 "certificate verify failed"):
1977 s.connect((HOST, server.port))
1978
1979 # now load a CRL file. The CRL file is signed by the CA.
1980 context.load_verify_locations(CRLFILE)
1981
1982 server = ThreadedEchoServer(context=server_context, chatty=True)
1983 with server:
1984 with context.wrap_socket(socket.socket()) as s:
1985 s.connect((HOST, server.port))
1986 cert = s.getpeercert()
1987 self.assertTrue(cert, "Can't get peer certificate.")
1988
1989 @needs_sni
1990 def test_check_hostname(self):
1991 if support.verbose:
1992 sys.stdout.write("\n")
1993
1994 server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1995 server_context.load_cert_chain(SIGNED_CERTFILE)
1996
1997 context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1998 context.verify_mode = ssl.CERT_REQUIRED
1999 context.check_hostname = True
2000 context.load_verify_locations(SIGNING_CA)
2001
2002 # correct hostname should verify
2003 server = ThreadedEchoServer(context=server_context, chatty=True)
2004 with server:
2005 with context.wrap_socket(socket.socket(),
2006 server_hostname="localhost") as s:
2007 s.connect((HOST, server.port))
2008 cert = s.getpeercert()
2009 self.assertTrue(cert, "Can't get peer certificate.")
2010
2011 # incorrect hostname should raise an exception
2012 server = ThreadedEchoServer(context=server_context, chatty=True)
2013 with server:
2014 with context.wrap_socket(socket.socket(),
2015 server_hostname="invalid") as s:
2016 with self.assertRaisesRegex(ssl.CertificateError,
2017 "hostname 'invalid' doesn't matc h 'localhost'"):
2018 s.connect((HOST, server.port))
2019
2020 # missing server_hostname arg should cause an exception, too
2021 server = ThreadedEchoServer(context=server_context, chatty=True)
2022 with server:
2023 with socket.socket() as s:
2024 with self.assertRaisesRegex(ValueError,
2025 "check_hostname requires server_ hostname"):
2026 context.wrap_socket(s)
2027
1372 def test_empty_cert(self): 2028 def test_empty_cert(self):
1373 """Connecting with an empty cert file""" 2029 """Connecting with an empty cert file"""
1374 bad_cert_test(os.path.join(os.path.dirname(__file__) or os.curdir, 2030 bad_cert_test(os.path.join(os.path.dirname(__file__) or os.curdir,
1375 "nullcert.pem")) 2031 "nullcert.pem"))
1376 def test_malformed_cert(self): 2032 def test_malformed_cert(self):
1377 """Connecting with a badly formatted certificate (syntax error)""" 2033 """Connecting with a badly formatted certificate (syntax error)"""
1378 bad_cert_test(os.path.join(os.path.dirname(__file__) or os.curdir, 2034 bad_cert_test(os.path.join(os.path.dirname(__file__) or os.curdir,
1379 "badcert.pem")) 2035 "badcert.pem"))
1380 def test_nonexisting_cert(self): 2036 def test_nonexisting_cert(self):
1381 """Connecting with a non-existing cert file""" 2037 """Connecting with a non-existing cert file"""
1382 bad_cert_test(os.path.join(os.path.dirname(__file__) or os.curdir, 2038 bad_cert_test(os.path.join(os.path.dirname(__file__) or os.curdir,
1383 "wrongcert.pem")) 2039 "wrongcert.pem"))
1384 def test_malformed_key(self): 2040 def test_malformed_key(self):
1385 """Connecting with a badly formatted key (syntax error)""" 2041 """Connecting with a badly formatted key (syntax error)"""
1386 bad_cert_test(os.path.join(os.path.dirname(__file__) or os.curdir, 2042 bad_cert_test(os.path.join(os.path.dirname(__file__) or os.curdir,
1387 "badkey.pem")) 2043 "badkey.pem"))
1388 2044
1389 def test_rude_shutdown(self): 2045 def test_rude_shutdown(self):
1390 """A brutal shutdown of an SSL server should raise an IOError 2046 """A brutal shutdown of an SSL server should raise an OSError
1391 in the client when attempting handshake. 2047 in the client when attempting handshake.
1392 """ 2048 """
1393 listener_ready = threading.Event() 2049 listener_ready = threading.Event()
1394 listener_gone = threading.Event() 2050 listener_gone = threading.Event()
1395 2051
1396 s = socket.socket() 2052 s = socket.socket()
1397 port = support.bind_port(s, HOST) 2053 port = support.bind_port(s, HOST)
1398 2054
1399 # `listener` runs in a thread. It sits in an accept() until 2055 # `listener` runs in a thread. It sits in an accept() until
1400 # the main thread connects. Then it rudely closes the socket, 2056 # the main thread connects. Then it rudely closes the socket,
1401 # and sets Event `listener_gone` to let the main thread know 2057 # and sets Event `listener_gone` to let the main thread know
1402 # the socket is gone. 2058 # the socket is gone.
1403 def listener(): 2059 def listener():
1404 s.listen(5) 2060 s.listen(5)
1405 listener_ready.set() 2061 listener_ready.set()
1406 newsock, addr = s.accept() 2062 newsock, addr = s.accept()
1407 newsock.close() 2063 newsock.close()
1408 s.close() 2064 s.close()
1409 listener_gone.set() 2065 listener_gone.set()
1410 2066
1411 def connector(): 2067 def connector():
1412 listener_ready.wait() 2068 listener_ready.wait()
1413 with socket.socket() as c: 2069 with socket.socket() as c:
1414 c.connect((HOST, port)) 2070 c.connect((HOST, port))
1415 listener_gone.wait() 2071 listener_gone.wait()
1416 try: 2072 try:
1417 ssl_sock = ssl.wrap_socket(c) 2073 ssl_sock = ssl.wrap_socket(c)
1418 except IOError: 2074 except OSError:
1419 pass 2075 pass
1420 else: 2076 else:
1421 self.fail('connecting to closed SSL socket should have f ailed') 2077 self.fail('connecting to closed SSL socket should have f ailed')
1422 2078
1423 t = threading.Thread(target=listener) 2079 t = threading.Thread(target=listener)
1424 t.start() 2080 t.start()
1425 try: 2081 try:
1426 connector() 2082 connector()
1427 finally: 2083 finally:
1428 t.join() 2084 t.join()
1429 2085
1430 @skip_if_broken_ubuntu_ssl 2086 @skip_if_broken_ubuntu_ssl
1431 @unittest.skipUnless(hasattr(ssl, 'PROTOCOL_SSLv2'), 2087 @unittest.skipUnless(hasattr(ssl, 'PROTOCOL_SSLv2'),
1432 "OpenSSL is compiled without SSLv2 support") 2088 "OpenSSL is compiled without SSLv2 support")
1433 def test_protocol_sslv2(self): 2089 def test_protocol_sslv2(self):
1434 """Connecting to an SSLv2 server with various client options""" 2090 """Connecting to an SSLv2 server with various client options"""
1435 if support.verbose: 2091 if support.verbose:
1436 sys.stdout.write("\n") 2092 sys.stdout.write("\n")
1437 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True) 2093 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True)
1438 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl .CERT_OPTIONAL) 2094 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl .CERT_OPTIONAL)
1439 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl .CERT_REQUIRED) 2095 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl .CERT_REQUIRED)
1440 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv23, True) 2096 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv23, False)
1441 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv3, False) 2097 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv3, False)
1442 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLSv1, False) 2098 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLSv1, False)
1443 # SSLv23 client with specific SSL options 2099 # SSLv23 client with specific SSL options
1444 if no_sslv2_implies_sslv3_hello(): 2100 if no_sslv2_implies_sslv3_hello():
1445 # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs 2101 # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
1446 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv23, Fals e, 2102 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv23, Fals e,
1447 client_options=ssl.OP_NO_SSLv2) 2103 client_options=ssl.OP_NO_SSLv2)
1448 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv23, True, 2104 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv23, False,
1449 client_options=ssl.OP_NO_SSLv3) 2105 client_options=ssl.OP_NO_SSLv3)
1450 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv23, True, 2106 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv23, False,
1451 client_options=ssl.OP_NO_TLSv1) 2107 client_options=ssl.OP_NO_TLSv1)
1452 2108
1453 @skip_if_broken_ubuntu_ssl 2109 @skip_if_broken_ubuntu_ssl
1454 def test_protocol_sslv23(self): 2110 def test_protocol_sslv23(self):
1455 """Connecting to an SSLv23 server with various client options""" 2111 """Connecting to an SSLv23 server with various client options"""
1456 if support.verbose: 2112 if support.verbose:
1457 sys.stdout.write("\n") 2113 sys.stdout.write("\n")
1458 if hasattr(ssl, 'PROTOCOL_SSLv2'): 2114 if hasattr(ssl, 'PROTOCOL_SSLv2'):
1459 try: 2115 try:
1460 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv2, True) 2116 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv2, True)
1461 except (ssl.SSLError, socket.error) as x: 2117 except OSError as x:
1462 # this fails on some older versions of OpenSSL (0.9.7l, for instance) 2118 # this fails on some older versions of OpenSSL (0.9.7l, for instance)
1463 if support.verbose: 2119 if support.verbose:
1464 sys.stdout.write( 2120 sys.stdout.write(
1465 " SSL2 client to SSL23 server test unexpectedly fail ed:\n %s\n" 2121 " SSL2 client to SSL23 server test unexpectedly fail ed:\n %s\n"
1466 % str(x)) 2122 % str(x))
1467 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, True) 2123 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, True)
1468 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True) 2124 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True)
1469 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, True) 2125 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, True)
1470 2126
1471 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, True, ss l.CERT_OPTIONAL) 2127 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, True, ss l.CERT_OPTIONAL)
(...skipping 38 matching lines...) Expand 10 before | Expand all | Expand 10 after
1510 if support.verbose: 2166 if support.verbose:
1511 sys.stdout.write("\n") 2167 sys.stdout.write("\n")
1512 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, True) 2168 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, True)
1513 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, True, ssl .CERT_OPTIONAL) 2169 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, True, ssl .CERT_OPTIONAL)
1514 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, True, ssl .CERT_REQUIRED) 2170 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, True, ssl .CERT_REQUIRED)
1515 if hasattr(ssl, 'PROTOCOL_SSLv2'): 2171 if hasattr(ssl, 'PROTOCOL_SSLv2'):
1516 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv2, False ) 2172 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv2, False )
1517 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv3, False) 2173 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv3, False)
1518 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv23, False, 2174 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv23, False,
1519 client_options=ssl.OP_NO_TLSv1) 2175 client_options=ssl.OP_NO_TLSv1)
2176
2177 @skip_if_broken_ubuntu_ssl
2178 @unittest.skipUnless(hasattr(ssl, "PROTOCOL_TLSv1_1"),
2179 "TLS version 1.1 not supported.")
2180 def test_protocol_tlsv1_1(self):
2181 """Connecting to a TLSv1.1 server with various client options.
2182 Testing against older TLS versions."""
2183 if support.verbose:
2184 sys.stdout.write("\n")
2185 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1_1, True)
2186 if hasattr(ssl, 'PROTOCOL_SSLv2'):
2187 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_SSLv2, Fal se)
2188 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_SSLv3, False)
2189 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_SSLv23, False,
2190 client_options=ssl.OP_NO_TLSv1_1)
2191
2192 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1_1, True)
2193 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1, False)
2194 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1_1, False)
2195
2196
2197 @skip_if_broken_ubuntu_ssl
2198 @unittest.skipUnless(hasattr(ssl, "PROTOCOL_TLSv1_2"),
2199 "TLS version 1.2 not supported.")
2200 def test_protocol_tlsv1_2(self):
2201 """Connecting to a TLSv1.2 server with various client options.
2202 Testing against older TLS versions."""
2203 if support.verbose:
2204 sys.stdout.write("\n")
2205 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1_2, True,
2206 server_options=ssl.OP_NO_SSLv3|ssl.OP_NO_SSLv2,
2207 client_options=ssl.OP_NO_SSLv3|ssl.OP_NO_SSLv2,)
2208 if hasattr(ssl, 'PROTOCOL_SSLv2'):
2209 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_SSLv2, Fal se)
2210 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_SSLv3, False)
2211 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_SSLv23, False,
2212 client_options=ssl.OP_NO_TLSv1_2)
2213
2214 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1_2, True)
2215 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1, False)
2216 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1_2, False)
2217 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1_1, False )
2218 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1_2, False )
1520 2219
1521 def test_starttls(self): 2220 def test_starttls(self):
1522 """Switching from clear text to encrypted and back again.""" 2221 """Switching from clear text to encrypted and back again."""
1523 msgs = (b"msg 1", b"MSG 2", b"STARTTLS", b"MSG 3", b"msg 4", b"ENDTL S", b"msg 5", b"msg 6") 2222 msgs = (b"msg 1", b"MSG 2", b"STARTTLS", b"MSG 3", b"msg 4", b"ENDTL S", b"msg 5", b"msg 6")
1524 2223
1525 server = ThreadedEchoServer(CERTFILE, 2224 server = ThreadedEchoServer(CERTFILE,
1526 ssl_version=ssl.PROTOCOL_TLSv1, 2225 ssl_version=ssl.PROTOCOL_TLSv1,
1527 starttls_server=True, 2226 starttls_server=True,
1528 chatty=True, 2227 chatty=True,
1529 connectionchatty=True) 2228 connectionchatty=True)
(...skipping 41 matching lines...) Expand 10 before | Expand all | Expand 10 after
1571 conn.write(b"over\n") 2270 conn.write(b"over\n")
1572 else: 2271 else:
1573 s.send(b"over\n") 2272 s.send(b"over\n")
1574 if wrapped: 2273 if wrapped:
1575 conn.close() 2274 conn.close()
1576 else: 2275 else:
1577 s.close() 2276 s.close()
1578 2277
1579 def test_socketserver(self): 2278 def test_socketserver(self):
1580 """Using a SocketServer to create and manage SSL connections.""" 2279 """Using a SocketServer to create and manage SSL connections."""
1581 server = make_https_server(self, CERTFILE) 2280 server = make_https_server(self, certfile=CERTFILE)
1582 # try to connect 2281 # try to connect
1583 if support.verbose: 2282 if support.verbose:
1584 sys.stdout.write('\n') 2283 sys.stdout.write('\n')
1585 with open(CERTFILE, 'rb') as f: 2284 with open(CERTFILE, 'rb') as f:
1586 d1 = f.read() 2285 d1 = f.read()
1587 d2 = '' 2286 d2 = ''
1588 # now fetch the same data from the HTTPS server 2287 # now fetch the same data from the HTTPS server
1589 url = 'https://%s:%d/%s' % ( 2288 url = 'https://%s:%d/%s' % (
1590 HOST, server.port, os.path.split(CERTFILE)[1]) 2289 HOST, server.port, os.path.split(CERTFILE)[1])
1591 f = urllib.request.urlopen(url) 2290 f = urllib.request.urlopen(url)
(...skipping 229 matching lines...) Expand 10 before | Expand all | Expand 10 after
1821 2520
1822 t = threading.Thread(target=serve) 2521 t = threading.Thread(target=serve)
1823 t.start() 2522 t.start()
1824 # Client wait until server setup and perform a connect. 2523 # Client wait until server setup and perform a connect.
1825 evt.wait() 2524 evt.wait()
1826 client = context.wrap_socket(socket.socket()) 2525 client = context.wrap_socket(socket.socket())
1827 client.connect((host, port)) 2526 client.connect((host, port))
1828 client_addr = client.getsockname() 2527 client_addr = client.getsockname()
1829 client.close() 2528 client.close()
1830 t.join() 2529 t.join()
2530 remote.close()
2531 server.close()
1831 # Sanity checks. 2532 # Sanity checks.
1832 self.assertIsInstance(remote, ssl.SSLSocket) 2533 self.assertIsInstance(remote, ssl.SSLSocket)
1833 self.assertEqual(peer, client_addr) 2534 self.assertEqual(peer, client_addr)
2535
2536 def test_getpeercert_enotconn(self):
2537 context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
2538 with context.wrap_socket(socket.socket()) as sock:
2539 with self.assertRaises(OSError) as cm:
2540 sock.getpeercert()
2541 self.assertEqual(cm.exception.errno, errno.ENOTCONN)
2542
2543 def test_do_handshake_enotconn(self):
2544 context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
2545 with context.wrap_socket(socket.socket()) as sock:
2546 with self.assertRaises(OSError) as cm:
2547 sock.do_handshake()
2548 self.assertEqual(cm.exception.errno, errno.ENOTCONN)
1834 2549
1835 def test_default_ciphers(self): 2550 def test_default_ciphers(self):
1836 context = ssl.SSLContext(ssl.PROTOCOL_SSLv23) 2551 context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1837 try: 2552 try:
1838 # Force a set of weak ciphers on our client context 2553 # Force a set of weak ciphers on our client context
1839 context.set_ciphers("DES") 2554 context.set_ciphers("DES")
1840 except ssl.SSLError: 2555 except ssl.SSLError:
1841 self.skipTest("no DES cipher available") 2556 self.skipTest("no DES cipher available")
1842 with ThreadedEchoServer(CERTFILE, 2557 with ThreadedEchoServer(CERTFILE,
1843 ssl_version=ssl.PROTOCOL_SSLv23, 2558 ssl_version=ssl.PROTOCOL_SSLv23,
1844 chatty=False) as server: 2559 chatty=False) as server:
1845 with socket.socket() as sock: 2560 with context.wrap_socket(socket.socket()) as s:
1846 s = context.wrap_socket(sock) 2561 with self.assertRaises(OSError):
1847 with self.assertRaises((OSError, ssl.SSLError)):
1848 s.connect((HOST, server.port)) 2562 s.connect((HOST, server.port))
1849 self.assertIn("no shared cipher", str(server.conn_errors[0])) 2563 self.assertIn("no shared cipher", str(server.conn_errors[0]))
1850 2564
1851 @unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES, 2565 @unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
1852 "'tls-unique' channel binding not available") 2566 "'tls-unique' channel binding not available")
1853 def test_tls_unique_channel_binding(self): 2567 def test_tls_unique_channel_binding(self):
1854 """Test tls-unique channel binding.""" 2568 """Test tls-unique channel binding."""
1855 if support.verbose: 2569 if support.verbose:
1856 sys.stdout.write("\n") 2570 sys.stdout.write("\n")
1857 2571
(...skipping 112 matching lines...) Expand 10 before | Expand all | Expand 10 after
1970 msg = "failed trying %s (s) and %s (c).\n" \ 2684 msg = "failed trying %s (s) and %s (c).\n" \
1971 "was expecting %s, but got %%s from the %%s" \ 2685 "was expecting %s, but got %%s from the %%s" \
1972 % (str(server_protocols), str(client_protocols), 2686 % (str(server_protocols), str(client_protocols),
1973 str(expected)) 2687 str(expected))
1974 client_result = stats['client_npn_protocol'] 2688 client_result = stats['client_npn_protocol']
1975 self.assertEqual(client_result, expected, msg % (client_result, "client")) 2689 self.assertEqual(client_result, expected, msg % (client_result, "client"))
1976 server_result = stats['server_npn_protocols'][-1] \ 2690 server_result = stats['server_npn_protocols'][-1] \
1977 if len(stats['server_npn_protocols']) else 'nothing' 2691 if len(stats['server_npn_protocols']) else 'nothing'
1978 self.assertEqual(server_result, expected, msg % (server_result, "server")) 2692 self.assertEqual(server_result, expected, msg % (server_result, "server"))
1979 2693
2694 def sni_contexts(self):
2695 server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
2696 server_context.load_cert_chain(SIGNED_CERTFILE)
2697 other_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
2698 other_context.load_cert_chain(SIGNED_CERTFILE2)
2699 client_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
2700 client_context.verify_mode = ssl.CERT_REQUIRED
2701 client_context.load_verify_locations(SIGNING_CA)
2702 return server_context, other_context, client_context
2703
2704 def check_common_name(self, stats, name):
2705 cert = stats['peercert']
2706 self.assertIn((('commonName', name),), cert['subject'])
2707
2708 @needs_sni
2709 def test_sni_callback(self):
2710 calls = []
2711 server_context, other_context, client_context = self.sni_contexts()
2712
2713 def servername_cb(ssl_sock, server_name, initial_context):
2714 calls.append((server_name, initial_context))
2715 if server_name is not None:
2716 ssl_sock.context = other_context
2717 server_context.set_servername_callback(servername_cb)
2718
2719 stats = server_params_test(client_context, server_context,
2720 chatty=True,
2721 sni_name='supermessage')
2722 # The hostname was fetched properly, and the certificate was
2723 # changed for the connection.
2724 self.assertEqual(calls, [("supermessage", server_context)])
2725 # CERTFILE4 was selected
2726 self.check_common_name(stats, 'fakehostname')
2727
2728 calls = []
2729 # The callback is called with server_name=None
2730 stats = server_params_test(client_context, server_context,
2731 chatty=True,
2732 sni_name=None)
2733 self.assertEqual(calls, [(None, server_context)])
2734 self.check_common_name(stats, 'localhost')
2735
2736 # Check disabling the callback
2737 calls = []
2738 server_context.set_servername_callback(None)
2739
2740 stats = server_params_test(client_context, server_context,
2741 chatty=True,
2742 sni_name='notfunny')
2743 # Certificate didn't change
2744 self.check_common_name(stats, 'localhost')
2745 self.assertEqual(calls, [])
2746
2747 @needs_sni
2748 def test_sni_callback_alert(self):
2749 # Returning a TLS alert is reflected to the connecting client
2750 server_context, other_context, client_context = self.sni_contexts()
2751
2752 def cb_returning_alert(ssl_sock, server_name, initial_context):
2753 return ssl.ALERT_DESCRIPTION_ACCESS_DENIED
2754 server_context.set_servername_callback(cb_returning_alert)
2755
2756 with self.assertRaises(ssl.SSLError) as cm:
2757 stats = server_params_test(client_context, server_context,
2758 chatty=False,
2759 sni_name='supermessage')
2760 self.assertEqual(cm.exception.reason, 'TLSV1_ALERT_ACCESS_DENIED')
2761
2762 @needs_sni
2763 def test_sni_callback_raising(self):
2764 # Raising fails the connection with a TLS handshake failure alert.
2765 server_context, other_context, client_context = self.sni_contexts()
2766
2767 def cb_raising(ssl_sock, server_name, initial_context):
2768 1/0
2769 server_context.set_servername_callback(cb_raising)
2770
2771 with self.assertRaises(ssl.SSLError) as cm, \
2772 support.captured_stderr() as stderr:
2773 stats = server_params_test(client_context, server_context,
2774 chatty=False,
2775 sni_name='supermessage')
2776 self.assertEqual(cm.exception.reason, 'SSLV3_ALERT_HANDSHAKE_FAILURE ')
2777 self.assertIn("ZeroDivisionError", stderr.getvalue())
2778
2779 @needs_sni
2780 def test_sni_callback_wrong_return_type(self):
2781 # Returning the wrong return type terminates the TLS connection
2782 # with an internal error alert.
2783 server_context, other_context, client_context = self.sni_contexts()
2784
2785 def cb_wrong_return_type(ssl_sock, server_name, initial_context):
2786 return "foo"
2787 server_context.set_servername_callback(cb_wrong_return_type)
2788
2789 with self.assertRaises(ssl.SSLError) as cm, \
2790 support.captured_stderr() as stderr:
2791 stats = server_params_test(client_context, server_context,
2792 chatty=False,
2793 sni_name='supermessage')
2794 self.assertEqual(cm.exception.reason, 'TLSV1_ALERT_INTERNAL_ERROR')
2795 self.assertIn("TypeError", stderr.getvalue())
2796
2797 def test_read_write_after_close_raises_valuerror(self):
2798 context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
2799 context.verify_mode = ssl.CERT_REQUIRED
2800 context.load_verify_locations(CERTFILE)
2801 context.load_cert_chain(CERTFILE)
2802 server = ThreadedEchoServer(context=context, chatty=False)
2803
2804 with server:
2805 s = context.wrap_socket(socket.socket())
2806 s.connect((HOST, server.port))
2807 s.close()
2808
2809 self.assertRaises(ValueError, s.read, 1024)
2810 self.assertRaises(ValueError, s.write, b'hello')
2811
1980 2812
1981 def test_main(verbose=False): 2813 def test_main(verbose=False):
1982 if support.verbose: 2814 if support.verbose:
1983 plats = { 2815 plats = {
1984 'Linux': platform.linux_distribution, 2816 'Linux': platform.linux_distribution,
1985 'Mac': platform.mac_ver, 2817 'Mac': platform.mac_ver,
1986 'Windows': platform.win32_ver, 2818 'Windows': platform.win32_ver,
1987 } 2819 }
1988 for name, func in plats.items(): 2820 for name, func in plats.items():
1989 plat = func() 2821 plat = func()
1990 if plat and plat[0]: 2822 if plat and plat[0]:
1991 plat = '%s %r' % (name, plat) 2823 plat = '%s %r' % (name, plat)
1992 break 2824 break
1993 else: 2825 else:
1994 plat = repr(platform.platform()) 2826 plat = repr(platform.platform())
1995 print("test_ssl: testing with %r %r" % 2827 print("test_ssl: testing with %r %r" %
1996 (ssl.OPENSSL_VERSION, ssl.OPENSSL_VERSION_INFO)) 2828 (ssl.OPENSSL_VERSION, ssl.OPENSSL_VERSION_INFO))
1997 print(" under %s" % plat) 2829 print(" under %s" % plat)
1998 print(" HAS_SNI = %r" % ssl.HAS_SNI) 2830 print(" HAS_SNI = %r" % ssl.HAS_SNI)
2831 print(" OP_ALL = 0x%8x" % ssl.OP_ALL)
2832 try:
2833 print(" OP_NO_TLSv1_1 = 0x%8x" % ssl.OP_NO_TLSv1_1)
2834 except AttributeError:
2835 pass
1999 2836
2000 for filename in [ 2837 for filename in [
2001 CERTFILE, SVN_PYTHON_ORG_ROOT_CERT, BYTES_CERTFILE, 2838 CERTFILE, SVN_PYTHON_ORG_ROOT_CERT, BYTES_CERTFILE,
2002 ONLYCERT, ONLYKEY, BYTES_ONLYCERT, BYTES_ONLYKEY, 2839 ONLYCERT, ONLYKEY, BYTES_ONLYCERT, BYTES_ONLYKEY,
2840 SIGNED_CERTFILE, SIGNED_CERTFILE2, SIGNING_CA,
2003 BADCERT, BADKEY, EMPTYCERT]: 2841 BADCERT, BADKEY, EMPTYCERT]:
2004 if not os.path.exists(filename): 2842 if not os.path.exists(filename):
2005 raise support.TestFailed("Can't read certificate file %r" % filename ) 2843 raise support.TestFailed("Can't read certificate file %r" % filename )
2006 2844
2007 tests = [ContextTests, BasicSocketTests, SSLErrorTests] 2845 tests = [ContextTests, BasicSocketTests, SSLErrorTests]
2008 2846
2009 if support.is_resource_enabled('network'): 2847 if support.is_resource_enabled('network'):
2010 tests.append(NetworkedTests) 2848 tests.append(NetworkedTests)
2011 2849
2012 if _have_threads: 2850 if _have_threads:
2013 thread_info = support.threading_setup() 2851 thread_info = support.threading_setup()
2014 if thread_info and support.is_resource_enabled('network'): 2852 if thread_info:
2015 tests.append(ThreadedTests) 2853 tests.append(ThreadedTests)
2016 2854
2017 try: 2855 try:
2018 support.run_unittest(*tests) 2856 support.run_unittest(*tests)
2019 finally: 2857 finally:
2020 if _have_threads: 2858 if _have_threads:
2021 support.threading_cleanup(*thread_info) 2859 support.threading_cleanup(*thread_info)
2022 2860
2023 if __name__ == "__main__": 2861 if __name__ == "__main__":
2024 test_main() 2862 test_main()
LEFTRIGHT

RSS Feeds Recent Issues | This issue
This is Rietveld 894c83f36cb7+