"""Self-checking demo: download a chunked HTTP response from a local mock server.

Running this file as a script starts a throw-away HTTP server on localhost,
downloads a file that the server streams with ``Transfer-Encoding: chunked``,
and verifies that the bytes written to disk equal the original payload.
"""
import json
import os
import re
import shutil
import socket
import tempfile
from contextlib import closing
from io import BytesIO
from threading import Thread

try:
    # py3k
    from urllib.request import urlopen
    from http.server import BaseHTTPRequestHandler, HTTPServer
except ImportError:
    # py2.7
    from urllib2 import urlopen
    from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer

FILE_CONTENT = 'x' * 100000


class MockServerRequestHandler(BaseHTTPRequestHandler):
    """Local HTTP handler serving FILE_CONTENT as a chunked body on GET."""

    # Chunked transfer encoding only exists in HTTP/1.1; the base class
    # default of HTTP/1.0 would advertise a framing the protocol lacks.
    protocol_version = 'HTTP/1.1'

    USERS_PATTERN = re.compile(r'/downloads/.*')
    CHUNK_SIZE = 1024

    def do_GET(self):
        """Stream FILE_CONTENT in chunks for /downloads/ paths, else reply 404."""
        if not self.USERS_PATTERN.search(self.path):
            self.send_response(404)
            # Under HTTP/1.1 the client needs explicit framing for the empty
            # body, otherwise a keep-alive client would wait forever.
            self.send_header('Content-Length', '0')
            self.send_header('Connection', 'close')
            self.end_headers()
            return

        self.send_response(200)
        self.send_header('Content-Type', 'application/text;')
        self.send_header('Transfer-Encoding', 'chunked')
        self.send_header('Connection', 'close')
        self.end_headers()

        stream = BytesIO(FILE_CONTENT.encode('utf-8'))
        while True:
            data = stream.read(self.CHUNK_SIZE)
            # Each chunk is "<hex size>CRLF<payload>CRLF".  The write happens
            # before the emptiness check on purpose: the final empty read
            # emits the mandatory zero-length terminating chunk.
            self.wfile.write(b"%X\r\n%s\r\n" % (len(data), data))
            if not data:
                break

        # Ensure any buffered output has been transmitted.
        self.wfile.flush()


def get_free_port():
    """Return a TCP port number on localhost that was free when probed.

    The probing socket is closed before returning, so another process could
    grab the port before the caller binds it.
    """
    s = socket.socket(socket.AF_INET, type=socket.SOCK_STREAM)
    try:
        s.bind(('localhost', 0))
        return s.getsockname()[1]
    finally:
        s.close()


def start_mock_server(port):
    """Start the mock server on localhost:*port* in a daemon thread.

    Returns the HTTPServer instance so the caller can ``shutdown()`` and
    ``server_close()`` it once finished.
    """
    mock_server = HTTPServer(('localhost', port), MockServerRequestHandler)
    mock_server_thread = Thread(target=mock_server.serve_forever)
    mock_server_thread.daemon = True
    mock_server_thread.start()
    return mock_server


def download_to_tempfile(response):
    """Copy the decoded body of *response* into a new temp file; return its path.

    The data is pulled through ``response.read()``.  Reading from
    ``response.fp`` instead would, on Python 3, bypass ``HTTPResponse`` and
    copy the raw socket stream, leaving the chunk-size lines in the file.
    The caller is responsible for deleting the returned file.
    """
    fdesc, fname = tempfile.mkstemp()
    try:
        with os.fdopen(fdesc, "wb") as fhand:
            shutil.copyfileobj(response, fhand)
    except Exception:
        # Do not leave a half-written temp file behind.
        os.remove(fname)
        raise
    return fname


def main():
    """Run the end-to-end check: serve, download, compare, clean up.

    Raises AssertionError if the downloaded content differs from FILE_CONTENT.
    """
    mock_server_port = get_free_port()
    mock_server = start_mock_server(mock_server_port)
    try:
        url = 'http://localhost:{port}/downloads/1.0.tar.gz'.format(
            port=mock_server_port)
        # closing() rather than a bare "with": the py2 response object is
        # not a context manager.
        with closing(urlopen(url)) as resp:
            fname = download_to_tempfile(resp)
        try:
            with open(fname, 'r') as rfh:
                data = rfh.read()
        finally:
            os.remove(fname)
        # Explicit raise so the check survives "python -O".
        if data != FILE_CONTENT:
            raise AssertionError('%s, %s' % (len(FILE_CONTENT), len(data)))
    finally:
        mock_server.shutdown()
        mock_server.server_close()


if __name__ == "__main__":
    main()