# Option #1: SSLMemorySocket # # This option creates a new class callled SSLMemorySocket. It is modeled # similar to SSLSocket and exports a subset of its methods. # # This has a read() and write() method, but not any of the socket specific # methods like recv(), send(), shutdown(), etc. # # This does not support clear text passthrough slightly complicating # implemeting STARTTLS. class SSLMemorySocket(object): def __init__(self, incoming=None, outgoing=None, keyfile=None, certfile=None, ..., _context=None): self._incoming = incoming or MemoryBIO() self._outgoing = outgoing or MemoryBIO() if _context: self._context = _context else: self._context = SSLContext(ssl_version) # set various other bits self._sslobj = _context._wrap_bio(self._incoming, self._outgoing, ...) @property def incoming(self): return self._incoming @property def outgoing(self): return self._outgoing @property def context(self): return self._context def read(self, n): return self._sslobj.read(n) def write(self, buf): return self._sslobj.write(buf) def do_handshake(self): return self._sslobj.do_handshake() def unwrap(self): self._sslobj.shutdown() def getpeercert(self): return self._sslobj.peer_certificate() # Also add cipher() / compression() / selected_npn_protocol() class SSLContext(_SSLContext): def wrap_bio(self, incoming, outgoing, ...): return SSLMemorySocket(incoming, outgoing, ..., _context=self) def wrap_bio(incoming, outgoing, ...): return SSLMemorySocket(incoming, outgoing, ..., _context=None) # Option #2: Like option #1 but support clear-text passthrough class SSLMemorySocket(object): def __init__(self, keyfile=None, ..., do_handshake_on_connect=True, _context=None): # Like option #1, but: if do_handshake_on_connect: self._sslobj = self._context._wrap_bio(self._incoming, self._outgoing) else: self._ssobj = None def read(self, n): if self._sslobj: return self._sslobj.read(n) buf = self._incoming.read() if not buf and not self._incoming.eof: raise BlockingIOError() return buf def write(self, buf): if self._sslobj: return self._sslobj.write(buf) return self._outgoing.write(buf) def shutdown(self): if self._sslobj is None: return self._sslobj.shutdown() self._sslobj = None # Other methods as in option #1 but each makes a None check for _sslobj. # SSLContext.wrap_bio and wrap_bio as in option #1 # Option #3: Light weight SSLObject that is like Option #1 but does less. # Mostly just renames methods from _SSLSocket to the SSLSocket API. # It lacks the functionality to manage a context, and is agnostic to the BIO # being used. Could in theory be used with the socket or any other BIO. class SSLObject(object): """This object has no public constructor.""" def __init__(self, sslobj): self._sslobj = sslobj @property def context(self): return self._sslobj.context def read(self, n): return self._sslobj.read(n) def write(self, buf): return self._sslobj.write(buf) def do_handshake(self): return self._sslobj.do_handshake() def unwrap(self): self._sslobj.shutdown() def getpeercert(self): return self._sslobj.peer_certificate() # Also add cipher() / compression() / selected_npn_protocol() class SSLContext(_SSLContext): def wrap_bio(self, incoming, outgoing, ...): sslobj = self._wrap_bio(incoming, outgoing, ...) return SSLObject(sslobj) # no wrap_bio()