Rietveld Code Review Tool
Help | Bug tracker | Discussion group | Source code | Sign in
(253582)

Delta Between Two Patch Sets: Lib/test/test_ssl.py

Issue 16510: Using appropriate checks in tests
Left Patch Set: Created 5 years, 9 months ago
Right Patch Set: Created 5 years, 6 months ago
Left:
Right:
Use n/p to move between diff chunks; N/P to move between comments. Please Sign in to add in-line comments.
Jump to:
Left: Side by side diff | Download
Right: Side by side diff | Download
« no previous file with change/comment | « Lib/test/test_socket.py ('k') | Lib/test/test_strptime.py » ('j') | no next file with change/comment »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
LEFTRIGHT
1 # Test the support for SSL and sockets 1 # Test the support for SSL and sockets
2 2
3 import sys 3 import sys
4 import unittest 4 import unittest
5 from test import support 5 from test import support
6 import socket 6 import socket
7 import select 7 import select
8 import time 8 import time
9 import datetime 9 import datetime
10 import gc 10 import gc
11 import os 11 import os
12 import errno 12 import errno
13 import pprint 13 import pprint
14 import tempfile 14 import tempfile
15 import urllib.request 15 import urllib.request
16 import traceback 16 import traceback
17 import asyncore 17 import asyncore
18 import weakref 18 import weakref
19 import platform 19 import platform
20 import functools 20 import functools
21 from unittest import mock 21 from unittest import mock
22 22
23 ssl = support.import_module("ssl") 23 ssl = support.import_module("ssl")
24 24
25 PROTOCOLS = sorted(ssl._PROTOCOL_NAMES) 25 PROTOCOLS = sorted(ssl._PROTOCOL_NAMES)
26 HOST = support.HOST 26 HOST = support.HOST
27 27
28 data_file = lambda name: os.path.join(os.path.dirname(__file__), name) 28 def data_file(*name):
29 return os.path.join(os.path.dirname(__file__), *name)
29 30
30 # The custom key and certificate files used in test_ssl are generated 31 # The custom key and certificate files used in test_ssl are generated
31 # using Lib/test/make_ssl_certs.py. 32 # using Lib/test/make_ssl_certs.py.
32 # Other certificates are simply fetched from the Internet servers they 33 # Other certificates are simply fetched from the Internet servers they
33 # are meant to authenticate. 34 # are meant to authenticate.
34 35
35 CERTFILE = data_file("keycert.pem") 36 CERTFILE = data_file("keycert.pem")
36 BYTES_CERTFILE = os.fsencode(CERTFILE) 37 BYTES_CERTFILE = os.fsencode(CERTFILE)
37 ONLYCERT = data_file("ssl_cert.pem") 38 ONLYCERT = data_file("ssl_cert.pem")
38 ONLYKEY = data_file("ssl_key.pem") 39 ONLYKEY = data_file("ssl_key.pem")
39 BYTES_ONLYCERT = os.fsencode(ONLYCERT) 40 BYTES_ONLYCERT = os.fsencode(ONLYCERT)
40 BYTES_ONLYKEY = os.fsencode(ONLYKEY) 41 BYTES_ONLYKEY = os.fsencode(ONLYKEY)
41 CERTFILE_PROTECTED = data_file("keycert.passwd.pem") 42 CERTFILE_PROTECTED = data_file("keycert.passwd.pem")
42 ONLYKEY_PROTECTED = data_file("ssl_key.passwd.pem") 43 ONLYKEY_PROTECTED = data_file("ssl_key.passwd.pem")
43 KEY_PASSWORD = "somepass" 44 KEY_PASSWORD = "somepass"
44 CAPATH = data_file("capath") 45 CAPATH = data_file("capath")
45 BYTES_CAPATH = os.fsencode(CAPATH) 46 BYTES_CAPATH = os.fsencode(CAPATH)
47 CAFILE_NEURONIO = data_file("capath", "4e1295a3.0")
48 CAFILE_CACERT = data_file("capath", "5ed36f99.0")
49
50
51 # empty CRL
52 CRLFILE = data_file("revocation.crl")
46 53
47 # Two keys and certs signed by the same CA (for SNI tests) 54 # Two keys and certs signed by the same CA (for SNI tests)
48 SIGNED_CERTFILE = data_file("keycert3.pem") 55 SIGNED_CERTFILE = data_file("keycert3.pem")
49 SIGNED_CERTFILE2 = data_file("keycert4.pem") 56 SIGNED_CERTFILE2 = data_file("keycert4.pem")
50 SIGNING_CA = data_file("pycacert.pem") 57 SIGNING_CA = data_file("pycacert.pem")
51 58
52 SVN_PYTHON_ORG_ROOT_CERT = data_file("https_svn_python_org_root.pem") 59 SVN_PYTHON_ORG_ROOT_CERT = data_file("https_svn_python_org_root.pem")
53 60
54 EMPTYCERT = data_file("nullcert.pem") 61 EMPTYCERT = data_file("nullcert.pem")
55 BADCERT = data_file("badcert.pem") 62 BADCERT = data_file("badcert.pem")
(...skipping 11 matching lines...) Expand all
67 if support.verbose: 74 if support.verbose:
68 sys.stdout.write(prefix + exc_format) 75 sys.stdout.write(prefix + exc_format)
69 76
70 def can_clear_options(): 77 def can_clear_options():
71 # 0.9.8m or higher 78 # 0.9.8m or higher
72 return ssl._OPENSSL_API_VERSION >= (0, 9, 8, 13, 15) 79 return ssl._OPENSSL_API_VERSION >= (0, 9, 8, 13, 15)
73 80
74 def no_sslv2_implies_sslv3_hello(): 81 def no_sslv2_implies_sslv3_hello():
75 # 0.9.7h or higher 82 # 0.9.7h or higher
76 return ssl.OPENSSL_VERSION_INFO >= (0, 9, 7, 8, 15) 83 return ssl.OPENSSL_VERSION_INFO >= (0, 9, 7, 8, 15)
84
85 def have_verify_flags():
86 # 0.9.8 or higher
87 return ssl.OPENSSL_VERSION_INFO >= (0, 9, 8, 0, 15)
77 88
78 def asn1time(cert_time): 89 def asn1time(cert_time):
79 # Some versions of OpenSSL ignore seconds, see #18207 90 # Some versions of OpenSSL ignore seconds, see #18207
80 # 0.9.8.i 91 # 0.9.8.i
81 if ssl._OPENSSL_API_VERSION == (0, 9, 8, 9, 15): 92 if ssl._OPENSSL_API_VERSION == (0, 9, 8, 9, 15):
82 fmt = "%b %d %H:%M:%S %Y GMT" 93 fmt = "%b %d %H:%M:%S %Y GMT"
83 dt = datetime.datetime.strptime(cert_time, fmt) 94 dt = datetime.datetime.strptime(cert_time, fmt)
84 dt = dt.replace(second=0) 95 dt = dt.replace(second=0)
85 cert_time = dt.strftime(fmt) 96 cert_time = dt.strftime(fmt)
86 # %d adds leading zero but ASN1_TIME_print() uses leading space 97 # %d adds leading zero but ASN1_TIME_print() uses leading space
(...skipping 44 matching lines...) Expand 10 before | Expand all | Expand 10 after
131 "insufficient randomness")) 142 "insufficient randomness"))
132 143
133 data, is_cryptographic = ssl.RAND_pseudo_bytes(16) 144 data, is_cryptographic = ssl.RAND_pseudo_bytes(16)
134 self.assertEqual(len(data), 16) 145 self.assertEqual(len(data), 16)
135 self.assertEqual(is_cryptographic, v == 1) 146 self.assertEqual(is_cryptographic, v == 1)
136 if v: 147 if v:
137 data = ssl.RAND_bytes(16) 148 data = ssl.RAND_bytes(16)
138 self.assertEqual(len(data), 16) 149 self.assertEqual(len(data), 16)
139 else: 150 else:
140 self.assertRaises(ssl.SSLError, ssl.RAND_bytes, 16) 151 self.assertRaises(ssl.SSLError, ssl.RAND_bytes, 16)
152
153 # negative num is invalid
154 self.assertRaises(ValueError, ssl.RAND_bytes, -5)
155 self.assertRaises(ValueError, ssl.RAND_pseudo_bytes, -5)
141 156
142 self.assertRaises(TypeError, ssl.RAND_egd, 1) 157 self.assertRaises(TypeError, ssl.RAND_egd, 1)
143 self.assertRaises(TypeError, ssl.RAND_egd, 'foo', 1) 158 self.assertRaises(TypeError, ssl.RAND_egd, 'foo', 1)
144 ssl.RAND_add("this is a random string", 75.0) 159 ssl.RAND_add("this is a random string", 75.0)
145 160
146 @unittest.skipUnless(os.name == 'posix', 'requires posix') 161 @unittest.skipUnless(os.name == 'posix', 'requires posix')
147 def test_random_fork(self): 162 def test_random_fork(self):
148 status = ssl.RAND_status() 163 status = ssl.RAND_status()
149 if not status: 164 if not status:
150 self.fail("OpenSSL's PRNG has insufficient randomness") 165 self.fail("OpenSSL's PRNG has insufficient randomness")
(...skipping 50 matching lines...) Expand 10 before | Expand all | Expand 10 after
201 self.assertEqual(p['subjectAltName'], (('DNS', 'localhost'),)) 216 self.assertEqual(p['subjectAltName'], (('DNS', 'localhost'),))
202 # Issue #13034: the subjectAltName in some certificates 217 # Issue #13034: the subjectAltName in some certificates
203 # (notably projects.developer.nokia.com:443) wasn't parsed 218 # (notably projects.developer.nokia.com:443) wasn't parsed
204 p = ssl._ssl._test_decode_cert(NOKIACERT) 219 p = ssl._ssl._test_decode_cert(NOKIACERT)
205 if support.verbose: 220 if support.verbose:
206 sys.stdout.write("\n" + pprint.pformat(p) + "\n") 221 sys.stdout.write("\n" + pprint.pformat(p) + "\n")
207 self.assertEqual(p['subjectAltName'], 222 self.assertEqual(p['subjectAltName'],
208 (('DNS', 'projects.developer.nokia.com'), 223 (('DNS', 'projects.developer.nokia.com'),
209 ('DNS', 'projects.forum.nokia.com')) 224 ('DNS', 'projects.forum.nokia.com'))
210 ) 225 )
226 # extra OCSP and AIA fields
227 self.assertEqual(p['OCSP'], ('http://ocsp.verisign.com',))
228 self.assertEqual(p['caIssuers'],
229 ('http://SVRIntl-G3-aia.verisign.com/SVRIntlG3.cer',))
230 self.assertEqual(p['crlDistributionPoints'],
231 ('http://SVRIntl-G3-crl.verisign.com/SVRIntlG3.crl',))
211 232
212 def test_parse_cert_CVE_2013_4238(self): 233 def test_parse_cert_CVE_2013_4238(self):
213 p = ssl._ssl._test_decode_cert(NULLBYTECERT) 234 p = ssl._ssl._test_decode_cert(NULLBYTECERT)
214 if support.verbose: 235 if support.verbose:
215 sys.stdout.write("\n" + pprint.pformat(p) + "\n") 236 sys.stdout.write("\n" + pprint.pformat(p) + "\n")
216 subject = ((('countryName', 'US'),), 237 subject = ((('countryName', 'US'),),
217 (('stateOrProvinceName', 'Oregon'),), 238 (('stateOrProvinceName', 'Oregon'),),
218 (('localityName', 'Beaverton'),), 239 (('localityName', 'Beaverton'),),
219 (('organizationName', 'Python Software Foundation'),), 240 (('organizationName', 'Python Software Foundation'),),
220 (('organizationalUnitName', 'Python Core Development'),), 241 (('organizationalUnitName', 'Python Core Development'),),
(...skipping 287 matching lines...) Expand 10 before | Expand all | Expand 10 after
508 self.assertEqual(len(paths), 6) 529 self.assertEqual(len(paths), 6)
509 self.assertIsInstance(paths, ssl.DefaultVerifyPaths) 530 self.assertIsInstance(paths, ssl.DefaultVerifyPaths)
510 531
511 with support.EnvironmentVarGuard() as env: 532 with support.EnvironmentVarGuard() as env:
512 env["SSL_CERT_DIR"] = CAPATH 533 env["SSL_CERT_DIR"] = CAPATH
513 env["SSL_CERT_FILE"] = CERTFILE 534 env["SSL_CERT_FILE"] = CERTFILE
514 paths = ssl.get_default_verify_paths() 535 paths = ssl.get_default_verify_paths()
515 self.assertEqual(paths.cafile, CERTFILE) 536 self.assertEqual(paths.cafile, CERTFILE)
516 self.assertEqual(paths.capath, CAPATH) 537 self.assertEqual(paths.capath, CAPATH)
517 538
518
519 @unittest.skipUnless(sys.platform == "win32", "Windows specific") 539 @unittest.skipUnless(sys.platform == "win32", "Windows specific")
520 def test_enum_cert_store(self): 540 def test_enum_certificates(self):
521 self.assertEqual(ssl.X509_ASN_ENCODING, 1) 541 self.assertTrue(ssl.enum_certificates("CA"))
522 self.assertEqual(ssl.PKCS_7_ASN_ENCODING, 0x00010000) 542 self.assertTrue(ssl.enum_certificates("ROOT"))
523 543
524 self.assertEqual(ssl.enum_cert_store("CA"), 544 self.assertRaises(TypeError, ssl.enum_certificates)
525 ssl.enum_cert_store("CA", "certificate")) 545 self.assertRaises(WindowsError, ssl.enum_certificates, "")
526 ssl.enum_cert_store("CA", "crl") 546
527 self.assertEqual(ssl.enum_cert_store("ROOT"), 547 trust_oids = set()
528 ssl.enum_cert_store("ROOT", "certificate")) 548 for storename in ("CA", "ROOT"):
529 ssl.enum_cert_store("ROOT", "crl") 549 store = ssl.enum_certificates(storename)
530 550 self.assertIsInstance(store, list)
531 self.assertRaises(TypeError, ssl.enum_cert_store) 551 for element in store:
532 self.assertRaises(WindowsError, ssl.enum_cert_store, "") 552 self.assertIsInstance(element, tuple)
533 self.assertRaises(ValueError, ssl.enum_cert_store, "CA", "wrong") 553 self.assertEqual(len(element), 3)
534 554 cert, enc, trust = element
535 ca = ssl.enum_cert_store("CA") 555 self.assertIsInstance(cert, bytes)
536 self.assertIsInstance(ca, list) 556 self.assertIn(enc, {"x509_asn", "pkcs_7_asn"})
537 self.assertIsInstance(ca[0], tuple) 557 self.assertIsInstance(trust, (set, bool))
538 self.assertEqual(len(ca[0]), 2) 558 if isinstance(trust, set):
539 self.assertIsInstance(ca[0][0], bytes) 559 trust_oids.update(trust)
540 self.assertIsInstance(ca[0][1], int) 560
561 serverAuth = "1.3.6.1.5.5.7.3.1"
562 self.assertIn(serverAuth, trust_oids)
563
564 @unittest.skipUnless(sys.platform == "win32", "Windows specific")
565 def test_enum_crls(self):
566 self.assertTrue(ssl.enum_crls("CA"))
567 self.assertRaises(TypeError, ssl.enum_crls)
568 self.assertRaises(WindowsError, ssl.enum_crls, "")
569
570 crls = ssl.enum_crls("CA")
571 self.assertIsInstance(crls, list)
572 for element in crls:
573 self.assertIsInstance(element, tuple)
574 self.assertEqual(len(element), 2)
575 self.assertIsInstance(element[0], bytes)
576 self.assertIn(element[1], {"x509_asn", "pkcs_7_asn"})
577
578
579 def test_asn1object(self):
580 expected = (129, 'serverAuth', 'TLS Web Server Authentication',
581 '1.3.6.1.5.5.7.3.1')
582
583 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1')
584 self.assertEqual(val, expected)
585 self.assertEqual(val.nid, 129)
586 self.assertEqual(val.shortname, 'serverAuth')
587 self.assertEqual(val.longname, 'TLS Web Server Authentication')
588 self.assertEqual(val.oid, '1.3.6.1.5.5.7.3.1')
589 self.assertIsInstance(val, ssl._ASN1Object)
590 self.assertRaises(ValueError, ssl._ASN1Object, 'serverAuth')
591
592 val = ssl._ASN1Object.fromnid(129)
593 self.assertEqual(val, expected)
594 self.assertIsInstance(val, ssl._ASN1Object)
595 self.assertRaises(ValueError, ssl._ASN1Object.fromnid, -1)
596 with self.assertRaisesRegex(ValueError, "unknown NID 100000"):
597 ssl._ASN1Object.fromnid(100000)
598 for i in range(1000):
599 try:
600 obj = ssl._ASN1Object.fromnid(i)
601 except ValueError:
602 pass
603 else:
604 self.assertIsInstance(obj.nid, int)
605 self.assertIsInstance(obj.shortname, str)
606 self.assertIsInstance(obj.longname, str)
607 self.assertIsInstance(obj.oid, (str, type(None)))
608
609 val = ssl._ASN1Object.fromname('TLS Web Server Authentication')
610 self.assertEqual(val, expected)
611 self.assertIsInstance(val, ssl._ASN1Object)
612 self.assertEqual(ssl._ASN1Object.fromname('serverAuth'), expected)
613 self.assertEqual(ssl._ASN1Object.fromname('1.3.6.1.5.5.7.3.1'),
614 expected)
615 with self.assertRaisesRegex(ValueError, "unknown object 'serverauth'"):
616 ssl._ASN1Object.fromname('serverauth')
617
618 def test_purpose_enum(self):
619 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1')
620 self.assertIsInstance(ssl.Purpose.SERVER_AUTH, ssl._ASN1Object)
621 self.assertEqual(ssl.Purpose.SERVER_AUTH, val)
622 self.assertEqual(ssl.Purpose.SERVER_AUTH.nid, 129)
623 self.assertEqual(ssl.Purpose.SERVER_AUTH.shortname, 'serverAuth')
624 self.assertEqual(ssl.Purpose.SERVER_AUTH.oid,
625 '1.3.6.1.5.5.7.3.1')
626
627 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.2')
628 self.assertIsInstance(ssl.Purpose.CLIENT_AUTH, ssl._ASN1Object)
629 self.assertEqual(ssl.Purpose.CLIENT_AUTH, val)
630 self.assertEqual(ssl.Purpose.CLIENT_AUTH.nid, 130)
631 self.assertEqual(ssl.Purpose.CLIENT_AUTH.shortname, 'clientAuth')
632 self.assertEqual(ssl.Purpose.CLIENT_AUTH.oid,
633 '1.3.6.1.5.5.7.3.2')
634
635 def test_unsupported_dtls(self):
636 s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
637 self.addCleanup(s.close)
638 with self.assertRaises(NotImplementedError) as cx:
639 ssl.wrap_socket(s, cert_reqs=ssl.CERT_NONE)
640 self.assertEqual(str(cx.exception), "only stream sockets are supported")
641 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
642 with self.assertRaises(NotImplementedError) as cx:
643 ctx.wrap_socket(s)
644 self.assertEqual(str(cx.exception), "only stream sockets are supported")
645
541 646
542 class ContextTests(unittest.TestCase): 647 class ContextTests(unittest.TestCase):
543 648
544 @skip_if_broken_ubuntu_ssl 649 @skip_if_broken_ubuntu_ssl
545 def test_constructor(self): 650 def test_constructor(self):
546 for protocol in PROTOCOLS: 651 for protocol in PROTOCOLS:
547 ssl.SSLContext(protocol) 652 ssl.SSLContext(protocol)
548 self.assertRaises(TypeError, ssl.SSLContext) 653 self.assertRaises(TypeError, ssl.SSLContext)
549 self.assertRaises(ValueError, ssl.SSLContext, -1) 654 self.assertRaises(ValueError, ssl.SSLContext, -1)
550 self.assertRaises(ValueError, ssl.SSLContext, 42) 655 self.assertRaises(ValueError, ssl.SSLContext, 42)
551 656
552 @skip_if_broken_ubuntu_ssl 657 @skip_if_broken_ubuntu_ssl
553 def test_protocol(self): 658 def test_protocol(self):
554 for proto in PROTOCOLS: 659 for proto in PROTOCOLS:
555 ctx = ssl.SSLContext(proto) 660 ctx = ssl.SSLContext(proto)
556 self.assertEqual(ctx.protocol, proto) 661 self.assertEqual(ctx.protocol, proto)
557 662
558 def test_ciphers(self): 663 def test_ciphers(self):
559 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) 664 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
560 ctx.set_ciphers("ALL") 665 ctx.set_ciphers("ALL")
561 ctx.set_ciphers("DEFAULT") 666 ctx.set_ciphers("DEFAULT")
562 with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected"): 667 with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected"):
563 ctx.set_ciphers("^$:,;?*'dorothyx") 668 ctx.set_ciphers("^$:,;?*'dorothyx")
564 669
565 @skip_if_broken_ubuntu_ssl 670 @skip_if_broken_ubuntu_ssl
566 def test_options(self): 671 def test_options(self):
567 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) 672 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
568 # OP_ALL is the default value 673 # OP_ALL | OP_NO_SSLv2 is the default value
569 self.assertEqual(ssl.OP_ALL, ctx.options)
570 ctx.options |= ssl.OP_NO_SSLv2
571 self.assertEqual(ssl.OP_ALL | ssl.OP_NO_SSLv2, 674 self.assertEqual(ssl.OP_ALL | ssl.OP_NO_SSLv2,
572 ctx.options) 675 ctx.options)
573 ctx.options |= ssl.OP_NO_SSLv3 676 ctx.options |= ssl.OP_NO_SSLv3
574 self.assertEqual(ssl.OP_ALL | ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3, 677 self.assertEqual(ssl.OP_ALL | ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3,
575 ctx.options) 678 ctx.options)
576 if can_clear_options(): 679 if can_clear_options():
577 ctx.options = (ctx.options & ~ssl.OP_NO_SSLv2) | ssl.OP_NO_TLSv1 680 ctx.options = (ctx.options & ~ssl.OP_NO_SSLv2) | ssl.OP_NO_TLSv1
578 self.assertEqual(ssl.OP_ALL | ssl.OP_NO_TLSv1 | ssl.OP_NO_SSLv3, 681 self.assertEqual(ssl.OP_ALL | ssl.OP_NO_TLSv1 | ssl.OP_NO_SSLv3,
579 ctx.options) 682 ctx.options)
580 ctx.options = 0 683 ctx.options = 0
581 self.assertEqual(0, ctx.options) 684 self.assertEqual(0, ctx.options)
582 else: 685 else:
583 with self.assertRaises(ValueError): 686 with self.assertRaises(ValueError):
584 ctx.options = 0 687 ctx.options = 0
585 688
586 def test_verify(self): 689 def test_verify_mode(self):
587 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) 690 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
588 # Default value 691 # Default value
589 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE) 692 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
590 ctx.verify_mode = ssl.CERT_OPTIONAL 693 ctx.verify_mode = ssl.CERT_OPTIONAL
591 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL) 694 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
592 ctx.verify_mode = ssl.CERT_REQUIRED 695 ctx.verify_mode = ssl.CERT_REQUIRED
593 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED) 696 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
594 ctx.verify_mode = ssl.CERT_NONE 697 ctx.verify_mode = ssl.CERT_NONE
595 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE) 698 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
596 with self.assertRaises(TypeError): 699 with self.assertRaises(TypeError):
597 ctx.verify_mode = None 700 ctx.verify_mode = None
598 with self.assertRaises(ValueError): 701 with self.assertRaises(ValueError):
599 ctx.verify_mode = 42 702 ctx.verify_mode = 42
703
704 @unittest.skipUnless(have_verify_flags(),
705 "verify_flags need OpenSSL > 0.9.8")
706 def test_verify_flags(self):
707 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
708 # default value by OpenSSL
709 self.assertEqual(ctx.verify_flags, ssl.VERIFY_DEFAULT)
710 ctx.verify_flags = ssl.VERIFY_CRL_CHECK_LEAF
711 self.assertEqual(ctx.verify_flags, ssl.VERIFY_CRL_CHECK_LEAF)
712 ctx.verify_flags = ssl.VERIFY_CRL_CHECK_CHAIN
713 self.assertEqual(ctx.verify_flags, ssl.VERIFY_CRL_CHECK_CHAIN)
714 ctx.verify_flags = ssl.VERIFY_DEFAULT
715 self.assertEqual(ctx.verify_flags, ssl.VERIFY_DEFAULT)
716 # supports any value
717 ctx.verify_flags = ssl.VERIFY_CRL_CHECK_LEAF | ssl.VERIFY_X509_STRICT
718 self.assertEqual(ctx.verify_flags,
719 ssl.VERIFY_CRL_CHECK_LEAF | ssl.VERIFY_X509_STRICT)
720 with self.assertRaises(TypeError):
721 ctx.verify_flags = None
600 722
601 def test_load_cert_chain(self): 723 def test_load_cert_chain(self):
602 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) 724 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
603 # Combined key and cert in a single file 725 # Combined key and cert in a single file
604 ctx.load_cert_chain(CERTFILE) 726 ctx.load_cert_chain(CERTFILE)
605 ctx.load_cert_chain(CERTFILE, keyfile=CERTFILE) 727 ctx.load_cert_chain(CERTFILE, keyfile=CERTFILE)
606 self.assertRaises(TypeError, ctx.load_cert_chain, keyfile=CERTFILE) 728 self.assertRaises(TypeError, ctx.load_cert_chain, keyfile=CERTFILE)
607 with self.assertRaises(OSError) as cm: 729 with self.assertRaises(OSError) as cm:
608 ctx.load_cert_chain(WRONGCERT) 730 ctx.load_cert_chain(WRONGCERT)
609 self.assertEqual(cm.exception.errno, errno.ENOENT) 731 self.assertEqual(cm.exception.errno, errno.ENOENT)
(...skipping 71 matching lines...) Expand 10 before | Expand all | Expand 10 after
681 # Make sure the password function isn't called if it isn't needed 803 # Make sure the password function isn't called if it isn't needed
682 ctx.load_cert_chain(CERTFILE, password=getpass_exception) 804 ctx.load_cert_chain(CERTFILE, password=getpass_exception)
683 805
684 def test_load_verify_locations(self): 806 def test_load_verify_locations(self):
685 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) 807 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
686 ctx.load_verify_locations(CERTFILE) 808 ctx.load_verify_locations(CERTFILE)
687 ctx.load_verify_locations(cafile=CERTFILE, capath=None) 809 ctx.load_verify_locations(cafile=CERTFILE, capath=None)
688 ctx.load_verify_locations(BYTES_CERTFILE) 810 ctx.load_verify_locations(BYTES_CERTFILE)
689 ctx.load_verify_locations(cafile=BYTES_CERTFILE, capath=None) 811 ctx.load_verify_locations(cafile=BYTES_CERTFILE, capath=None)
690 self.assertRaises(TypeError, ctx.load_verify_locations) 812 self.assertRaises(TypeError, ctx.load_verify_locations)
691 self.assertRaises(TypeError, ctx.load_verify_locations, None, None) 813 self.assertRaises(TypeError, ctx.load_verify_locations, None, None, None )
692 with self.assertRaises(OSError) as cm: 814 with self.assertRaises(OSError) as cm:
693 ctx.load_verify_locations(WRONGCERT) 815 ctx.load_verify_locations(WRONGCERT)
694 self.assertEqual(cm.exception.errno, errno.ENOENT) 816 self.assertEqual(cm.exception.errno, errno.ENOENT)
695 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"): 817 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
696 ctx.load_verify_locations(BADCERT) 818 ctx.load_verify_locations(BADCERT)
697 ctx.load_verify_locations(CERTFILE, CAPATH) 819 ctx.load_verify_locations(CERTFILE, CAPATH)
698 ctx.load_verify_locations(CERTFILE, capath=BYTES_CAPATH) 820 ctx.load_verify_locations(CERTFILE, capath=BYTES_CAPATH)
699 821
700 # Issue #10989: crash if the second argument type is invalid 822 # Issue #10989: crash if the second argument type is invalid
701 self.assertRaises(TypeError, ctx.load_verify_locations, None, True) 823 self.assertRaises(TypeError, ctx.load_verify_locations, None, True)
824
825 def test_load_verify_cadata(self):
826 # test cadata
827 with open(CAFILE_CACERT) as f:
828 cacert_pem = f.read()
829 cacert_der = ssl.PEM_cert_to_DER_cert(cacert_pem)
830 with open(CAFILE_NEURONIO) as f:
831 neuronio_pem = f.read()
832 neuronio_der = ssl.PEM_cert_to_DER_cert(neuronio_pem)
833
834 # test PEM
835 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
836 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 0)
837 ctx.load_verify_locations(cadata=cacert_pem)
838 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 1)
839 ctx.load_verify_locations(cadata=neuronio_pem)
840 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
841 # cert already in hash table
842 ctx.load_verify_locations(cadata=neuronio_pem)
843 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
844
845 # combined
846 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
847 combined = "\n".join((cacert_pem, neuronio_pem))
848 ctx.load_verify_locations(cadata=combined)
849 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
850
851 # with junk around the certs
852 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
853 combined = ["head", cacert_pem, "other", neuronio_pem, "again",
854 neuronio_pem, "tail"]
855 ctx.load_verify_locations(cadata="\n".join(combined))
856 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
857
858 # test DER
859 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
860 ctx.load_verify_locations(cadata=cacert_der)
861 ctx.load_verify_locations(cadata=neuronio_der)
862 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
863 # cert already in hash table
864 ctx.load_verify_locations(cadata=cacert_der)
865 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
866
867 # combined
868 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
869 combined = b"".join((cacert_der, neuronio_der))
870 ctx.load_verify_locations(cadata=combined)
871 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
872
873 # error cases
874 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
875 self.assertRaises(TypeError, ctx.load_verify_locations, cadata=object)
876
877 with self.assertRaisesRegex(ssl.SSLError, "no start line"):
878 ctx.load_verify_locations(cadata="broken")
879 with self.assertRaisesRegex(ssl.SSLError, "not enough data"):
880 ctx.load_verify_locations(cadata=b"broken")
881
702 882
703 def test_load_dh_params(self): 883 def test_load_dh_params(self):
704 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) 884 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
705 ctx.load_dh_params(DHFILE) 885 ctx.load_dh_params(DHFILE)
706 if os.name != 'nt': 886 if os.name != 'nt':
707 ctx.load_dh_params(BYTES_DHFILE) 887 ctx.load_dh_params(BYTES_DHFILE)
708 self.assertRaises(TypeError, ctx.load_dh_params) 888 self.assertRaises(TypeError, ctx.load_dh_params)
709 self.assertRaises(TypeError, ctx.load_dh_params, None) 889 self.assertRaises(TypeError, ctx.load_dh_params, None)
710 with self.assertRaises(FileNotFoundError) as cm: 890 with self.assertRaises(FileNotFoundError) as cm:
711 ctx.load_dh_params(WRONGCERT) 891 ctx.load_dh_params(WRONGCERT)
(...skipping 86 matching lines...) Expand 10 before | Expand all | Expand 10 after
798 # but SVN_PYTHON_ORG_ROOT_CERT is a CA cert 978 # but SVN_PYTHON_ORG_ROOT_CERT is a CA cert
799 ctx.load_verify_locations(SVN_PYTHON_ORG_ROOT_CERT) 979 ctx.load_verify_locations(SVN_PYTHON_ORG_ROOT_CERT)
800 self.assertEqual(ctx.get_ca_certs(), 980 self.assertEqual(ctx.get_ca_certs(),
801 [{'issuer': ((('organizationName', 'Root CA'),), 981 [{'issuer': ((('organizationName', 'Root CA'),),
802 (('organizationalUnitName', 'http://www.cacert.org'),), 982 (('organizationalUnitName', 'http://www.cacert.org'),),
803 (('commonName', 'CA Cert Signing Authority'),), 983 (('commonName', 'CA Cert Signing Authority'),),
804 (('emailAddress', 'support@cacert.org'),)), 984 (('emailAddress', 'support@cacert.org'),)),
805 'notAfter': asn1time('Mar 29 12:29:49 2033 GMT'), 985 'notAfter': asn1time('Mar 29 12:29:49 2033 GMT'),
806 'notBefore': asn1time('Mar 30 12:29:49 2003 GMT'), 986 'notBefore': asn1time('Mar 30 12:29:49 2003 GMT'),
807 'serialNumber': '00', 987 'serialNumber': '00',
988 'crlDistributionPoints': ('https://www.cacert.org/revoke.crl',),
808 'subject': ((('organizationName', 'Root CA'),), 989 'subject': ((('organizationName', 'Root CA'),),
809 (('organizationalUnitName', 'http://www.cacert.org'),) , 990 (('organizationalUnitName', 'http://www.cacert.org'),) ,
810 (('commonName', 'CA Cert Signing Authority'),), 991 (('commonName', 'CA Cert Signing Authority'),),
811 (('emailAddress', 'support@cacert.org'),)), 992 (('emailAddress', 'support@cacert.org'),)),
812 'version': 3}]) 993 'version': 3}])
813 994
814 with open(SVN_PYTHON_ORG_ROOT_CERT) as f: 995 with open(SVN_PYTHON_ORG_ROOT_CERT) as f:
815 pem = f.read() 996 pem = f.read()
816 der = ssl.PEM_cert_to_DER_cert(pem) 997 der = ssl.PEM_cert_to_DER_cert(pem)
817 self.assertEqual(ctx.get_ca_certs(True), [der]) 998 self.assertEqual(ctx.get_ca_certs(True), [der])
999
1000 def test_load_default_certs(self):
1001 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1002 ctx.load_default_certs()
1003
1004 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1005 ctx.load_default_certs(ssl.Purpose.SERVER_AUTH)
1006 ctx.load_default_certs()
1007
1008 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1009 ctx.load_default_certs(ssl.Purpose.CLIENT_AUTH)
1010
1011 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1012 self.assertRaises(TypeError, ctx.load_default_certs, None)
1013 self.assertRaises(TypeError, ctx.load_default_certs, 'SERVER_AUTH')
1014
1015 def test_create_default_context(self):
1016 ctx = ssl.create_default_context()
1017 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLSv1)
1018 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1019 self.assertTrue(ctx.check_hostname)
1020 self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
1021
1022 with open(SIGNING_CA) as f:
1023 cadata = f.read()
1024 ctx = ssl.create_default_context(cafile=SIGNING_CA, capath=CAPATH,
1025 cadata=cadata)
1026 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLSv1)
1027 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1028 self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
1029
1030 ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
1031 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLSv1)
1032 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1033 self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
1034
1035 def test__create_stdlib_context(self):
1036 ctx = ssl._create_stdlib_context()
1037 self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23)
1038 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1039 self.assertFalse(ctx.check_hostname)
1040 self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
1041
1042 ctx = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1)
1043 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLSv1)
1044 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1045 self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
1046
1047 ctx = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1,
1048 cert_reqs=ssl.CERT_REQUIRED,
1049 check_hostname=True)
1050 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLSv1)
1051 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1052 self.assertTrue(ctx.check_hostname)
1053 self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
1054
1055 ctx = ssl._create_stdlib_context(purpose=ssl.Purpose.CLIENT_AUTH)
1056 self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23)
1057 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1058 self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
1059
1060 def test_check_hostname(self):
1061 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1062 self.assertFalse(ctx.check_hostname)
1063
1064 # Requires CERT_REQUIRED or CERT_OPTIONAL
1065 with self.assertRaises(ValueError):
1066 ctx.check_hostname = True
1067 ctx.verify_mode = ssl.CERT_REQUIRED
1068 self.assertFalse(ctx.check_hostname)
1069 ctx.check_hostname = True
1070 self.assertTrue(ctx.check_hostname)
1071
1072 ctx.verify_mode = ssl.CERT_OPTIONAL
1073 ctx.check_hostname = True
1074 self.assertTrue(ctx.check_hostname)
1075
1076 # Cannot set CERT_NONE with check_hostname enabled
1077 with self.assertRaises(ValueError):
1078 ctx.verify_mode = ssl.CERT_NONE
1079 ctx.check_hostname = False
1080 self.assertFalse(ctx.check_hostname)
818 1081
819 1082
820 class SSLErrorTests(unittest.TestCase): 1083 class SSLErrorTests(unittest.TestCase):
821 1084
822 def test_str(self): 1085 def test_str(self):
823 # The str() of a SSLError doesn't include the errno 1086 # The str() of a SSLError doesn't include the errno
824 e = ssl.SSLError(1, "foo") 1087 e = ssl.SSLError(1, "foo")
825 self.assertEqual(str(e), "foo") 1088 self.assertEqual(str(e), "foo")
826 self.assertEqual(e.errno, 1) 1089 self.assertEqual(e.errno, 1)
827 # Same for a subclass 1090 # Same for a subclass
(...skipping 116 matching lines...) Expand 10 before | Expand all | Expand 10 after
944 self.assertIn(rc, (errno.EAGAIN, errno.EWOULDBLOCK)) 1207 self.assertIn(rc, (errno.EAGAIN, errno.EWOULDBLOCK))
945 finally: 1208 finally:
946 s.close() 1209 s.close()
947 1210
948 def test_connect_ex_error(self): 1211 def test_connect_ex_error(self):
949 with support.transient_internet("svn.python.org"): 1212 with support.transient_internet("svn.python.org"):
950 s = ssl.wrap_socket(socket.socket(socket.AF_INET), 1213 s = ssl.wrap_socket(socket.socket(socket.AF_INET),
951 cert_reqs=ssl.CERT_REQUIRED, 1214 cert_reqs=ssl.CERT_REQUIRED,
952 ca_certs=SVN_PYTHON_ORG_ROOT_CERT) 1215 ca_certs=SVN_PYTHON_ORG_ROOT_CERT)
953 try: 1216 try:
954 self.assertEqual(errno.ECONNREFUSED, 1217 rc = s.connect_ex(("svn.python.org", 444))
955 s.connect_ex(("svn.python.org", 444))) 1218 # Issue #19919: Windows machines or VMs hosted on Windows
1219 # machines sometimes return EWOULDBLOCK.
1220 self.assertIn(rc, (errno.ECONNREFUSED, errno.EWOULDBLOCK))
956 finally: 1221 finally:
957 s.close() 1222 s.close()
958 1223
959 def test_connect_with_context(self): 1224 def test_connect_with_context(self):
960 with support.transient_internet("svn.python.org"): 1225 with support.transient_internet("svn.python.org"):
961 # Same as test_connect, but with a separately created context 1226 # Same as test_connect, but with a separately created context
962 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23) 1227 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
963 s = ctx.wrap_socket(socket.socket(socket.AF_INET)) 1228 s = ctx.wrap_socket(socket.socket(socket.AF_INET))
964 s.connect(("svn.python.org", 443)) 1229 s.connect(("svn.python.org", 443))
965 try: 1230 try:
(...skipping 46 matching lines...) Expand 10 before | Expand all | Expand 10 after
1012 ctx.verify_mode = ssl.CERT_REQUIRED 1277 ctx.verify_mode = ssl.CERT_REQUIRED
1013 ctx.load_verify_locations(capath=BYTES_CAPATH) 1278 ctx.load_verify_locations(capath=BYTES_CAPATH)
1014 s = ctx.wrap_socket(socket.socket(socket.AF_INET)) 1279 s = ctx.wrap_socket(socket.socket(socket.AF_INET))
1015 s.connect(("svn.python.org", 443)) 1280 s.connect(("svn.python.org", 443))
1016 try: 1281 try:
1017 cert = s.getpeercert() 1282 cert = s.getpeercert()
1018 self.assertTrue(cert) 1283 self.assertTrue(cert)
1019 finally: 1284 finally:
1020 s.close() 1285 s.close()
1021 1286
1287 def test_connect_cadata(self):
1288 with open(CAFILE_CACERT) as f:
1289 pem = f.read()
1290 der = ssl.PEM_cert_to_DER_cert(pem)
1291 with support.transient_internet("svn.python.org"):
1292 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1293 ctx.verify_mode = ssl.CERT_REQUIRED
1294 ctx.load_verify_locations(cadata=pem)
1295 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1296 s.connect(("svn.python.org", 443))
1297 cert = s.getpeercert()
1298 self.assertTrue(cert)
1299
1300 # same with DER
1301 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1302 ctx.verify_mode = ssl.CERT_REQUIRED
1303 ctx.load_verify_locations(cadata=der)
1304 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1305 s.connect(("svn.python.org", 443))
1306 cert = s.getpeercert()
1307 self.assertTrue(cert)
1308
1022 @unittest.skipIf(os.name == "nt", "Can't use a socket as a file under Window s") 1309 @unittest.skipIf(os.name == "nt", "Can't use a socket as a file under Window s")
1023 def test_makefile_close(self): 1310 def test_makefile_close(self):
1024 # Issue #5238: creating a file-like object with makefile() shouldn't 1311 # Issue #5238: creating a file-like object with makefile() shouldn't
1025 # delay closing the underlying "real socket" (here tested with its 1312 # delay closing the underlying "real socket" (here tested with its
1026 # file descriptor, hence skipping the test under Windows). 1313 # file descriptor, hence skipping the test under Windows).
1027 with support.transient_internet("svn.python.org"): 1314 with support.transient_internet("svn.python.org"):
1028 ss = ssl.wrap_socket(socket.socket(socket.AF_INET)) 1315 ss = ssl.wrap_socket(socket.socket(socket.AF_INET))
1029 ss.connect(("svn.python.org", 443)) 1316 ss.connect(("svn.python.org", 443))
1030 fd = ss.fileno() 1317 fd = ss.fileno()
1031 f = ss.makefile() 1318 f = ss.makefile()
(...skipping 108 matching lines...) Expand 10 before | Expand all | Expand 10 after
1140 self.assertEqual(ctx.get_ca_certs(), []) 1427 self.assertEqual(ctx.get_ca_certs(), [])
1141 s = ctx.wrap_socket(socket.socket(socket.AF_INET)) 1428 s = ctx.wrap_socket(socket.socket(socket.AF_INET))
1142 s.connect(("svn.python.org", 443)) 1429 s.connect(("svn.python.org", 443))
1143 try: 1430 try:
1144 cert = s.getpeercert() 1431 cert = s.getpeercert()
1145 self.assertTrue(cert) 1432 self.assertTrue(cert)
1146 finally: 1433 finally:
1147 s.close() 1434 s.close()
1148 self.assertEqual(len(ctx.get_ca_certs()), 1) 1435 self.assertEqual(len(ctx.get_ca_certs()), 1)
1149 1436
1437 @needs_sni
1438 def test_context_setget(self):
1439 # Check that the context of a connected socket can be replaced.
1440 with support.transient_internet("svn.python.org"):
1441 ctx1 = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1442 ctx2 = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1443 s = socket.socket(socket.AF_INET)
1444 with ctx1.wrap_socket(s) as ss:
1445 ss.connect(("svn.python.org", 443))
1446 self.assertIs(ss.context, ctx1)
1447 self.assertIs(ss._sslobj.context, ctx1)
1448 ss.context = ctx2
1449 self.assertIs(ss.context, ctx2)
1450 self.assertIs(ss._sslobj.context, ctx2)
1150 1451
1151 try: 1452 try:
1152 import threading 1453 import threading
1153 except ImportError: 1454 except ImportError:
1154 _have_threads = False 1455 _have_threads = False
1155 else: 1456 else:
1156 _have_threads = True 1457 _have_threads = True
1157 1458
1158 from test.ssl_servers import make_https_server 1459 from test.ssl_servers import make_https_server
1159 1460
(...skipping 391 matching lines...) Expand 10 before | Expand all | Expand 10 after
1551 ssl.CERT_OPTIONAL: "CERT_OPTIONAL", 1852 ssl.CERT_OPTIONAL: "CERT_OPTIONAL",
1552 ssl.CERT_REQUIRED: "CERT_REQUIRED", 1853 ssl.CERT_REQUIRED: "CERT_REQUIRED",
1553 }[certsreqs] 1854 }[certsreqs]
1554 if support.verbose: 1855 if support.verbose:
1555 formatstr = (expect_success and " %s->%s %s\n") or " {%s->%s} %s\n" 1856 formatstr = (expect_success and " %s->%s %s\n") or " {%s->%s} %s\n"
1556 sys.stdout.write(formatstr % 1857 sys.stdout.write(formatstr %
1557 (ssl.get_protocol_name(client_protocol), 1858 (ssl.get_protocol_name(client_protocol),
1558 ssl.get_protocol_name(server_protocol), 1859 ssl.get_protocol_name(server_protocol),
1559 certtype)) 1860 certtype))
1560 client_context = ssl.SSLContext(client_protocol) 1861 client_context = ssl.SSLContext(client_protocol)
1561 client_context.options = ssl.OP_ALL | client_options 1862 client_context.options |= client_options
1562 server_context = ssl.SSLContext(server_protocol) 1863 server_context = ssl.SSLContext(server_protocol)
1563 server_context.options = ssl.OP_ALL | server_options 1864 server_context.options |= server_options
1564 1865
1565 # NOTE: we must enable "ALL" ciphers on the client, otherwise an 1866 # NOTE: we must enable "ALL" ciphers on the client, otherwise an
1566 # SSLv23 client will send an SSLv3 hello (rather than SSLv2) 1867 # SSLv23 client will send an SSLv3 hello (rather than SSLv2)
1567 # starting from OpenSSL 1.0.0 (see issue #8322). 1868 # starting from OpenSSL 1.0.0 (see issue #8322).
1568 if client_context.protocol == ssl.PROTOCOL_SSLv23: 1869 if client_context.protocol == ssl.PROTOCOL_SSLv23:
1569 client_context.set_ciphers("ALL") 1870 client_context.set_ciphers("ALL")
1570 1871
1571 for ctx in (client_context, server_context): 1872 for ctx in (client_context, server_context):
1572 ctx.verify_mode = certsreqs 1873 ctx.verify_mode = certsreqs
1573 ctx.load_cert_chain(CERTFILE) 1874 ctx.load_cert_chain(CERTFILE)
(...skipping 62 matching lines...) Expand 10 before | Expand all | Expand 10 after
1636 self.fail( 1937 self.fail(
1637 "Missing or invalid 'organizationName' field in certific ate subject; " 1938 "Missing or invalid 'organizationName' field in certific ate subject; "
1638 "should be 'Python Software Foundation'.") 1939 "should be 'Python Software Foundation'.")
1639 self.assertIn('notBefore', cert) 1940 self.assertIn('notBefore', cert)
1640 self.assertIn('notAfter', cert) 1941 self.assertIn('notAfter', cert)
1641 before = ssl.cert_time_to_seconds(cert['notBefore']) 1942 before = ssl.cert_time_to_seconds(cert['notBefore'])
1642 after = ssl.cert_time_to_seconds(cert['notAfter']) 1943 after = ssl.cert_time_to_seconds(cert['notAfter'])
1643 self.assertLess(before, after) 1944 self.assertLess(before, after)
1644 s.close() 1945 s.close()
1645 1946
1947 @unittest.skipUnless(have_verify_flags(),
1948 "verify_flags need OpenSSL > 0.9.8")
1949 def test_crl_check(self):
1950 if support.verbose:
1951 sys.stdout.write("\n")
1952
1953 server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1954 server_context.load_cert_chain(SIGNED_CERTFILE)
1955
1956 context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1957 context.verify_mode = ssl.CERT_REQUIRED
1958 context.load_verify_locations(SIGNING_CA)
1959 self.assertEqual(context.verify_flags, ssl.VERIFY_DEFAULT)
1960
1961 # VERIFY_DEFAULT should pass
1962 server = ThreadedEchoServer(context=server_context, chatty=True)
1963 with server:
1964 with context.wrap_socket(socket.socket()) as s:
1965 s.connect((HOST, server.port))
1966 cert = s.getpeercert()
1967 self.assertTrue(cert, "Can't get peer certificate.")
1968
1969 # VERIFY_CRL_CHECK_LEAF without a loaded CRL file fails
1970 context.verify_flags |= ssl.VERIFY_CRL_CHECK_LEAF
1971
1972 server = ThreadedEchoServer(context=server_context, chatty=True)
1973 with server:
1974 with context.wrap_socket(socket.socket()) as s:
1975 with self.assertRaisesRegex(ssl.SSLError,
1976 "certificate verify failed"):
1977 s.connect((HOST, server.port))
1978
1979 # now load a CRL file. The CRL file is signed by the CA.
1980 context.load_verify_locations(CRLFILE)
1981
1982 server = ThreadedEchoServer(context=server_context, chatty=True)
1983 with server:
1984 with context.wrap_socket(socket.socket()) as s:
1985 s.connect((HOST, server.port))
1986 cert = s.getpeercert()
1987 self.assertTrue(cert, "Can't get peer certificate.")
1988
1989 @needs_sni
1990 def test_check_hostname(self):
1991 if support.verbose:
1992 sys.stdout.write("\n")
1993
1994 server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1995 server_context.load_cert_chain(SIGNED_CERTFILE)
1996
1997 context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1998 context.verify_mode = ssl.CERT_REQUIRED
1999 context.check_hostname = True
2000 context.load_verify_locations(SIGNING_CA)
2001
2002 # correct hostname should verify
2003 server = ThreadedEchoServer(context=server_context, chatty=True)
2004 with server:
2005 with context.wrap_socket(socket.socket(),
2006 server_hostname="localhost") as s:
2007 s.connect((HOST, server.port))
2008 cert = s.getpeercert()
2009 self.assertTrue(cert, "Can't get peer certificate.")
2010
2011 # incorrect hostname should raise an exception
2012 server = ThreadedEchoServer(context=server_context, chatty=True)
2013 with server:
2014 with context.wrap_socket(socket.socket(),
2015 server_hostname="invalid") as s:
2016 with self.assertRaisesRegex(ssl.CertificateError,
2017 "hostname 'invalid' doesn't matc h 'localhost'"):
2018 s.connect((HOST, server.port))
2019
2020 # missing server_hostname arg should cause an exception, too
2021 server = ThreadedEchoServer(context=server_context, chatty=True)
2022 with server:
2023 with socket.socket() as s:
2024 with self.assertRaisesRegex(ValueError,
2025 "check_hostname requires server_ hostname"):
2026 context.wrap_socket(s)
2027
1646 def test_empty_cert(self): 2028 def test_empty_cert(self):
1647 """Connecting with an empty cert file""" 2029 """Connecting with an empty cert file"""
1648 bad_cert_test(os.path.join(os.path.dirname(__file__) or os.curdir, 2030 bad_cert_test(os.path.join(os.path.dirname(__file__) or os.curdir,
1649 "nullcert.pem")) 2031 "nullcert.pem"))
1650 def test_malformed_cert(self): 2032 def test_malformed_cert(self):
1651 """Connecting with a badly formatted certificate (syntax error)""" 2033 """Connecting with a badly formatted certificate (syntax error)"""
1652 bad_cert_test(os.path.join(os.path.dirname(__file__) or os.curdir, 2034 bad_cert_test(os.path.join(os.path.dirname(__file__) or os.curdir,
1653 "badcert.pem")) 2035 "badcert.pem"))
1654 def test_nonexisting_cert(self): 2036 def test_nonexisting_cert(self):
1655 """Connecting with a non-existing cert file""" 2037 """Connecting with a non-existing cert file"""
(...skipping 48 matching lines...) Expand 10 before | Expand all | Expand 10 after
1704 @skip_if_broken_ubuntu_ssl 2086 @skip_if_broken_ubuntu_ssl
1705 @unittest.skipUnless(hasattr(ssl, 'PROTOCOL_SSLv2'), 2087 @unittest.skipUnless(hasattr(ssl, 'PROTOCOL_SSLv2'),
1706 "OpenSSL is compiled without SSLv2 support") 2088 "OpenSSL is compiled without SSLv2 support")
1707 def test_protocol_sslv2(self): 2089 def test_protocol_sslv2(self):
1708 """Connecting to an SSLv2 server with various client options""" 2090 """Connecting to an SSLv2 server with various client options"""
1709 if support.verbose: 2091 if support.verbose:
1710 sys.stdout.write("\n") 2092 sys.stdout.write("\n")
1711 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True) 2093 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True)
1712 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl .CERT_OPTIONAL) 2094 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl .CERT_OPTIONAL)
1713 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl .CERT_REQUIRED) 2095 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl .CERT_REQUIRED)
1714 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv23, True) 2096 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv23, False)
1715 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv3, False) 2097 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv3, False)
1716 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLSv1, False) 2098 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLSv1, False)
1717 # SSLv23 client with specific SSL options 2099 # SSLv23 client with specific SSL options
1718 if no_sslv2_implies_sslv3_hello(): 2100 if no_sslv2_implies_sslv3_hello():
1719 # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs 2101 # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
1720 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv23, Fals e, 2102 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv23, Fals e,
1721 client_options=ssl.OP_NO_SSLv2) 2103 client_options=ssl.OP_NO_SSLv2)
1722 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv23, True, 2104 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv23, False,
1723 client_options=ssl.OP_NO_SSLv3) 2105 client_options=ssl.OP_NO_SSLv3)
1724 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv23, True, 2106 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv23, False,
1725 client_options=ssl.OP_NO_TLSv1) 2107 client_options=ssl.OP_NO_TLSv1)
1726 2108
1727 @skip_if_broken_ubuntu_ssl 2109 @skip_if_broken_ubuntu_ssl
1728 def test_protocol_sslv23(self): 2110 def test_protocol_sslv23(self):
1729 """Connecting to an SSLv23 server with various client options""" 2111 """Connecting to an SSLv23 server with various client options"""
1730 if support.verbose: 2112 if support.verbose:
1731 sys.stdout.write("\n") 2113 sys.stdout.write("\n")
1732 if hasattr(ssl, 'PROTOCOL_SSLv2'): 2114 if hasattr(ssl, 'PROTOCOL_SSLv2'):
1733 try: 2115 try:
1734 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv2, True) 2116 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv2, True)
(...skipping 736 matching lines...) Expand 10 before | Expand all | Expand 10 after
2471 tests.append(ThreadedTests) 2853 tests.append(ThreadedTests)
2472 2854
2473 try: 2855 try:
2474 support.run_unittest(*tests) 2856 support.run_unittest(*tests)
2475 finally: 2857 finally:
2476 if _have_threads: 2858 if _have_threads:
2477 support.threading_cleanup(*thread_info) 2859 support.threading_cleanup(*thread_info)
2478 2860
2479 if __name__ == "__main__": 2861 if __name__ == "__main__":
2480 test_main() 2862 test_main()
LEFTRIGHT

RSS Feeds Recent Issues | This issue
This is Rietveld 894c83f36cb7+