Rietveld Code Review Tool
Help | Bug tracker | Discussion group | Source code | Sign in
(166789)

Delta Between Two Patch Sets: Lib/test/test_ssl.py

Issue 16510: Using appropriate checks in tests
Left Patch Set: Created 6 years, 3 months ago
Right Patch Set: Created 5 years, 10 months ago
Left:
Right:
Use n/p to move between diff chunks; N/P to move between comments. Please Sign in to add in-line comments.
Jump to:
Left: Side by side diff | Download
Right: Side by side diff | Download
« no previous file with change/comment | « Lib/test/test_socket.py ('k') | Lib/test/test_strptime.py » ('j') | no next file with change/comment »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
LEFT RIGHT
1 # Test the support for SSL and sockets 1 # Test the support for SSL and sockets
2 2
3 import sys 3 import sys
4 import unittest 4 import unittest
5 from test import support 5 from test import support
6 import socket 6 import socket
7 import select 7 import select
8 import time 8 import time
9 import datetime
9 import gc 10 import gc
10 import os 11 import os
11 import errno 12 import errno
12 import pprint 13 import pprint
13 import tempfile 14 import tempfile
14 import urllib.request 15 import urllib.request
15 import traceback 16 import traceback
16 import asyncore 17 import asyncore
17 import weakref 18 import weakref
18 import platform 19 import platform
19 import functools 20 import functools
21 from unittest import mock
20 22
21 ssl = support.import_module("ssl") 23 ssl = support.import_module("ssl")
22 24
23 PROTOCOLS = [ 25 PROTOCOLS = sorted(ssl._PROTOCOL_NAMES)
24 ssl.PROTOCOL_SSLv3,
25 ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1
26 ]
27 if hasattr(ssl, 'PROTOCOL_SSLv2'):
28 PROTOCOLS.append(ssl.PROTOCOL_SSLv2)
29
30 HOST = support.HOST 26 HOST = support.HOST
31 27
32 data_file = lambda name: os.path.join(os.path.dirname(__file__), name) 28 def data_file(*name):
29 return os.path.join(os.path.dirname(__file__), *name)
33 30
34 # The custom key and certificate files used in test_ssl are generated 31 # The custom key and certificate files used in test_ssl are generated
35 # using Lib/test/make_ssl_certs.py. 32 # using Lib/test/make_ssl_certs.py.
36 # Other certificates are simply fetched from the Internet servers they 33 # Other certificates are simply fetched from the Internet servers they
37 # are meant to authenticate. 34 # are meant to authenticate.
38 35
39 CERTFILE = data_file("keycert.pem") 36 CERTFILE = data_file("keycert.pem")
40 BYTES_CERTFILE = os.fsencode(CERTFILE) 37 BYTES_CERTFILE = os.fsencode(CERTFILE)
41 ONLYCERT = data_file("ssl_cert.pem") 38 ONLYCERT = data_file("ssl_cert.pem")
42 ONLYKEY = data_file("ssl_key.pem") 39 ONLYKEY = data_file("ssl_key.pem")
43 BYTES_ONLYCERT = os.fsencode(ONLYCERT) 40 BYTES_ONLYCERT = os.fsencode(ONLYCERT)
44 BYTES_ONLYKEY = os.fsencode(ONLYKEY) 41 BYTES_ONLYKEY = os.fsencode(ONLYKEY)
45 CERTFILE_PROTECTED = data_file("keycert.passwd.pem") 42 CERTFILE_PROTECTED = data_file("keycert.passwd.pem")
46 ONLYKEY_PROTECTED = data_file("ssl_key.passwd.pem") 43 ONLYKEY_PROTECTED = data_file("ssl_key.passwd.pem")
47 KEY_PASSWORD = "somepass" 44 KEY_PASSWORD = "somepass"
48 CAPATH = data_file("capath") 45 CAPATH = data_file("capath")
49 BYTES_CAPATH = os.fsencode(CAPATH) 46 BYTES_CAPATH = os.fsencode(CAPATH)
47 CAFILE_NEURONIO = data_file("capath", "4e1295a3.0")
48 CAFILE_CACERT = data_file("capath", "5ed36f99.0")
49
50
51 # empty CRL
52 CRLFILE = data_file("revocation.crl")
50 53
51 # Two keys and certs signed by the same CA (for SNI tests) 54 # Two keys and certs signed by the same CA (for SNI tests)
52 SIGNED_CERTFILE = data_file("keycert3.pem") 55 SIGNED_CERTFILE = data_file("keycert3.pem")
53 SIGNED_CERTFILE2 = data_file("keycert4.pem") 56 SIGNED_CERTFILE2 = data_file("keycert4.pem")
54 SIGNING_CA = data_file("pycacert.pem") 57 SIGNING_CA = data_file("pycacert.pem")
55 58
56 SVN_PYTHON_ORG_ROOT_CERT = data_file("https_svn_python_org_root.pem") 59 SVN_PYTHON_ORG_ROOT_CERT = data_file("https_svn_python_org_root.pem")
57 60
58 EMPTYCERT = data_file("nullcert.pem") 61 EMPTYCERT = data_file("nullcert.pem")
59 BADCERT = data_file("badcert.pem") 62 BADCERT = data_file("badcert.pem")
60 WRONGCERT = data_file("XXXnonexisting.pem") 63 WRONGCERT = data_file("XXXnonexisting.pem")
61 BADKEY = data_file("badkey.pem") 64 BADKEY = data_file("badkey.pem")
62 NOKIACERT = data_file("nokia.pem") 65 NOKIACERT = data_file("nokia.pem")
66 NULLBYTECERT = data_file("nullbytecert.pem")
63 67
64 DHFILE = data_file("dh512.pem") 68 DHFILE = data_file("dh512.pem")
65 BYTES_DHFILE = os.fsencode(DHFILE) 69 BYTES_DHFILE = os.fsencode(DHFILE)
66 70
67 71
68 def handle_error(prefix): 72 def handle_error(prefix):
69 exc_format = ' '.join(traceback.format_exception(*sys.exc_info())) 73 exc_format = ' '.join(traceback.format_exception(*sys.exc_info()))
70 if support.verbose: 74 if support.verbose:
71 sys.stdout.write(prefix + exc_format) 75 sys.stdout.write(prefix + exc_format)
72 76
73 def can_clear_options(): 77 def can_clear_options():
74 # 0.9.8m or higher 78 # 0.9.8m or higher
75 return ssl._OPENSSL_API_VERSION >= (0, 9, 8, 13, 15) 79 return ssl._OPENSSL_API_VERSION >= (0, 9, 8, 13, 15)
76 80
77 def no_sslv2_implies_sslv3_hello(): 81 def no_sslv2_implies_sslv3_hello():
78 # 0.9.7h or higher 82 # 0.9.7h or higher
79 return ssl.OPENSSL_VERSION_INFO >= (0, 9, 7, 8, 15) 83 return ssl.OPENSSL_VERSION_INFO >= (0, 9, 7, 8, 15)
80 84
85 def have_verify_flags():
86 # 0.9.8 or higher
87 return ssl.OPENSSL_VERSION_INFO >= (0, 9, 8, 0, 15)
88
89 def asn1time(cert_time):
90 # Some versions of OpenSSL ignore seconds, see #18207
91 # 0.9.8.i
92 if ssl._OPENSSL_API_VERSION == (0, 9, 8, 9, 15):
93 fmt = "%b %d %H:%M:%S %Y GMT"
94 dt = datetime.datetime.strptime(cert_time, fmt)
95 dt = dt.replace(second=0)
96 cert_time = dt.strftime(fmt)
97 # %d adds leading zero but ASN1_TIME_print() uses leading space
98 if cert_time[4] == "0":
99 cert_time = cert_time[:4] + " " + cert_time[5:]
100
101 return cert_time
81 102
82 # Issue #9415: Ubuntu hijacks their OpenSSL and forcefully disables SSLv2 103 # Issue #9415: Ubuntu hijacks their OpenSSL and forcefully disables SSLv2
83 def skip_if_broken_ubuntu_ssl(func): 104 def skip_if_broken_ubuntu_ssl(func):
84 if hasattr(ssl, 'PROTOCOL_SSLv2'): 105 if hasattr(ssl, 'PROTOCOL_SSLv2'):
85 @functools.wraps(func) 106 @functools.wraps(func)
86 def f(*args, **kwargs): 107 def f(*args, **kwargs):
87 try: 108 try:
88 ssl.SSLContext(ssl.PROTOCOL_SSLv2) 109 ssl.SSLContext(ssl.PROTOCOL_SSLv2)
89 except ssl.SSLError: 110 except ssl.SSLError:
90 if (ssl.OPENSSL_VERSION_INFO == (0, 9, 8, 15, 15) and 111 if (ssl.OPENSSL_VERSION_INFO == (0, 9, 8, 15, 15) and
91 platform.linux_distribution() == ('debian', 'squeeze/sid', ' ')): 112 platform.linux_distribution() == ('debian', 'squeeze/sid', ' ')):
92 raise unittest.SkipTest("Patched Ubuntu OpenSSL breaks behaviour") 113 raise unittest.SkipTest("Patched Ubuntu OpenSSL breaks behaviour")
93 return func(*args, **kwargs) 114 return func(*args, **kwargs)
94 return f 115 return f
95 else: 116 else:
96 return func 117 return func
97 118
98 needs_sni = unittest.skipUnless(ssl.HAS_SNI, "SNI support needed for this test") 119 needs_sni = unittest.skipUnless(ssl.HAS_SNI, "SNI support needed for this test")
99 120
100 121
101 class BasicSocketTests(unittest.TestCase): 122 class BasicSocketTests(unittest.TestCase):
102 123
103 def test_constants(self): 124 def test_constants(self):
104 #ssl.PROTOCOL_SSLv2
105 ssl.PROTOCOL_SSLv23
106 ssl.PROTOCOL_SSLv3
107 ssl.PROTOCOL_TLSv1
108 ssl.CERT_NONE 125 ssl.CERT_NONE
109 ssl.CERT_OPTIONAL 126 ssl.CERT_OPTIONAL
110 ssl.CERT_REQUIRED 127 ssl.CERT_REQUIRED
111 ssl.OP_CIPHER_SERVER_PREFERENCE 128 ssl.OP_CIPHER_SERVER_PREFERENCE
112 ssl.OP_SINGLE_DH_USE 129 ssl.OP_SINGLE_DH_USE
113 if ssl.HAS_ECDH: 130 if ssl.HAS_ECDH:
114 ssl.OP_SINGLE_ECDH_USE 131 ssl.OP_SINGLE_ECDH_USE
115 if ssl.OPENSSL_VERSION_INFO >= (1, 0): 132 if ssl.OPENSSL_VERSION_INFO >= (1, 0):
116 ssl.OP_NO_COMPRESSION 133 ssl.OP_NO_COMPRESSION
117 self.assertIn(ssl.HAS_SNI, {True, False}) 134 self.assertIn(ssl.HAS_SNI, {True, False})
118 self.assertIn(ssl.HAS_ECDH, {True, False}) 135 self.assertIn(ssl.HAS_ECDH, {True, False})
119 136
120 def test_random(self): 137 def test_random(self):
121 v = ssl.RAND_status() 138 v = ssl.RAND_status()
122 if support.verbose: 139 if support.verbose:
123 sys.stdout.write("\n RAND_status is %d (%s)\n" 140 sys.stdout.write("\n RAND_status is %d (%s)\n"
124 % (v, (v and "sufficient randomness") or 141 % (v, (v and "sufficient randomness") or
125 "insufficient randomness")) 142 "insufficient randomness"))
126 143
127 data, is_cryptographic = ssl.RAND_pseudo_bytes(16) 144 data, is_cryptographic = ssl.RAND_pseudo_bytes(16)
128 self.assertEqual(len(data), 16) 145 self.assertEqual(len(data), 16)
129 self.assertEqual(is_cryptographic, v == 1) 146 self.assertEqual(is_cryptographic, v == 1)
130 if v: 147 if v:
131 data = ssl.RAND_bytes(16) 148 data = ssl.RAND_bytes(16)
132 self.assertEqual(len(data), 16) 149 self.assertEqual(len(data), 16)
133 else: 150 else:
134 self.assertRaises(ssl.SSLError, ssl.RAND_bytes, 16) 151 self.assertRaises(ssl.SSLError, ssl.RAND_bytes, 16)
135 152
153 # negative num is invalid
154 self.assertRaises(ValueError, ssl.RAND_bytes, -5)
155 self.assertRaises(ValueError, ssl.RAND_pseudo_bytes, -5)
156
136 self.assertRaises(TypeError, ssl.RAND_egd, 1) 157 self.assertRaises(TypeError, ssl.RAND_egd, 1)
137 self.assertRaises(TypeError, ssl.RAND_egd, 'foo', 1) 158 self.assertRaises(TypeError, ssl.RAND_egd, 'foo', 1)
138 ssl.RAND_add("this is a random string", 75.0) 159 ssl.RAND_add("this is a random string", 75.0)
160
161 @unittest.skipUnless(os.name == 'posix', 'requires posix')
162 def test_random_fork(self):
163 status = ssl.RAND_status()
164 if not status:
165 self.fail("OpenSSL's PRNG has insufficient randomness")
166
167 rfd, wfd = os.pipe()
168 pid = os.fork()
169 if pid == 0:
170 try:
171 os.close(rfd)
172 child_random = ssl.RAND_pseudo_bytes(16)[0]
173 self.assertEqual(len(child_random), 16)
174 os.write(wfd, child_random)
175 os.close(wfd)
176 except BaseException:
177 os._exit(1)
178 else:
179 os._exit(0)
180 else:
181 os.close(wfd)
182 self.addCleanup(os.close, rfd)
183 _, status = os.waitpid(pid, 0)
184 self.assertEqual(status, 0)
185
186 child_random = os.read(rfd, 16)
187 self.assertEqual(len(child_random), 16)
188 parent_random = ssl.RAND_pseudo_bytes(16)[0]
189 self.assertEqual(len(parent_random), 16)
190
191 self.assertNotEqual(child_random, parent_random)
139 192
140 def test_parse_cert(self): 193 def test_parse_cert(self):
141 # note that this uses an 'unofficial' function in _ssl.c, 194 # note that this uses an 'unofficial' function in _ssl.c,
142 # provided solely for this test, to exercise the certificate 195 # provided solely for this test, to exercise the certificate
143 # parsing code 196 # parsing code
144 p = ssl._ssl._test_decode_cert(CERTFILE) 197 p = ssl._ssl._test_decode_cert(CERTFILE)
145 if support.verbose: 198 if support.verbose:
146 sys.stdout.write("\n" + pprint.pformat(p) + "\n") 199 sys.stdout.write("\n" + pprint.pformat(p) + "\n")
147 self.assertEqual(p['issuer'], 200 self.assertEqual(p['issuer'],
148 ((('countryName', 'XY'),), 201 ((('countryName', 'XY'),),
149 (('localityName', 'Castle Anthrax'),), 202 (('localityName', 'Castle Anthrax'),),
150 (('organizationName', 'Python Software Foundation'),), 203 (('organizationName', 'Python Software Foundation'),),
151 (('commonName', 'localhost'),)) 204 (('commonName', 'localhost'),))
152 ) 205 )
153 # Note the next three asserts will fail if the keys are regenerated 206 # Note the next three asserts will fail if the keys are regenerated
154 self.assertEqual(p['notAfter'], 'Oct 5 23:01:56 2020 GMT') 207 self.assertEqual(p['notAfter'], asn1time('Oct 5 23:01:56 2020 GMT'))
155 self.assertEqual(p['notBefore'], 'Oct 8 23:01:56 2010 GMT') 208 self.assertEqual(p['notBefore'], asn1time('Oct 8 23:01:56 2010 GMT'))
156 self.assertEqual(p['serialNumber'], 'D7C7381919AFC24E') 209 self.assertEqual(p['serialNumber'], 'D7C7381919AFC24E')
157 self.assertEqual(p['subject'], 210 self.assertEqual(p['subject'],
158 ((('countryName', 'XY'),), 211 ((('countryName', 'XY'),),
159 (('localityName', 'Castle Anthrax'),), 212 (('localityName', 'Castle Anthrax'),),
160 (('organizationName', 'Python Software Foundation'),), 213 (('organizationName', 'Python Software Foundation'),),
161 (('commonName', 'localhost'),)) 214 (('commonName', 'localhost'),))
162 ) 215 )
163 self.assertEqual(p['subjectAltName'], (('DNS', 'localhost'),)) 216 self.assertEqual(p['subjectAltName'], (('DNS', 'localhost'),))
164 # Issue #13034: the subjectAltName in some certificates 217 # Issue #13034: the subjectAltName in some certificates
165 # (notably projects.developer.nokia.com:443) wasn't parsed 218 # (notably projects.developer.nokia.com:443) wasn't parsed
166 p = ssl._ssl._test_decode_cert(NOKIACERT) 219 p = ssl._ssl._test_decode_cert(NOKIACERT)
167 if support.verbose: 220 if support.verbose:
168 sys.stdout.write("\n" + pprint.pformat(p) + "\n") 221 sys.stdout.write("\n" + pprint.pformat(p) + "\n")
169 self.assertEqual(p['subjectAltName'], 222 self.assertEqual(p['subjectAltName'],
170 (('DNS', 'projects.developer.nokia.com'), 223 (('DNS', 'projects.developer.nokia.com'),
171 ('DNS', 'projects.forum.nokia.com')) 224 ('DNS', 'projects.forum.nokia.com'))
172 ) 225 )
226 # extra OCSP and AIA fields
227 self.assertEqual(p['OCSP'], ('http://ocsp.verisign.com',))
228 self.assertEqual(p['caIssuers'],
229 ('http://SVRIntl-G3-aia.verisign.com/SVRIntlG3.cer',))
230 self.assertEqual(p['crlDistributionPoints'],
231 ('http://SVRIntl-G3-crl.verisign.com/SVRIntlG3.crl',))
232
233 def test_parse_cert_CVE_2013_4238(self):
234 p = ssl._ssl._test_decode_cert(NULLBYTECERT)
235 if support.verbose:
236 sys.stdout.write("\n" + pprint.pformat(p) + "\n")
237 subject = ((('countryName', 'US'),),
238 (('stateOrProvinceName', 'Oregon'),),
239 (('localityName', 'Beaverton'),),
240 (('organizationName', 'Python Software Foundation'),),
241 (('organizationalUnitName', 'Python Core Development'),),
242 (('commonName', 'null.python.org\x00example.org'),),
243 (('emailAddress', 'python-dev@python.org'),))
244 self.assertEqual(p['subject'], subject)
245 self.assertEqual(p['issuer'], subject)
246 if ssl._OPENSSL_API_VERSION >= (0, 9, 8):
247 san = (('DNS', 'altnull.python.org\x00example.com'),
248 ('email', 'null@python.org\x00user@example.org'),
249 ('URI', 'http://null.python.org\x00http://example.org'),
250 ('IP Address', '192.0.2.1'),
251 ('IP Address', '2001:DB8:0:0:0:0:0:1\n'))
252 else:
253 # OpenSSL 0.9.7 doesn't support IPv6 addresses in subjectAltName
254 san = (('DNS', 'altnull.python.org\x00example.com'),
255 ('email', 'null@python.org\x00user@example.org'),
256 ('URI', 'http://null.python.org\x00http://example.org'),
257 ('IP Address', '192.0.2.1'),
258 ('IP Address', '<invalid>'))
259
260 self.assertEqual(p['subjectAltName'], san)
173 261
174 def test_DER_to_PEM(self): 262 def test_DER_to_PEM(self):
175 with open(SVN_PYTHON_ORG_ROOT_CERT, 'r') as f: 263 with open(SVN_PYTHON_ORG_ROOT_CERT, 'r') as f:
176 pem = f.read() 264 pem = f.read()
177 d1 = ssl.PEM_cert_to_DER_cert(pem) 265 d1 = ssl.PEM_cert_to_DER_cert(pem)
178 p2 = ssl.DER_cert_to_PEM_cert(d1) 266 p2 = ssl.DER_cert_to_PEM_cert(d1)
179 d2 = ssl.PEM_cert_to_DER_cert(p2) 267 d2 = ssl.PEM_cert_to_DER_cert(p2)
180 self.assertEqual(d1, d2) 268 self.assertEqual(d1, d2)
181 if not p2.startswith(ssl.PEM_HEADER + '\n'): 269 if not p2.startswith(ssl.PEM_HEADER + '\n'):
182 self.fail("DER-to-PEM didn't include correct header:\n%r\n" % p2) 270 self.fail("DER-to-PEM didn't include correct header:\n%r\n" % p2)
(...skipping 101 matching lines...) Expand 10 before | Expand all | Expand 10 after
284 fail(cert, 'example.org') 372 fail(cert, 'example.org')
285 fail(cert, 'exampleXcom') 373 fail(cert, 'exampleXcom')
286 374
287 cert = {'subject': ((('commonName', '*.a.com'),),)} 375 cert = {'subject': ((('commonName', '*.a.com'),),)}
288 ok(cert, 'foo.a.com') 376 ok(cert, 'foo.a.com')
289 fail(cert, 'bar.foo.a.com') 377 fail(cert, 'bar.foo.a.com')
290 fail(cert, 'a.com') 378 fail(cert, 'a.com')
291 fail(cert, 'Xa.com') 379 fail(cert, 'Xa.com')
292 fail(cert, '.a.com') 380 fail(cert, '.a.com')
293 381
294 cert = {'subject': ((('commonName', 'a.*.com'),),)} 382 # only match one left-most wildcard
295 ok(cert, 'a.foo.com')
296 fail(cert, 'a..com')
297 fail(cert, 'a.com')
298
299 cert = {'subject': ((('commonName', 'f*.com'),),)} 383 cert = {'subject': ((('commonName', 'f*.com'),),)}
300 ok(cert, 'foo.com') 384 ok(cert, 'foo.com')
301 ok(cert, 'f.com') 385 ok(cert, 'f.com')
302 fail(cert, 'bar.com') 386 fail(cert, 'bar.com')
303 fail(cert, 'foo.a.com') 387 fail(cert, 'foo.a.com')
304 fail(cert, 'bar.foo.com') 388 fail(cert, 'bar.foo.com')
389
390 # NULL bytes are bad, CVE-2013-4073
391 cert = {'subject': ((('commonName',
392 'null.python.org\x00example.org'),),)}
393 ok(cert, 'null.python.org\x00example.org') # or raise an error?
394 fail(cert, 'example.org')
395 fail(cert, 'null.python.org')
396
397 # error cases with wildcards
398 cert = {'subject': ((('commonName', '*.*.a.com'),),)}
399 fail(cert, 'bar.foo.a.com')
400 fail(cert, 'a.com')
401 fail(cert, 'Xa.com')
402 fail(cert, '.a.com')
403
404 cert = {'subject': ((('commonName', 'a.*.com'),),)}
405 fail(cert, 'a.foo.com')
406 fail(cert, 'a..com')
407 fail(cert, 'a.com')
408
409 # wildcard doesn't match IDNA prefix 'xn--'
410 idna = 'püthon.python.org'.encode("idna").decode("ascii")
411 cert = {'subject': ((('commonName', idna),),)}
412 ok(cert, idna)
413 cert = {'subject': ((('commonName', 'x*.python.org'),),)}
414 fail(cert, idna)
415 cert = {'subject': ((('commonName', 'xn--p*.python.org'),),)}
416 fail(cert, idna)
417
418 # wildcard in first fragment and IDNA A-labels in sequent fragments
419 # are supported.
420 idna = 'www*.pythön.org'.encode("idna").decode("ascii")
421 cert = {'subject': ((('commonName', idna),),)}
422 ok(cert, 'www.pythön.org'.encode("idna").decode("ascii"))
423 ok(cert, 'www1.pythön.org'.encode("idna").decode("ascii"))
424 fail(cert, 'ftp.pythön.org'.encode("idna").decode("ascii"))
425 fail(cert, 'pythön.org'.encode("idna").decode("ascii"))
305 426
306 # Slightly fake real-world example 427 # Slightly fake real-world example
307 cert = {'notAfter': 'Jun 26 21:41:46 2011 GMT', 428 cert = {'notAfter': 'Jun 26 21:41:46 2011 GMT',
308 'subject': ((('commonName', 'linuxfrz.org'),),), 429 'subject': ((('commonName', 'linuxfrz.org'),),),
309 'subjectAltName': (('DNS', 'linuxfr.org'), 430 'subjectAltName': (('DNS', 'linuxfr.org'),
310 ('DNS', 'linuxfr.com'), 431 ('DNS', 'linuxfr.com'),
311 ('othername', '<unsupported>'))} 432 ('othername', '<unsupported>'))}
312 ok(cert, 'linuxfr.org') 433 ok(cert, 'linuxfr.org')
313 ok(cert, 'linuxfr.com') 434 ok(cert, 'linuxfr.com')
314 # Not a "DNS" entry 435 # Not a "DNS" entry
(...skipping 36 matching lines...) Expand 10 before | Expand all | Expand 10 after
351 (('stateOrProvinceName', 'California'),), 472 (('stateOrProvinceName', 'California'),),
352 (('localityName', 'Mountain View'),), 473 (('localityName', 'Mountain View'),),
353 (('organizationName', 'Google Inc'),)), 474 (('organizationName', 'Google Inc'),)),
354 'subjectAltName': (('othername', 'blabla'),)} 475 'subjectAltName': (('othername', 'blabla'),)}
355 fail(cert, 'google.com') 476 fail(cert, 'google.com')
356 477
357 # Empty cert / no cert 478 # Empty cert / no cert
358 self.assertRaises(ValueError, ssl.match_hostname, None, 'example.com') 479 self.assertRaises(ValueError, ssl.match_hostname, None, 'example.com')
359 self.assertRaises(ValueError, ssl.match_hostname, {}, 'example.com') 480 self.assertRaises(ValueError, ssl.match_hostname, {}, 'example.com')
360 481
482 # Issue #17980: avoid denials of service by refusing more than one
483 # wildcard per fragment.
484 cert = {'subject': ((('commonName', 'a*b.com'),),)}
485 ok(cert, 'axxb.com')
486 cert = {'subject': ((('commonName', 'a*b.co*'),),)}
487 fail(cert, 'axxb.com')
488 cert = {'subject': ((('commonName', 'a*b*.com'),),)}
489 with self.assertRaises(ssl.CertificateError) as cm:
490 ssl.match_hostname(cert, 'axxbxxc.com')
491 self.assertIn("too many wildcards", str(cm.exception))
492
361 def test_server_side(self): 493 def test_server_side(self):
362 # server_hostname doesn't work for server sockets 494 # server_hostname doesn't work for server sockets
363 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23) 495 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
364 with socket.socket() as sock: 496 with socket.socket() as sock:
365 self.assertRaises(ValueError, ctx.wrap_socket, sock, True, 497 self.assertRaises(ValueError, ctx.wrap_socket, sock, True,
366 server_hostname="some.hostname") 498 server_hostname="some.hostname")
367 499
368 def test_unknown_channel_binding(self): 500 def test_unknown_channel_binding(self):
369 # should raise ValueError for unknown type 501 # should raise ValueError for unknown type
370 s = socket.socket(socket.AF_INET) 502 s = socket.socket(socket.AF_INET)
(...skipping 14 matching lines...) Expand all
385 self.assertIsNone(ss.get_channel_binding("tls-unique")) 517 self.assertIsNone(ss.get_channel_binding("tls-unique"))
386 518
387 def test_dealloc_warn(self): 519 def test_dealloc_warn(self):
388 ss = ssl.wrap_socket(socket.socket(socket.AF_INET)) 520 ss = ssl.wrap_socket(socket.socket(socket.AF_INET))
389 r = repr(ss) 521 r = repr(ss)
390 with self.assertWarns(ResourceWarning) as cm: 522 with self.assertWarns(ResourceWarning) as cm:
391 ss = None 523 ss = None
392 support.gc_collect() 524 support.gc_collect()
393 self.assertIn(r, str(cm.warning.args[0])) 525 self.assertIn(r, str(cm.warning.args[0]))
394 526
527 def test_get_default_verify_paths(self):
528 paths = ssl.get_default_verify_paths()
529 self.assertEqual(len(paths), 6)
530 self.assertIsInstance(paths, ssl.DefaultVerifyPaths)
531
532 with support.EnvironmentVarGuard() as env:
533 env["SSL_CERT_DIR"] = CAPATH
534 env["SSL_CERT_FILE"] = CERTFILE
535 paths = ssl.get_default_verify_paths()
536 self.assertEqual(paths.cafile, CERTFILE)
537 self.assertEqual(paths.capath, CAPATH)
538
539 @unittest.skipUnless(sys.platform == "win32", "Windows specific")
540 def test_enum_certificates(self):
541 self.assertTrue(ssl.enum_certificates("CA"))
542 self.assertTrue(ssl.enum_certificates("ROOT"))
543
544 self.assertRaises(TypeError, ssl.enum_certificates)
545 self.assertRaises(WindowsError, ssl.enum_certificates, "")
546
547 trust_oids = set()
548 for storename in ("CA", "ROOT"):
549 store = ssl.enum_certificates(storename)
550 self.assertIsInstance(store, list)
551 for element in store:
552 self.assertIsInstance(element, tuple)
553 self.assertEqual(len(element), 3)
554 cert, enc, trust = element
555 self.assertIsInstance(cert, bytes)
556 self.assertIn(enc, {"x509_asn", "pkcs_7_asn"})
557 self.assertIsInstance(trust, (set, bool))
558 if isinstance(trust, set):
559 trust_oids.update(trust)
560
561 serverAuth = "1.3.6.1.5.5.7.3.1"
562 self.assertIn(serverAuth, trust_oids)
563
564 @unittest.skipUnless(sys.platform == "win32", "Windows specific")
565 def test_enum_crls(self):
566 self.assertTrue(ssl.enum_crls("CA"))
567 self.assertRaises(TypeError, ssl.enum_crls)
568 self.assertRaises(WindowsError, ssl.enum_crls, "")
569
570 crls = ssl.enum_crls("CA")
571 self.assertIsInstance(crls, list)
572 for element in crls:
573 self.assertIsInstance(element, tuple)
574 self.assertEqual(len(element), 2)
575 self.assertIsInstance(element[0], bytes)
576 self.assertIn(element[1], {"x509_asn", "pkcs_7_asn"})
577
578
579 def test_asn1object(self):
580 expected = (129, 'serverAuth', 'TLS Web Server Authentication',
581 '1.3.6.1.5.5.7.3.1')
582
583 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1')
584 self.assertEqual(val, expected)
585 self.assertEqual(val.nid, 129)
586 self.assertEqual(val.shortname, 'serverAuth')
587 self.assertEqual(val.longname, 'TLS Web Server Authentication')
588 self.assertEqual(val.oid, '1.3.6.1.5.5.7.3.1')
589 self.assertIsInstance(val, ssl._ASN1Object)
590 self.assertRaises(ValueError, ssl._ASN1Object, 'serverAuth')
591
592 val = ssl._ASN1Object.fromnid(129)
593 self.assertEqual(val, expected)
594 self.assertIsInstance(val, ssl._ASN1Object)
595 self.assertRaises(ValueError, ssl._ASN1Object.fromnid, -1)
596 with self.assertRaisesRegex(ValueError, "unknown NID 100000"):
597 ssl._ASN1Object.fromnid(100000)
598 for i in range(1000):
599 try:
600 obj = ssl._ASN1Object.fromnid(i)
601 except ValueError:
602 pass
603 else:
604 self.assertIsInstance(obj.nid, int)
605 self.assertIsInstance(obj.shortname, str)
606 self.assertIsInstance(obj.longname, str)
607 self.assertIsInstance(obj.oid, (str, type(None)))
608
609 val = ssl._ASN1Object.fromname('TLS Web Server Authentication')
610 self.assertEqual(val, expected)
611 self.assertIsInstance(val, ssl._ASN1Object)
612 self.assertEqual(ssl._ASN1Object.fromname('serverAuth'), expected)
613 self.assertEqual(ssl._ASN1Object.fromname('1.3.6.1.5.5.7.3.1'),
614 expected)
615 with self.assertRaisesRegex(ValueError, "unknown object 'serverauth'"):
616 ssl._ASN1Object.fromname('serverauth')
617
618 def test_purpose_enum(self):
619 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1')
620 self.assertIsInstance(ssl.Purpose.SERVER_AUTH, ssl._ASN1Object)
621 self.assertEqual(ssl.Purpose.SERVER_AUTH, val)
622 self.assertEqual(ssl.Purpose.SERVER_AUTH.nid, 129)
623 self.assertEqual(ssl.Purpose.SERVER_AUTH.shortname, 'serverAuth')
624 self.assertEqual(ssl.Purpose.SERVER_AUTH.oid,
625 '1.3.6.1.5.5.7.3.1')
626
627 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.2')
628 self.assertIsInstance(ssl.Purpose.CLIENT_AUTH, ssl._ASN1Object)
629 self.assertEqual(ssl.Purpose.CLIENT_AUTH, val)
630 self.assertEqual(ssl.Purpose.CLIENT_AUTH.nid, 130)
631 self.assertEqual(ssl.Purpose.CLIENT_AUTH.shortname, 'clientAuth')
632 self.assertEqual(ssl.Purpose.CLIENT_AUTH.oid,
633 '1.3.6.1.5.5.7.3.2')
634
635 def test_unsupported_dtls(self):
636 s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
637 self.addCleanup(s.close)
638 with self.assertRaises(NotImplementedError) as cx:
639 ssl.wrap_socket(s, cert_reqs=ssl.CERT_NONE)
640 self.assertEqual(str(cx.exception), "only stream sockets are supported")
641 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
642 with self.assertRaises(NotImplementedError) as cx:
643 ctx.wrap_socket(s)
644 self.assertEqual(str(cx.exception), "only stream sockets are supported")
645
646
395 class ContextTests(unittest.TestCase): 647 class ContextTests(unittest.TestCase):
396 648
397 @skip_if_broken_ubuntu_ssl 649 @skip_if_broken_ubuntu_ssl
398 def test_constructor(self): 650 def test_constructor(self):
399 if hasattr(ssl, 'PROTOCOL_SSLv2'): 651 for protocol in PROTOCOLS:
400 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv2) 652 ssl.SSLContext(protocol)
401 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
402 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv3)
403 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
404 self.assertRaises(TypeError, ssl.SSLContext) 653 self.assertRaises(TypeError, ssl.SSLContext)
405 self.assertRaises(ValueError, ssl.SSLContext, -1) 654 self.assertRaises(ValueError, ssl.SSLContext, -1)
406 self.assertRaises(ValueError, ssl.SSLContext, 42) 655 self.assertRaises(ValueError, ssl.SSLContext, 42)
407 656
408 @skip_if_broken_ubuntu_ssl 657 @skip_if_broken_ubuntu_ssl
409 def test_protocol(self): 658 def test_protocol(self):
410 for proto in PROTOCOLS: 659 for proto in PROTOCOLS:
411 ctx = ssl.SSLContext(proto) 660 ctx = ssl.SSLContext(proto)
412 self.assertEqual(ctx.protocol, proto) 661 self.assertEqual(ctx.protocol, proto)
413 662
414 def test_ciphers(self): 663 def test_ciphers(self):
415 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) 664 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
416 ctx.set_ciphers("ALL") 665 ctx.set_ciphers("ALL")
417 ctx.set_ciphers("DEFAULT") 666 ctx.set_ciphers("DEFAULT")
418 with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected"): 667 with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected"):
419 ctx.set_ciphers("^$:,;?*'dorothyx") 668 ctx.set_ciphers("^$:,;?*'dorothyx")
420 669
421 @skip_if_broken_ubuntu_ssl 670 @skip_if_broken_ubuntu_ssl
422 def test_options(self): 671 def test_options(self):
423 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) 672 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
424 # OP_ALL is the default value 673 # OP_ALL | OP_NO_SSLv2 is the default value
425 self.assertEqual(ssl.OP_ALL, ctx.options)
426 ctx.options |= ssl.OP_NO_SSLv2
427 self.assertEqual(ssl.OP_ALL | ssl.OP_NO_SSLv2, 674 self.assertEqual(ssl.OP_ALL | ssl.OP_NO_SSLv2,
428 ctx.options) 675 ctx.options)
429 ctx.options |= ssl.OP_NO_SSLv3 676 ctx.options |= ssl.OP_NO_SSLv3
430 self.assertEqual(ssl.OP_ALL | ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3, 677 self.assertEqual(ssl.OP_ALL | ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3,
431 ctx.options) 678 ctx.options)
432 if can_clear_options(): 679 if can_clear_options():
433 ctx.options = (ctx.options & ~ssl.OP_NO_SSLv2) | ssl.OP_NO_TLSv1 680 ctx.options = (ctx.options & ~ssl.OP_NO_SSLv2) | ssl.OP_NO_TLSv1
434 self.assertEqual(ssl.OP_ALL | ssl.OP_NO_TLSv1 | ssl.OP_NO_SSLv3, 681 self.assertEqual(ssl.OP_ALL | ssl.OP_NO_TLSv1 | ssl.OP_NO_SSLv3,
435 ctx.options) 682 ctx.options)
436 ctx.options = 0 683 ctx.options = 0
437 self.assertEqual(0, ctx.options) 684 self.assertEqual(0, ctx.options)
438 else: 685 else:
439 with self.assertRaises(ValueError): 686 with self.assertRaises(ValueError):
440 ctx.options = 0 687 ctx.options = 0
441 688
442 def test_verify(self): 689 def test_verify_mode(self):
443 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) 690 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
444 # Default value 691 # Default value
445 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE) 692 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
446 ctx.verify_mode = ssl.CERT_OPTIONAL 693 ctx.verify_mode = ssl.CERT_OPTIONAL
447 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL) 694 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
448 ctx.verify_mode = ssl.CERT_REQUIRED 695 ctx.verify_mode = ssl.CERT_REQUIRED
449 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED) 696 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
450 ctx.verify_mode = ssl.CERT_NONE 697 ctx.verify_mode = ssl.CERT_NONE
451 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE) 698 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
452 with self.assertRaises(TypeError): 699 with self.assertRaises(TypeError):
453 ctx.verify_mode = None 700 ctx.verify_mode = None
454 with self.assertRaises(ValueError): 701 with self.assertRaises(ValueError):
455 ctx.verify_mode = 42 702 ctx.verify_mode = 42
703
704 @unittest.skipUnless(have_verify_flags(),
705 "verify_flags need OpenSSL > 0.9.8")
706 def test_verify_flags(self):
707 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
708 # default value by OpenSSL
709 self.assertEqual(ctx.verify_flags, ssl.VERIFY_DEFAULT)
710 ctx.verify_flags = ssl.VERIFY_CRL_CHECK_LEAF
711 self.assertEqual(ctx.verify_flags, ssl.VERIFY_CRL_CHECK_LEAF)
712 ctx.verify_flags = ssl.VERIFY_CRL_CHECK_CHAIN
713 self.assertEqual(ctx.verify_flags, ssl.VERIFY_CRL_CHECK_CHAIN)
714 ctx.verify_flags = ssl.VERIFY_DEFAULT
715 self.assertEqual(ctx.verify_flags, ssl.VERIFY_DEFAULT)
716 # supports any value
717 ctx.verify_flags = ssl.VERIFY_CRL_CHECK_LEAF | ssl.VERIFY_X509_STRICT
718 self.assertEqual(ctx.verify_flags,
719 ssl.VERIFY_CRL_CHECK_LEAF | ssl.VERIFY_X509_STRICT)
720 with self.assertRaises(TypeError):
721 ctx.verify_flags = None
456 722
457 def test_load_cert_chain(self): 723 def test_load_cert_chain(self):
458 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) 724 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
459 # Combined key and cert in a single file 725 # Combined key and cert in a single file
460 ctx.load_cert_chain(CERTFILE) 726 ctx.load_cert_chain(CERTFILE)
461 ctx.load_cert_chain(CERTFILE, keyfile=CERTFILE) 727 ctx.load_cert_chain(CERTFILE, keyfile=CERTFILE)
462 self.assertRaises(TypeError, ctx.load_cert_chain, keyfile=CERTFILE) 728 self.assertRaises(TypeError, ctx.load_cert_chain, keyfile=CERTFILE)
463 with self.assertRaises(OSError) as cm: 729 with self.assertRaises(OSError) as cm:
464 ctx.load_cert_chain(WRONGCERT) 730 ctx.load_cert_chain(WRONGCERT)
465 self.assertEqual(cm.exception.errno, errno.ENOENT) 731 self.assertEqual(cm.exception.errno, errno.ENOENT)
(...skipping 71 matching lines...) Expand 10 before | Expand all | Expand 10 after
537 # Make sure the password function isn't called if it isn't needed 803 # Make sure the password function isn't called if it isn't needed
538 ctx.load_cert_chain(CERTFILE, password=getpass_exception) 804 ctx.load_cert_chain(CERTFILE, password=getpass_exception)
539 805
540 def test_load_verify_locations(self): 806 def test_load_verify_locations(self):
541 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) 807 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
542 ctx.load_verify_locations(CERTFILE) 808 ctx.load_verify_locations(CERTFILE)
543 ctx.load_verify_locations(cafile=CERTFILE, capath=None) 809 ctx.load_verify_locations(cafile=CERTFILE, capath=None)
544 ctx.load_verify_locations(BYTES_CERTFILE) 810 ctx.load_verify_locations(BYTES_CERTFILE)
545 ctx.load_verify_locations(cafile=BYTES_CERTFILE, capath=None) 811 ctx.load_verify_locations(cafile=BYTES_CERTFILE, capath=None)
546 self.assertRaises(TypeError, ctx.load_verify_locations) 812 self.assertRaises(TypeError, ctx.load_verify_locations)
547 self.assertRaises(TypeError, ctx.load_verify_locations, None, None) 813 self.assertRaises(TypeError, ctx.load_verify_locations, None, None, None)
548 with self.assertRaises(OSError) as cm: 814 with self.assertRaises(OSError) as cm:
549 ctx.load_verify_locations(WRONGCERT) 815 ctx.load_verify_locations(WRONGCERT)
550 self.assertEqual(cm.exception.errno, errno.ENOENT) 816 self.assertEqual(cm.exception.errno, errno.ENOENT)
551 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"): 817 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
552 ctx.load_verify_locations(BADCERT) 818 ctx.load_verify_locations(BADCERT)
553 ctx.load_verify_locations(CERTFILE, CAPATH) 819 ctx.load_verify_locations(CERTFILE, CAPATH)
554 ctx.load_verify_locations(CERTFILE, capath=BYTES_CAPATH) 820 ctx.load_verify_locations(CERTFILE, capath=BYTES_CAPATH)
555 821
556 # Issue #10989: crash if the second argument type is invalid 822 # Issue #10989: crash if the second argument type is invalid
557 self.assertRaises(TypeError, ctx.load_verify_locations, None, True) 823 self.assertRaises(TypeError, ctx.load_verify_locations, None, True)
824
825 def test_load_verify_cadata(self):
826 # test cadata
827 with open(CAFILE_CACERT) as f:
828 cacert_pem = f.read()
829 cacert_der = ssl.PEM_cert_to_DER_cert(cacert_pem)
830 with open(CAFILE_NEURONIO) as f:
831 neuronio_pem = f.read()
832 neuronio_der = ssl.PEM_cert_to_DER_cert(neuronio_pem)
833
834 # test PEM
835 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
836 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 0)
837 ctx.load_verify_locations(cadata=cacert_pem)
838 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 1)
839 ctx.load_verify_locations(cadata=neuronio_pem)
840 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
841 # cert already in hash table
842 ctx.load_verify_locations(cadata=neuronio_pem)
843 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
844
845 # combined
846 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
847 combined = "\n".join((cacert_pem, neuronio_pem))
848 ctx.load_verify_locations(cadata=combined)
849 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
850
851 # with junk around the certs
852 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
853 combined = ["head", cacert_pem, "other", neuronio_pem, "again",
854 neuronio_pem, "tail"]
855 ctx.load_verify_locations(cadata="\n".join(combined))
856 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
857
858 # test DER
859 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
860 ctx.load_verify_locations(cadata=cacert_der)
861 ctx.load_verify_locations(cadata=neuronio_der)
862 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
863 # cert already in hash table
864 ctx.load_verify_locations(cadata=cacert_der)
865 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
866
867 # combined
868 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
869 combined = b"".join((cacert_der, neuronio_der))
870 ctx.load_verify_locations(cadata=combined)
871 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
872
873 # error cases
874 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
875 self.assertRaises(TypeError, ctx.load_verify_locations, cadata=object)
876
877 with self.assertRaisesRegex(ssl.SSLError, "no start line"):
878 ctx.load_verify_locations(cadata="broken")
879 with self.assertRaisesRegex(ssl.SSLError, "not enough data"):
880 ctx.load_verify_locations(cadata=b"broken")
881
558 882
559 def test_load_dh_params(self): 883 def test_load_dh_params(self):
560 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) 884 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
561 ctx.load_dh_params(DHFILE) 885 ctx.load_dh_params(DHFILE)
562 if os.name != 'nt': 886 if os.name != 'nt':
563 ctx.load_dh_params(BYTES_DHFILE) 887 ctx.load_dh_params(BYTES_DHFILE)
564 self.assertRaises(TypeError, ctx.load_dh_params) 888 self.assertRaises(TypeError, ctx.load_dh_params)
565 self.assertRaises(TypeError, ctx.load_dh_params, None) 889 self.assertRaises(TypeError, ctx.load_dh_params, None)
566 with self.assertRaises(FileNotFoundError) as cm: 890 with self.assertRaises(FileNotFoundError) as cm:
567 ctx.load_dh_params(WRONGCERT) 891 ctx.load_dh_params(WRONGCERT)
(...skipping 55 matching lines...) Expand 10 before | Expand all | Expand 10 after
623 # Reference cycles through the servername callback are detected 947 # Reference cycles through the servername callback are detected
624 # and cleared. 948 # and cleared.
625 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) 949 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
626 def dummycallback(sock, servername, ctx, cycle=ctx): 950 def dummycallback(sock, servername, ctx, cycle=ctx):
627 pass 951 pass
628 ctx.set_servername_callback(dummycallback) 952 ctx.set_servername_callback(dummycallback)
629 wr = weakref.ref(ctx) 953 wr = weakref.ref(ctx)
630 del ctx, dummycallback 954 del ctx, dummycallback
631 gc.collect() 955 gc.collect()
632 self.assertIs(wr(), None) 956 self.assertIs(wr(), None)
957
958 def test_cert_store_stats(self):
959 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
960 self.assertEqual(ctx.cert_store_stats(),
961 {'x509_ca': 0, 'crl': 0, 'x509': 0})
962 ctx.load_cert_chain(CERTFILE)
963 self.assertEqual(ctx.cert_store_stats(),
964 {'x509_ca': 0, 'crl': 0, 'x509': 0})
965 ctx.load_verify_locations(CERTFILE)
966 self.assertEqual(ctx.cert_store_stats(),
967 {'x509_ca': 0, 'crl': 0, 'x509': 1})
968 ctx.load_verify_locations(SVN_PYTHON_ORG_ROOT_CERT)
969 self.assertEqual(ctx.cert_store_stats(),
970 {'x509_ca': 1, 'crl': 0, 'x509': 2})
971
972 def test_get_ca_certs(self):
973 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
974 self.assertEqual(ctx.get_ca_certs(), [])
975 # CERTFILE is not flagged as X509v3 Basic Constraints: CA:TRUE
976 ctx.load_verify_locations(CERTFILE)
977 self.assertEqual(ctx.get_ca_certs(), [])
978 # but SVN_PYTHON_ORG_ROOT_CERT is a CA cert
979 ctx.load_verify_locations(SVN_PYTHON_ORG_ROOT_CERT)
980 self.assertEqual(ctx.get_ca_certs(),
981 [{'issuer': ((('organizationName', 'Root CA'),),
982 (('organizationalUnitName', 'http://www.cacert.org'),),
983 (('commonName', 'CA Cert Signing Authority'),),
984 (('emailAddress', 'support@cacert.org'),)),
985 'notAfter': asn1time('Mar 29 12:29:49 2033 GMT'),
986 'notBefore': asn1time('Mar 30 12:29:49 2003 GMT'),
987 'serialNumber': '00',
988 'crlDistributionPoints': ('https://www.cacert.org/revoke.crl',),
989 'subject': ((('organizationName', 'Root CA'),),
990 (('organizationalUnitName', 'http://www.cacert.org'),),
991 (('commonName', 'CA Cert Signing Authority'),),
992 (('emailAddress', 'support@cacert.org'),)),
993 'version': 3}])
994
995 with open(SVN_PYTHON_ORG_ROOT_CERT) as f:
996 pem = f.read()
997 der = ssl.PEM_cert_to_DER_cert(pem)
998 self.assertEqual(ctx.get_ca_certs(True), [der])
999
1000 def test_load_default_certs(self):
1001 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1002 ctx.load_default_certs()
1003
1004 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1005 ctx.load_default_certs(ssl.Purpose.SERVER_AUTH)
1006 ctx.load_default_certs()
1007
1008 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1009 ctx.load_default_certs(ssl.Purpose.CLIENT_AUTH)
1010
1011 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1012 self.assertRaises(TypeError, ctx.load_default_certs, None)
1013 self.assertRaises(TypeError, ctx.load_default_certs, 'SERVER_AUTH')
1014
1015 def test_create_default_context(self):
1016 ctx = ssl.create_default_context()
1017 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLSv1)
1018 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1019 self.assertTrue(ctx.check_hostname)
1020 self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
1021
1022 with open(SIGNING_CA) as f:
1023 cadata = f.read()
1024 ctx = ssl.create_default_context(cafile=SIGNING_CA, capath=CAPATH,
1025 cadata=cadata)
1026 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLSv1)
1027 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1028 self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
1029
1030 ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
1031 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLSv1)
1032 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1033 self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
1034
1035 def test__create_stdlib_context(self):
1036 ctx = ssl._create_stdlib_context()
1037 self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23)
1038 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1039 self.assertFalse(ctx.check_hostname)
1040 self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
1041
1042 ctx = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1)
1043 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLSv1)
1044 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1045 self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
1046
1047 ctx = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1,
1048 cert_reqs=ssl.CERT_REQUIRED,
1049 check_hostname=True)
1050 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLSv1)
1051 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1052 self.assertTrue(ctx.check_hostname)
1053 self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
1054
1055 ctx = ssl._create_stdlib_context(purpose=ssl.Purpose.CLIENT_AUTH)
1056 self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23)
1057 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1058 self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
1059
1060 def test_check_hostname(self):
1061 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1062 self.assertFalse(ctx.check_hostname)
1063
1064 # Requires CERT_REQUIRED or CERT_OPTIONAL
1065 with self.assertRaises(ValueError):
1066 ctx.check_hostname = True
1067 ctx.verify_mode = ssl.CERT_REQUIRED
1068 self.assertFalse(ctx.check_hostname)
1069 ctx.check_hostname = True
1070 self.assertTrue(ctx.check_hostname)
1071
1072 ctx.verify_mode = ssl.CERT_OPTIONAL
1073 ctx.check_hostname = True
1074 self.assertTrue(ctx.check_hostname)
1075
1076 # Cannot set CERT_NONE with check_hostname enabled
1077 with self.assertRaises(ValueError):
1078 ctx.verify_mode = ssl.CERT_NONE
1079 ctx.check_hostname = False
1080 self.assertFalse(ctx.check_hostname)
633 1081
634 1082
635 class SSLErrorTests(unittest.TestCase): 1083 class SSLErrorTests(unittest.TestCase):
636 1084
637 def test_str(self): 1085 def test_str(self):
638 # The str() of a SSLError doesn't include the errno 1086 # The str() of a SSLError doesn't include the errno
639 e = ssl.SSLError(1, "foo") 1087 e = ssl.SSLError(1, "foo")
640 self.assertEqual(str(e), "foo") 1088 self.assertEqual(str(e), "foo")
641 self.assertEqual(e.errno, 1) 1089 self.assertEqual(e.errno, 1)
642 # Same for a subclass 1090 # Same for a subclass
(...skipping 116 matching lines...) Expand 10 before | Expand all | Expand 10 after
759 self.assertIn(rc, (errno.EAGAIN, errno.EWOULDBLOCK)) 1207 self.assertIn(rc, (errno.EAGAIN, errno.EWOULDBLOCK))
760 finally: 1208 finally:
761 s.close() 1209 s.close()
762 1210
763 def test_connect_ex_error(self): 1211 def test_connect_ex_error(self):
764 with support.transient_internet("svn.python.org"): 1212 with support.transient_internet("svn.python.org"):
765 s = ssl.wrap_socket(socket.socket(socket.AF_INET), 1213 s = ssl.wrap_socket(socket.socket(socket.AF_INET),
766 cert_reqs=ssl.CERT_REQUIRED, 1214 cert_reqs=ssl.CERT_REQUIRED,
767 ca_certs=SVN_PYTHON_ORG_ROOT_CERT) 1215 ca_certs=SVN_PYTHON_ORG_ROOT_CERT)
768 try: 1216 try:
769 self.assertEqual(errno.ECONNREFUSED, 1217 rc = s.connect_ex(("svn.python.org", 444))
770 s.connect_ex(("svn.python.org", 444))) 1218 # Issue #19919: Windows machines or VMs hosted on Windows
1219 # machines sometimes return EWOULDBLOCK.
1220 self.assertIn(rc, (errno.ECONNREFUSED, errno.EWOULDBLOCK))
771 finally: 1221 finally:
772 s.close() 1222 s.close()
773 1223
774 def test_connect_with_context(self): 1224 def test_connect_with_context(self):
775 with support.transient_internet("svn.python.org"): 1225 with support.transient_internet("svn.python.org"):
776 # Same as test_connect, but with a separately created context 1226 # Same as test_connect, but with a separately created context
777 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23) 1227 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
778 s = ctx.wrap_socket(socket.socket(socket.AF_INET)) 1228 s = ctx.wrap_socket(socket.socket(socket.AF_INET))
779 s.connect(("svn.python.org", 443)) 1229 s.connect(("svn.python.org", 443))
780 try: 1230 try:
(...skipping 46 matching lines...) Expand 10 before | Expand all | Expand 10 after
827 ctx.verify_mode = ssl.CERT_REQUIRED 1277 ctx.verify_mode = ssl.CERT_REQUIRED
828 ctx.load_verify_locations(capath=BYTES_CAPATH) 1278 ctx.load_verify_locations(capath=BYTES_CAPATH)
829 s = ctx.wrap_socket(socket.socket(socket.AF_INET)) 1279 s = ctx.wrap_socket(socket.socket(socket.AF_INET))
830 s.connect(("svn.python.org", 443)) 1280 s.connect(("svn.python.org", 443))
831 try: 1281 try:
832 cert = s.getpeercert() 1282 cert = s.getpeercert()
833 self.assertTrue(cert) 1283 self.assertTrue(cert)
834 finally: 1284 finally:
835 s.close() 1285 s.close()
836 1286
1287 def test_connect_cadata(self):
1288 with open(CAFILE_CACERT) as f:
1289 pem = f.read()
1290 der = ssl.PEM_cert_to_DER_cert(pem)
1291 with support.transient_internet("svn.python.org"):
1292 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1293 ctx.verify_mode = ssl.CERT_REQUIRED
1294 ctx.load_verify_locations(cadata=pem)
1295 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1296 s.connect(("svn.python.org", 443))
1297 cert = s.getpeercert()
1298 self.assertTrue(cert)
1299
1300 # same with DER
1301 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1302 ctx.verify_mode = ssl.CERT_REQUIRED
1303 ctx.load_verify_locations(cadata=der)
1304 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1305 s.connect(("svn.python.org", 443))
1306 cert = s.getpeercert()
1307 self.assertTrue(cert)
1308
837 @unittest.skipIf(os.name == "nt", "Can't use a socket as a file under Windows") 1309 @unittest.skipIf(os.name == "nt", "Can't use a socket as a file under Windows")
838 def test_makefile_close(self): 1310 def test_makefile_close(self):
839 # Issue #5238: creating a file-like object with makefile() shouldn't 1311 # Issue #5238: creating a file-like object with makefile() shouldn't
840 # delay closing the underlying "real socket" (here tested with its 1312 # delay closing the underlying "real socket" (here tested with its
841 # file descriptor, hence skipping the test under Windows). 1313 # file descriptor, hence skipping the test under Windows).
842 with support.transient_internet("svn.python.org"): 1314 with support.transient_internet("svn.python.org"):
843 ss = ssl.wrap_socket(socket.socket(socket.AF_INET)) 1315 ss = ssl.wrap_socket(socket.socket(socket.AF_INET))
844 ss.connect(("svn.python.org", 443)) 1316 ss.connect(("svn.python.org", 443))
845 fd = ss.fileno() 1317 fd = ss.fileno()
846 f = ss.makefile() 1318 f = ss.makefile()
(...skipping 92 matching lines...) Expand 10 before | Expand all | Expand 10 after
939 try: 1411 try:
940 s.connect(remote) 1412 s.connect(remote)
941 if support.verbose: 1413 if support.verbose:
942 sys.stdout.write("\nCipher with %r is %r\n" % 1414 sys.stdout.write("\nCipher with %r is %r\n" %
943 (remote, s.cipher())) 1415 (remote, s.cipher()))
944 sys.stdout.write("Certificate is:\n%s\n" % 1416 sys.stdout.write("Certificate is:\n%s\n" %
945 pprint.pformat(s.getpeercert())) 1417 pprint.pformat(s.getpeercert()))
946 finally: 1418 finally:
947 s.close() 1419 s.close()
948 1420
1421 def test_get_ca_certs_capath(self):
1422 # capath certs are loaded on request
1423 with support.transient_internet("svn.python.org"):
1424 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1425 ctx.verify_mode = ssl.CERT_REQUIRED
1426 ctx.load_verify_locations(capath=CAPATH)
1427 self.assertEqual(ctx.get_ca_certs(), [])
1428 s = ctx.wrap_socket(socket.socket(socket.AF_INET))
1429 s.connect(("svn.python.org", 443))
1430 try:
1431 cert = s.getpeercert()
1432 self.assertTrue(cert)
1433 finally:
1434 s.close()
1435 self.assertEqual(len(ctx.get_ca_certs()), 1)
1436
1437 @needs_sni
1438 def test_context_setget(self):
1439 # Check that the context of a connected socket can be replaced.
1440 with support.transient_internet("svn.python.org"):
1441 ctx1 = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1442 ctx2 = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1443 s = socket.socket(socket.AF_INET)
1444 with ctx1.wrap_socket(s) as ss:
1445 ss.connect(("svn.python.org", 443))
1446 self.assertIs(ss.context, ctx1)
1447 self.assertIs(ss._sslobj.context, ctx1)
1448 ss.context = ctx2
1449 self.assertIs(ss.context, ctx2)
1450 self.assertIs(ss._sslobj.context, ctx2)
949 1451
950 try: 1452 try:
951 import threading 1453 import threading
952 except ImportError: 1454 except ImportError:
953 _have_threads = False 1455 _have_threads = False
954 else: 1456 else:
955 _have_threads = True 1457 _have_threads = True
956 1458
957 from test.ssl_servers import make_https_server 1459 from test.ssl_servers import make_https_server
958 1460
(...skipping 13 matching lines...) Expand all
972 self.sock.setblocking(1) 1474 self.sock.setblocking(1)
973 self.sslconn = None 1475 self.sslconn = None
974 threading.Thread.__init__(self) 1476 threading.Thread.__init__(self)
975 self.daemon = True 1477 self.daemon = True
976 1478
977 def wrap_conn(self): 1479 def wrap_conn(self):
978 try: 1480 try:
979 self.sslconn = self.server.context.wrap_socket( 1481 self.sslconn = self.server.context.wrap_socket(
980 self.sock, server_side=True) 1482 self.sock, server_side=True)
981 self.server.selected_protocols.append(self.sslconn.selected_npn_protocol()) 1483 self.server.selected_protocols.append(self.sslconn.selected_npn_protocol())
982 except ssl.SSLError as e: 1484 except (ssl.SSLError, ConnectionResetError) as e:
1485 # We treat ConnectionResetError as though it were an
1486 # SSLError - OpenSSL on Ubuntu abruptly closes the
1487 # connection when asked to use an unsupported protocol.
1488 #
983 # XXX Various errors can have happened here, for example 1489 # XXX Various errors can have happened here, for example
984 # a mismatching protocol version, an invalid certificate, 1490 # a mismatching protocol version, an invalid certificate,
985 # or a low-level bug. This should be made more discriminating. 1491 # or a low-level bug. This should be made more discriminating.
986 self.server.conn_errors.append(e) 1492 self.server.conn_errors.append(e)
987 if self.server.chatty: 1493 if self.server.chatty:
988 handle_error("\n server: bad connection attempt from " + repr(self.addr) + ":\n") 1494 handle_error("\n server: bad connection attempt from " + repr(self.addr) + ":\n")
989 self.running = False 1495 self.running = False
990 self.server.stop() 1496 self.server.stop()
991 self.close() 1497 self.close()
992 return False 1498 return False
(...skipping 353 matching lines...) Expand 10 before | Expand all | Expand 10 after
1346 ssl.CERT_OPTIONAL: "CERT_OPTIONAL", 1852 ssl.CERT_OPTIONAL: "CERT_OPTIONAL",
1347 ssl.CERT_REQUIRED: "CERT_REQUIRED", 1853 ssl.CERT_REQUIRED: "CERT_REQUIRED",
1348 }[certsreqs] 1854 }[certsreqs]
1349 if support.verbose: 1855 if support.verbose:
1350 formatstr = (expect_success and " %s->%s %s\n") or " {%s->%s} %s\n" 1856 formatstr = (expect_success and " %s->%s %s\n") or " {%s->%s} %s\n"
1351 sys.stdout.write(formatstr % 1857 sys.stdout.write(formatstr %
1352 (ssl.get_protocol_name(client_protocol), 1858 (ssl.get_protocol_name(client_protocol),
1353 ssl.get_protocol_name(server_protocol), 1859 ssl.get_protocol_name(server_protocol),
1354 certtype)) 1860 certtype))
1355 client_context = ssl.SSLContext(client_protocol) 1861 client_context = ssl.SSLContext(client_protocol)
1356 client_context.options = ssl.OP_ALL | client_options 1862 client_context.options |= client_options
1357 server_context = ssl.SSLContext(server_protocol) 1863 server_context = ssl.SSLContext(server_protocol)
1358 server_context.options = ssl.OP_ALL | server_options 1864 server_context.options |= server_options
1865
1866 # NOTE: we must enable "ALL" ciphers on the client, otherwise an
1867 # SSLv23 client will send an SSLv3 hello (rather than SSLv2)
1868 # starting from OpenSSL 1.0.0 (see issue #8322).
1869 if client_context.protocol == ssl.PROTOCOL_SSLv23:
1870 client_context.set_ciphers("ALL")
1871
1359 for ctx in (client_context, server_context): 1872 for ctx in (client_context, server_context):
1360 ctx.verify_mode = certsreqs 1873 ctx.verify_mode = certsreqs
1361 # NOTE: we must enable "ALL" ciphers, otherwise an SSLv23 client
1362 # will send an SSLv3 hello (rather than SSLv2) starting from
1363 # OpenSSL 1.0.0 (see issue #8322).
1364 ctx.set_ciphers("ALL")
1365 ctx.load_cert_chain(CERTFILE) 1874 ctx.load_cert_chain(CERTFILE)
1366 ctx.load_verify_locations(CERTFILE) 1875 ctx.load_verify_locations(CERTFILE)
1367 try: 1876 try:
1368 server_params_test(client_context, server_context, 1877 server_params_test(client_context, server_context,
1369 chatty=False, connectionchatty=False) 1878 chatty=False, connectionchatty=False)
1370 # Protocol mismatch can result in either an SSLError, or a 1879 # Protocol mismatch can result in either an SSLError, or a
1371 # "Connection reset by peer" error. 1880 # "Connection reset by peer" error.
1372 except ssl.SSLError: 1881 except ssl.SSLError:
1373 if expect_success: 1882 if expect_success:
1374 raise 1883 raise
1375 except OSError as e: 1884 except OSError as e:
1376 if expect_success or e.errno != errno.ECONNRESET: 1885 if expect_success or e.errno != errno.ECONNRESET:
1377 raise 1886 raise
1378 else: 1887 else:
1379 if not expect_success: 1888 if not expect_success:
1380 raise AssertionError( 1889 raise AssertionError(
1381 "Client protocol %s succeeded with server protocol %s!" 1890 "Client protocol %s succeeded with server protocol %s!"
1382 % (ssl.get_protocol_name(client_protocol), 1891 % (ssl.get_protocol_name(client_protocol),
1383 ssl.get_protocol_name(server_protocol))) 1892 ssl.get_protocol_name(server_protocol)))
1384 1893
1385 1894
1386 class ThreadedTests(unittest.TestCase): 1895 class ThreadedTests(unittest.TestCase):
1387 1896
1388 @skip_if_broken_ubuntu_ssl 1897 @skip_if_broken_ubuntu_ssl
1389 def test_echo(self): 1898 def test_echo(self):
1390 """Basic test of an SSL client connecting to a server""" 1899 """Basic test of an SSL client connecting to a server"""
1391 if support.verbose: 1900 if support.verbose:
1392 sys.stdout.write("\n") 1901 sys.stdout.write("\n")
1393 for protocol in PROTOCOLS: 1902 for protocol in PROTOCOLS:
1394 context = ssl.SSLContext(protocol) 1903 with self.subTest(protocol=ssl._PROTOCOL_NAMES[protocol]):
1395 context.load_cert_chain(CERTFILE) 1904 context = ssl.SSLContext(protocol)
1396 server_params_test(context, context, 1905 context.load_cert_chain(CERTFILE)
1397 chatty=True, connectionchatty=True) 1906 server_params_test(context, context,
1907 chatty=True, connectionchatty=True)
1398 1908
1399 def test_getpeercert(self): 1909 def test_getpeercert(self):
1400 if support.verbose: 1910 if support.verbose:
1401 sys.stdout.write("\n") 1911 sys.stdout.write("\n")
1402 context = ssl.SSLContext(ssl.PROTOCOL_SSLv23) 1912 context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1403 context.verify_mode = ssl.CERT_REQUIRED 1913 context.verify_mode = ssl.CERT_REQUIRED
1404 context.load_verify_locations(CERTFILE) 1914 context.load_verify_locations(CERTFILE)
1405 context.load_cert_chain(CERTFILE) 1915 context.load_cert_chain(CERTFILE)
1406 server = ThreadedEchoServer(context=context, chatty=False) 1916 server = ThreadedEchoServer(context=context, chatty=False)
1407 with server: 1917 with server:
1408 s = context.wrap_socket(socket.socket()) 1918 s = context.wrap_socket(socket.socket(),
1919 do_handshake_on_connect=False)
1409 s.connect((HOST, server.port)) 1920 s.connect((HOST, server.port))
1921 # getpeercert() raise ValueError while the handshake isn't
1922 # done.
1923 with self.assertRaises(ValueError):
1924 s.getpeercert()
1925 s.do_handshake()
1410 cert = s.getpeercert() 1926 cert = s.getpeercert()
1411 self.assertTrue(cert, "Can't get peer certificate.") 1927 self.assertTrue(cert, "Can't get peer certificate.")
1412 cipher = s.cipher() 1928 cipher = s.cipher()
1413 if support.verbose: 1929 if support.verbose:
1414 sys.stdout.write(pprint.pformat(cert) + '\n') 1930 sys.stdout.write(pprint.pformat(cert) + '\n')
1415 sys.stdout.write("Connection cipher is " + str(cipher) + '.\n') 1931 sys.stdout.write("Connection cipher is " + str(cipher) + '.\n')
1416 if 'subject' not in cert: 1932 if 'subject' not in cert:
1417 self.fail("No subject field in certificate: %s." % 1933 self.fail("No subject field in certificate: %s." %
1418 pprint.pformat(cert)) 1934 pprint.pformat(cert))
1419 if ((('organizationName', 'Python Software Foundation'),) 1935 if ((('organizationName', 'Python Software Foundation'),)
1420 not in cert['subject']): 1936 not in cert['subject']):
1421 self.fail( 1937 self.fail(
1422 "Missing or invalid 'organizationName' field in certificate subject; " 1938 "Missing or invalid 'organizationName' field in certificate subject; "
1423 "should be 'Python Software Foundation'.") 1939 "should be 'Python Software Foundation'.")
1424 self.assertIn('notBefore', cert) 1940 self.assertIn('notBefore', cert)
1425 self.assertIn('notAfter', cert) 1941 self.assertIn('notAfter', cert)
1426 before = ssl.cert_time_to_seconds(cert['notBefore']) 1942 before = ssl.cert_time_to_seconds(cert['notBefore'])
1427 after = ssl.cert_time_to_seconds(cert['notAfter']) 1943 after = ssl.cert_time_to_seconds(cert['notAfter'])
1428 self.assertLess(before, after) 1944 self.assertLess(before, after)
1429 s.close() 1945 s.close()
1430 1946
1947 @unittest.skipUnless(have_verify_flags(),
1948 "verify_flags need OpenSSL > 0.9.8")
1949 def test_crl_check(self):
1950 if support.verbose:
1951 sys.stdout.write("\n")
1952
1953 server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1954 server_context.load_cert_chain(SIGNED_CERTFILE)
1955
1956 context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1957 context.verify_mode = ssl.CERT_REQUIRED
1958 context.load_verify_locations(SIGNING_CA)
1959 self.assertEqual(context.verify_flags, ssl.VERIFY_DEFAULT)
1960
1961 # VERIFY_DEFAULT should pass
1962 server = ThreadedEchoServer(context=server_context, chatty=True)
1963 with server:
1964 with context.wrap_socket(socket.socket()) as s:
1965 s.connect((HOST, server.port))
1966 cert = s.getpeercert()
1967 self.assertTrue(cert, "Can't get peer certificate.")
1968
1969 # VERIFY_CRL_CHECK_LEAF without a loaded CRL file fails
1970 context.verify_flags |= ssl.VERIFY_CRL_CHECK_LEAF
1971
1972 server = ThreadedEchoServer(context=server_context, chatty=True)
1973 with server:
1974 with context.wrap_socket(socket.socket()) as s:
1975 with self.assertRaisesRegex(ssl.SSLError,
1976 "certificate verify failed"):
1977 s.connect((HOST, server.port))
1978
1979 # now load a CRL file. The CRL file is signed by the CA.
1980 context.load_verify_locations(CRLFILE)
1981
1982 server = ThreadedEchoServer(context=server_context, chatty=True)
1983 with server:
1984 with context.wrap_socket(socket.socket()) as s:
1985 s.connect((HOST, server.port))
1986 cert = s.getpeercert()
1987 self.assertTrue(cert, "Can't get peer certificate.")
1988
1989 @needs_sni
1990 def test_check_hostname(self):
1991 if support.verbose:
1992 sys.stdout.write("\n")
1993
1994 server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1995 server_context.load_cert_chain(SIGNED_CERTFILE)
1996
1997 context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1998 context.verify_mode = ssl.CERT_REQUIRED
1999 context.check_hostname = True
2000 context.load_verify_locations(SIGNING_CA)
2001
2002 # correct hostname should verify
2003 server = ThreadedEchoServer(context=server_context, chatty=True)
2004 with server:
2005 with context.wrap_socket(socket.socket(),
2006 server_hostname="localhost") as s:
2007 s.connect((HOST, server.port))
2008 cert = s.getpeercert()
2009 self.assertTrue(cert, "Can't get peer certificate.")
2010
2011 # incorrect hostname should raise an exception
2012 server = ThreadedEchoServer(context=server_context, chatty=True)
2013 with server:
2014 with context.wrap_socket(socket.socket(),
2015 server_hostname="invalid") as s:
2016 with self.assertRaisesRegex(ssl.CertificateError,
2017 "hostname 'invalid' doesn't match 'localhost'"):
2018 s.connect((HOST, server.port))
2019
2020 # missing server_hostname arg should cause an exception, too
2021 server = ThreadedEchoServer(context=server_context, chatty=True)
2022 with server:
2023 with socket.socket() as s:
2024 with self.assertRaisesRegex(ValueError,
2025 "check_hostname requires server_hostname"):
2026 context.wrap_socket(s)
2027
1431 def test_empty_cert(self): 2028 def test_empty_cert(self):
1432 """Connecting with an empty cert file""" 2029 """Connecting with an empty cert file"""
1433 bad_cert_test(os.path.join(os.path.dirname(__file__) or os.curdir, 2030 bad_cert_test(os.path.join(os.path.dirname(__file__) or os.curdir,
1434 "nullcert.pem")) 2031 "nullcert.pem"))
1435 def test_malformed_cert(self): 2032 def test_malformed_cert(self):
1436 """Connecting with a badly formatted certificate (syntax error)""" 2033 """Connecting with a badly formatted certificate (syntax error)"""
1437 bad_cert_test(os.path.join(os.path.dirname(__file__) or os.curdir, 2034 bad_cert_test(os.path.join(os.path.dirname(__file__) or os.curdir,
1438 "badcert.pem")) 2035 "badcert.pem"))
1439 def test_nonexisting_cert(self): 2036 def test_nonexisting_cert(self):
1440 """Connecting with a non-existing cert file""" 2037 """Connecting with a non-existing cert file"""
(...skipping 48 matching lines...) Expand 10 before | Expand all | Expand 10 after
1489 @skip_if_broken_ubuntu_ssl 2086 @skip_if_broken_ubuntu_ssl
1490 @unittest.skipUnless(hasattr(ssl, 'PROTOCOL_SSLv2'), 2087 @unittest.skipUnless(hasattr(ssl, 'PROTOCOL_SSLv2'),
1491 "OpenSSL is compiled without SSLv2 support") 2088 "OpenSSL is compiled without SSLv2 support")
1492 def test_protocol_sslv2(self): 2089 def test_protocol_sslv2(self):
1493 """Connecting to an SSLv2 server with various client options""" 2090 """Connecting to an SSLv2 server with various client options"""
1494 if support.verbose: 2091 if support.verbose:
1495 sys.stdout.write("\n") 2092 sys.stdout.write("\n")
1496 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True) 2093 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True)
1497 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_OPTIONAL) 2094 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_OPTIONAL)
1498 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_REQUIRED) 2095 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_REQUIRED)
1499 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv23, True) 2096 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv23, False)
1500 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv3, False) 2097 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv3, False)
1501 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLSv1, False) 2098 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLSv1, False)
1502 # SSLv23 client with specific SSL options 2099 # SSLv23 client with specific SSL options
1503 if no_sslv2_implies_sslv3_hello(): 2100 if no_sslv2_implies_sslv3_hello():
1504 # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs 2101 # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
1505 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv23, False, 2102 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv23, False,
1506 client_options=ssl.OP_NO_SSLv2) 2103 client_options=ssl.OP_NO_SSLv2)
1507 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv23, True, 2104 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv23, False,
1508 client_options=ssl.OP_NO_SSLv3) 2105 client_options=ssl.OP_NO_SSLv3)
1509 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv23, True, 2106 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv23, False,
1510 client_options=ssl.OP_NO_TLSv1) 2107 client_options=ssl.OP_NO_TLSv1)
1511 2108
1512 @skip_if_broken_ubuntu_ssl 2109 @skip_if_broken_ubuntu_ssl
1513 def test_protocol_sslv23(self): 2110 def test_protocol_sslv23(self):
1514 """Connecting to an SSLv23 server with various client options""" 2111 """Connecting to an SSLv23 server with various client options"""
1515 if support.verbose: 2112 if support.verbose:
1516 sys.stdout.write("\n") 2113 sys.stdout.write("\n")
1517 if hasattr(ssl, 'PROTOCOL_SSLv2'): 2114 if hasattr(ssl, 'PROTOCOL_SSLv2'):
1518 try: 2115 try:
1519 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv2, True) 2116 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv2, True)
(...skipping 49 matching lines...) Expand 10 before | Expand all | Expand 10 after
1569 if support.verbose: 2166 if support.verbose:
1570 sys.stdout.write("\n") 2167 sys.stdout.write("\n")
1571 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, True) 2168 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, True)
1572 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, True, ssl.CERT_OPTIONAL) 2169 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, True, ssl.CERT_OPTIONAL)
1573 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, True, ssl.CERT_REQUIRED) 2170 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, True, ssl.CERT_REQUIRED)
1574 if hasattr(ssl, 'PROTOCOL_SSLv2'): 2171 if hasattr(ssl, 'PROTOCOL_SSLv2'):
1575 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv2, False) 2172 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv2, False)
1576 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv3, False) 2173 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv3, False)
1577 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv23, False, 2174 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv23, False,
1578 client_options=ssl.OP_NO_TLSv1) 2175 client_options=ssl.OP_NO_TLSv1)
2176
2177 @skip_if_broken_ubuntu_ssl
2178 @unittest.skipUnless(hasattr(ssl, "PROTOCOL_TLSv1_1"),
2179 "TLS version 1.1 not supported.")
2180 def test_protocol_tlsv1_1(self):
2181 """Connecting to a TLSv1.1 server with various client options.
2182 Testing against older TLS versions."""
2183 if support.verbose:
2184 sys.stdout.write("\n")
2185 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1_1, True)
2186 if hasattr(ssl, 'PROTOCOL_SSLv2'):
2187 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_SSLv2, False)
2188 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_SSLv3, False)
2189 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_SSLv23, False,
2190 client_options=ssl.OP_NO_TLSv1_1)
2191
2192 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1_1, True)
2193 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1, False)
2194 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1_1, False)
2195
2196
2197 @skip_if_broken_ubuntu_ssl
2198 @unittest.skipUnless(hasattr(ssl, "PROTOCOL_TLSv1_2"),
2199 "TLS version 1.2 not supported.")
2200 def test_protocol_tlsv1_2(self):
2201 """Connecting to a TLSv1.2 server with various client options.
2202 Testing against older TLS versions."""
2203 if support.verbose:
2204 sys.stdout.write("\n")
2205 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1_2, True,
2206 server_options=ssl.OP_NO_SSLv3|ssl.OP_NO_SSLv2,
2207 client_options=ssl.OP_NO_SSLv3|ssl.OP_NO_SSLv2,)
2208 if hasattr(ssl, 'PROTOCOL_SSLv2'):
2209 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_SSLv2, False)
2210 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_SSLv3, False)
2211 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_SSLv23, False,
2212 client_options=ssl.OP_NO_TLSv1_2)
2213
2214 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1_2, True)
2215 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1, False)
2216 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1_2, False)
2217 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1_1, False)
2218 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1_2, False)
1579 2219
1580 def test_starttls(self): 2220 def test_starttls(self):
1581 """Switching from clear text to encrypted and back again.""" 2221 """Switching from clear text to encrypted and back again."""
1582 msgs = (b"msg 1", b"MSG 2", b"STARTTLS", b"MSG 3", b"msg 4", b"ENDTLS", b"msg 5", b"msg 6") 2222 msgs = (b"msg 1", b"MSG 2", b"STARTTLS", b"MSG 3", b"msg 4", b"ENDTLS", b"msg 5", b"msg 6")
1583 2223
1584 server = ThreadedEchoServer(CERTFILE, 2224 server = ThreadedEchoServer(CERTFILE,
1585 ssl_version=ssl.PROTOCOL_TLSv1, 2225 ssl_version=ssl.PROTOCOL_TLSv1,
1586 starttls_server=True, 2226 starttls_server=True,
1587 chatty=True, 2227 chatty=True,
1588 connectionchatty=True) 2228 connectionchatty=True)
(...skipping 41 matching lines...) Expand 10 before | Expand all | Expand 10 after
1630 conn.write(b"over\n") 2270 conn.write(b"over\n")
1631 else: 2271 else:
1632 s.send(b"over\n") 2272 s.send(b"over\n")
1633 if wrapped: 2273 if wrapped:
1634 conn.close() 2274 conn.close()
1635 else: 2275 else:
1636 s.close() 2276 s.close()
1637 2277
1638 def test_socketserver(self): 2278 def test_socketserver(self):
1639 """Using a SocketServer to create and manage SSL connections.""" 2279 """Using a SocketServer to create and manage SSL connections."""
1640 server = make_https_server(self, CERTFILE) 2280 server = make_https_server(self, certfile=CERTFILE)
1641 # try to connect 2281 # try to connect
1642 if support.verbose: 2282 if support.verbose:
1643 sys.stdout.write('\n') 2283 sys.stdout.write('\n')
1644 with open(CERTFILE, 'rb') as f: 2284 with open(CERTFILE, 'rb') as f:
1645 d1 = f.read() 2285 d1 = f.read()
1646 d2 = '' 2286 d2 = ''
1647 # now fetch the same data from the HTTPS server 2287 # now fetch the same data from the HTTPS server
1648 url = 'https://%s:%d/%s' % ( 2288 url = 'https://%s:%d/%s' % (
1649 HOST, server.port, os.path.split(CERTFILE)[1]) 2289 HOST, server.port, os.path.split(CERTFILE)[1])
1650 f = urllib.request.urlopen(url) 2290 f = urllib.request.urlopen(url)
(...skipping 235 matching lines...) Expand 10 before | Expand all | Expand 10 after
1886 client.connect((host, port)) 2526 client.connect((host, port))
1887 client_addr = client.getsockname() 2527 client_addr = client.getsockname()
1888 client.close() 2528 client.close()
1889 t.join() 2529 t.join()
1890 remote.close() 2530 remote.close()
1891 server.close() 2531 server.close()
1892 # Sanity checks. 2532 # Sanity checks.
1893 self.assertIsInstance(remote, ssl.SSLSocket) 2533 self.assertIsInstance(remote, ssl.SSLSocket)
1894 self.assertEqual(peer, client_addr) 2534 self.assertEqual(peer, client_addr)
1895 2535
2536 def test_getpeercert_enotconn(self):
2537 context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
2538 with context.wrap_socket(socket.socket()) as sock:
2539 with self.assertRaises(OSError) as cm:
2540 sock.getpeercert()
2541 self.assertEqual(cm.exception.errno, errno.ENOTCONN)
2542
2543 def test_do_handshake_enotconn(self):
2544 context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
2545 with context.wrap_socket(socket.socket()) as sock:
2546 with self.assertRaises(OSError) as cm:
2547 sock.do_handshake()
2548 self.assertEqual(cm.exception.errno, errno.ENOTCONN)
2549
1896 def test_default_ciphers(self): 2550 def test_default_ciphers(self):
1897 context = ssl.SSLContext(ssl.PROTOCOL_SSLv23) 2551 context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1898 try: 2552 try:
1899 # Force a set of weak ciphers on our client context 2553 # Force a set of weak ciphers on our client context
1900 context.set_ciphers("DES") 2554 context.set_ciphers("DES")
1901 except ssl.SSLError: 2555 except ssl.SSLError:
1902 self.skipTest("no DES cipher available") 2556 self.skipTest("no DES cipher available")
1903 with ThreadedEchoServer(CERTFILE, 2557 with ThreadedEchoServer(CERTFILE,
1904 ssl_version=ssl.PROTOCOL_SSLv23, 2558 ssl_version=ssl.PROTOCOL_SSLv23,
1905 chatty=False) as server: 2559 chatty=False) as server:
(...skipping 145 matching lines...) Expand 10 before | Expand all | Expand 10 after
2051 cert = stats['peercert'] 2705 cert = stats['peercert']
2052 self.assertIn((('commonName', name),), cert['subject']) 2706 self.assertIn((('commonName', name),), cert['subject'])
2053 2707
2054 @needs_sni 2708 @needs_sni
2055 def test_sni_callback(self): 2709 def test_sni_callback(self):
2056 calls = [] 2710 calls = []
2057 server_context, other_context, client_context = self.sni_contexts() 2711 server_context, other_context, client_context = self.sni_contexts()
2058 2712
2059 def servername_cb(ssl_sock, server_name, initial_context): 2713 def servername_cb(ssl_sock, server_name, initial_context):
2060 calls.append((server_name, initial_context)) 2714 calls.append((server_name, initial_context))
2061 ssl_sock.context = other_context 2715 if server_name is not None:
2716 ssl_sock.context = other_context
2062 server_context.set_servername_callback(servername_cb) 2717 server_context.set_servername_callback(servername_cb)
2063 2718
2064 stats = server_params_test(client_context, server_context, 2719 stats = server_params_test(client_context, server_context,
2065 chatty=True, 2720 chatty=True,
2066 sni_name='supermessage') 2721 sni_name='supermessage')
2067 # The hostname was fetched properly, and the certificate was 2722 # The hostname was fetched properly, and the certificate was
2068 # changed for the connection. 2723 # changed for the connection.
2069 self.assertEqual(calls, [("supermessage", server_context)]) 2724 self.assertEqual(calls, [("supermessage", server_context)])
2070 # CERTFILE4 was selected 2725 # CERTFILE4 was selected
2071 self.check_common_name(stats, 'fakehostname') 2726 self.check_common_name(stats, 'fakehostname')
2072 2727
2728 calls = []
2729 # The callback is called with server_name=None
2730 stats = server_params_test(client_context, server_context,
2731 chatty=True,
2732 sni_name=None)
2733 self.assertEqual(calls, [(None, server_context)])
2734 self.check_common_name(stats, 'localhost')
2735
2073 # Check disabling the callback 2736 # Check disabling the callback
2074 calls = [] 2737 calls = []
2075 server_context.set_servername_callback(None) 2738 server_context.set_servername_callback(None)
2076 2739
2077 stats = server_params_test(client_context, server_context, 2740 stats = server_params_test(client_context, server_context,
2078 chatty=True, 2741 chatty=True,
2079 sni_name='notfunny') 2742 sni_name='notfunny')
2080 # Certificate didn't change 2743 # Certificate didn't change
2081 self.check_common_name(stats, 'localhost') 2744 self.check_common_name(stats, 'localhost')
2082 self.assertEqual(calls, []) 2745 self.assertEqual(calls, [])
(...skipping 40 matching lines...) Expand 10 before | Expand all | Expand 10 after
2123 return "foo" 2786 return "foo"
2124 server_context.set_servername_callback(cb_wrong_return_type) 2787 server_context.set_servername_callback(cb_wrong_return_type)
2125 2788
2126 with self.assertRaises(ssl.SSLError) as cm, \ 2789 with self.assertRaises(ssl.SSLError) as cm, \
2127 support.captured_stderr() as stderr: 2790 support.captured_stderr() as stderr:
2128 stats = server_params_test(client_context, server_context, 2791 stats = server_params_test(client_context, server_context,
2129 chatty=False, 2792 chatty=False,
2130 sni_name='supermessage') 2793 sni_name='supermessage')
2131 self.assertEqual(cm.exception.reason, 'TLSV1_ALERT_INTERNAL_ERROR') 2794 self.assertEqual(cm.exception.reason, 'TLSV1_ALERT_INTERNAL_ERROR')
2132 self.assertIn("TypeError", stderr.getvalue()) 2795 self.assertIn("TypeError", stderr.getvalue())
2796
2797 def test_read_write_after_close_raises_valuerror(self):
2798 context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
2799 context.verify_mode = ssl.CERT_REQUIRED
2800 context.load_verify_locations(CERTFILE)
2801 context.load_cert_chain(CERTFILE)
2802 server = ThreadedEchoServer(context=context, chatty=False)
2803
2804 with server:
2805 s = context.wrap_socket(socket.socket())
2806 s.connect((HOST, server.port))
2807 s.close()
2808
2809 self.assertRaises(ValueError, s.read, 1024)
2810 self.assertRaises(ValueError, s.write, b'hello')
2133 2811
2134 2812
2135 def test_main(verbose=False): 2813 def test_main(verbose=False):
2136 if support.verbose: 2814 if support.verbose:
2137 plats = { 2815 plats = {
2138 'Linux': platform.linux_distribution, 2816 'Linux': platform.linux_distribution,
2139 'Mac': platform.mac_ver, 2817 'Mac': platform.mac_ver,
2140 'Windows': platform.win32_ver, 2818 'Windows': platform.win32_ver,
2141 } 2819 }
2142 for name, func in plats.items(): 2820 for name, func in plats.items():
2143 plat = func() 2821 plat = func()
2144 if plat and plat[0]: 2822 if plat and plat[0]:
2145 plat = '%s %r' % (name, plat) 2823 plat = '%s %r' % (name, plat)
2146 break 2824 break
2147 else: 2825 else:
2148 plat = repr(platform.platform()) 2826 plat = repr(platform.platform())
2149 print("test_ssl: testing with %r %r" % 2827 print("test_ssl: testing with %r %r" %
2150 (ssl.OPENSSL_VERSION, ssl.OPENSSL_VERSION_INFO)) 2828 (ssl.OPENSSL_VERSION, ssl.OPENSSL_VERSION_INFO))
2151 print(" under %s" % plat) 2829 print(" under %s" % plat)
2152 print(" HAS_SNI = %r" % ssl.HAS_SNI) 2830 print(" HAS_SNI = %r" % ssl.HAS_SNI)
2831 print(" OP_ALL = 0x%8x" % ssl.OP_ALL)
2832 try:
2833 print(" OP_NO_TLSv1_1 = 0x%8x" % ssl.OP_NO_TLSv1_1)
2834 except AttributeError:
2835 pass
2153 2836
2154 for filename in [ 2837 for filename in [
2155 CERTFILE, SVN_PYTHON_ORG_ROOT_CERT, BYTES_CERTFILE, 2838 CERTFILE, SVN_PYTHON_ORG_ROOT_CERT, BYTES_CERTFILE,
2156 ONLYCERT, ONLYKEY, BYTES_ONLYCERT, BYTES_ONLYKEY, 2839 ONLYCERT, ONLYKEY, BYTES_ONLYCERT, BYTES_ONLYKEY,
2157 SIGNED_CERTFILE, SIGNED_CERTFILE2, SIGNING_CA, 2840 SIGNED_CERTFILE, SIGNED_CERTFILE2, SIGNING_CA,
2158 BADCERT, BADKEY, EMPTYCERT]: 2841 BADCERT, BADKEY, EMPTYCERT]:
2159 if not os.path.exists(filename): 2842 if not os.path.exists(filename):
2160 raise support.TestFailed("Can't read certificate file %r" % filename) 2843 raise support.TestFailed("Can't read certificate file %r" % filename)
2161 2844
2162 tests = [ContextTests, BasicSocketTests, SSLErrorTests] 2845 tests = [ContextTests, BasicSocketTests, SSLErrorTests]
2163 2846
2164 if support.is_resource_enabled('network'): 2847 if support.is_resource_enabled('network'):
2165 tests.append(NetworkedTests) 2848 tests.append(NetworkedTests)
2166 2849
2167 if _have_threads: 2850 if _have_threads:
2168 thread_info = support.threading_setup() 2851 thread_info = support.threading_setup()
2169 if thread_info: 2852 if thread_info:
2170 tests.append(ThreadedTests) 2853 tests.append(ThreadedTests)
2171 2854
2172 try: 2855 try:
2173 support.run_unittest(*tests) 2856 support.run_unittest(*tests)
2174 finally: 2857 finally:
2175 if _have_threads: 2858 if _have_threads:
2176 support.threading_cleanup(*thread_info) 2859 support.threading_cleanup(*thread_info)
2177 2860
2178 if __name__ == "__main__": 2861 if __name__ == "__main__":
2179 test_main() 2862 test_main()
LEFTRIGHT

RSS Feeds Recent Issues | This issue
This is Rietveld 894c83f36cb7+