"""Minimal in-memory stand-in for the parts of the ``socket`` module.

No network traffic ever happens: sockets created here ignore everything
that is sent, and ``MockSocket.makefile()`` hands back a file-like object
over whatever data was last queued with ``reply_with()``.
"""

try:
    import cStringIO
except ImportError:
    # Python 3 has no cStringIO module; makefile() falls back to ``io``.
    cStringIO = None
import io

# Mock socket module

# Module-wide default timeout, managed by setdefaulttimeout()/getdefaulttimeout().
_defaulttimeout = None
# Data served by MockSocket.makefile(); set through reply_with().
_reply_data = None


def reply_with(data):
    """Queue *data* as the content returned by later ``makefile()`` calls."""
    global _reply_data
    _reply_data = data


class MockSocket(object):
    """Socket look-alike whose operations are no-ops or canned answers."""

    # Class-level default so gettimeout() works before settimeout() is called.
    timeout = None

    def settimeout(self, timeout):
        """Store *timeout*; ``None`` selects the module-wide default timeout."""
        if timeout is None:
            self.timeout = _defaulttimeout
        else:
            self.timeout = timeout

    def gettimeout(self):
        """Return the timeout currently stored on this socket."""
        return self.timeout

    def setsockopt(self, level, optname, value):
        """Accept and ignore a socket option."""
        pass

    def bind(self, address):
        """Accept and ignore a bind request."""
        pass

    def getsockname(self):
        """Return a fixed placeholder address, ``('0.0.0.0', 0)``."""
        return ('0.0.0.0', 0)

    def listen(self, backlog):
        """Accept and ignore a listen request."""
        pass

    def makefile(self, mode='r', bufsize=-1):
        """Return a readable file-like object over the queued reply data.

        If nothing has been queued with ``reply_with()`` an empty stream is
        returned (bytes when *mode* contains ``'b'``, text otherwise).
        *bufsize* is accepted for signature compatibility and ignored.
        """
        data = _reply_data
        if data is None:
            # Nothing queued: behave like a connection that yields no data
            # instead of failing inside the StringIO constructor.
            data = b'' if 'b' in mode else ''
        if cStringIO is not None:
            return cStringIO.StringIO(data)
        # Python 3 fallback: pick the buffer type matching the queued data.
        if isinstance(data, bytes):
            return io.BytesIO(data)
        return io.StringIO(data)

    def sendall(self, buffer):
        """Discard *buffer*; nothing is transmitted."""
        pass

    def close(self):
        """Do nothing; there is no underlying resource to release."""
        pass


def socket(family=None, type=None, proto=None):
    """Return a new ``MockSocket``; all arguments are ignored."""
    return MockSocket()


def create_connection(address, timeout=None):
    """Return a ``MockSocket`` for *address* with *timeout* applied.

    Raises ``error`` when the port (``address[1]``) is not convertible to
    an integer via ``int()`` with a ``ValueError`` (e.g. a non-numeric
    string).
    """
    try:
        # Only the conversion matters; the resulting number is not used.
        int(address[1])
    except ValueError:
        raise error
    ms = MockSocket()
    ms.settimeout(timeout)
    return ms


def setdefaulttimeout(timeout):
    """Set the module-wide default timeout."""
    global _defaulttimeout
    _defaulttimeout = timeout


def getdefaulttimeout():
    """Return the module-wide default timeout."""
    return _defaulttimeout


def getfqdn():
    """Return an empty string as the fully qualified domain name."""
    return ""


def gethostname():
    """Return ``None``; no host name is simulated."""
    pass


def gethostbyname(name):
    """Return an empty string for any *name*."""
    return ""


class gaierror(Exception):
    """Mock counterpart of ``socket.gaierror``."""
    pass


class error(Exception):
    """Mock counterpart of ``socket.error``."""
    pass


# Constants
AF_INET = None
SOCK_STREAM = None
SOL_SOCKET = None
SO_REUSEADDR = None