#!/usr/bin/env python
"""Benchmark several ways of downloading one HTTP resource.

Every download strategy is registered under a short name in ``METHODS``.
``main`` runs each strategy a number of times against the same URL and prints
the mean throughput together with the standard error of that mean.
"""

import argparse
import contextlib
import hashlib
import http.client
import io
import socket
import textwrap
import time
import urllib.parse
from typing import Callable, Dict, Iterator, Tuple, Optional

import numpy as np


_Method = Callable[[str], bytes]
METHODS: Dict[str, _Method] = {}


def method(name: str) -> Callable[[_Method], _Method]:
    """Return a decorator that registers a download function in ``METHODS``.

    The decorated function is stored under ``name`` and returned unchanged.
    """
    def decorate(func: _Method) -> _Method:
        METHODS[name] = func
        return func

    return decorate


def _split_http_url(url: str) -> Tuple[str, int, str]:
    """Split ``url`` into ``(host, port, request_target)``.

    The port defaults to 80 and the request target to ``/``. The query
    string, if any, is kept as part of the request target.

    Raises:
        ValueError: if the scheme is not ``http`` or the URL has no host.
    """
    parts = urllib.parse.urlsplit(url)
    if parts.scheme != 'http':
        raise ValueError(f'Only http:// URLs are supported, got {url!r}')
    if parts.hostname is None:
        raise ValueError(f'URL has no host: {url!r}')
    target = parts.path or '/'
    if parts.query:
        target = f'{target}?{parts.query}'
    return parts.hostname, parts.port or 80, target


@contextlib.contextmanager
def _http_response(url: str) -> Iterator[http.client.HTTPResponse]:
    """Issue ``GET url`` via http.client and yield the response.

    The connection is closed when the ``with`` block exits, so repeated
    benchmark passes do not accumulate open sockets.
    """
    host, port, target = _split_http_url(url)
    conn = http.client.HTTPConnection(host, port)
    try:
        conn.request('GET', target)
        yield conn.getresponse()
    finally:
        conn.close()


@method('httpclient-read')
def load_httpclient_read(url: str) -> bytes:
    """Download ``url`` with http.client using ``read()`` without a size."""
    with _http_response(url) as resp:
        return resp.read()


@method('httpclient-read-length')
def load_httpclient_read_length(url: str) -> bytes:
    """Download ``url`` with http.client, passing the length to ``read()``."""
    with _http_response(url) as resp:
        return resp.read(resp.length)  # type: ignore


@method('httpclient-read-raw')
def load_httpclient_read_raw(url: str) -> bytes:
    """Download ``url`` by reading the response's underlying file directly.

    This bypasses ``HTTPResponse.read`` and reads ``resp.length`` bytes from
    ``resp.fp``, so no transfer decoding is applied to the body.
    """
    with _http_response(url) as resp:
        return resp.fp.read(resp.length)  # type: ignore


def _read_content_length(fh: io.BufferedIOBase) -> int:
    """Consume the response head from ``fh`` and return its Content-Length.

    Reads lines up to and including the blank line that ends the headers,
    leaving ``fh`` positioned at the first byte of the body.

    Raises:
        RuntimeError: if the stream ends before the headers are complete, or
            if no Content-Length header was seen.
    """
    content_length: Optional[int] = None
    while True:
        line = fh.readline()
        if not line:
            # readline() keeps returning b'' at EOF; without this check a
            # truncated response would spin forever.
            raise RuntimeError('Connection closed before end of HTTP headers')
        if line in (b'\r\n', b'\n'):
            break
        # Whitespace after the colon is optional, so split on ':' rather
        # than matching a fixed 'content-length: ' prefix.
        name, sep, value = line.decode('latin-1').partition(':')
        if sep and name.strip().lower() == 'content-length':
            content_length = int(value.strip())
    if content_length is None:
        raise RuntimeError('Did not receive Content-Length header')
    return content_length


def prepare_socket(url: str) -> Tuple[io.BufferedIOBase, int]:
    """Send a GET request over a plain socket and read the response head.

    Returns:
        ``(fh, content_length)`` where ``fh`` is a binary file object
        positioned at the start of the response body. The caller owns ``fh``
        and must close it; closing it releases the socket.

    Raises:
        ValueError: if ``url`` is not an http URL with a host.
        RuntimeError: if the response head is truncated or has no
            Content-Length header.
    """
    host, port, target = _split_http_url(url)
    # Bracket IPv6 literals so the Host header stays unambiguous.
    host_header = f'[{host}]' if ':' in host else host
    request = (
        f'GET {target} HTTP/1.1\r\n'
        f'Host: {host_header}:{port}\r\n'
        'User-Agent: python\r\n'
        'Connection: close\r\n'
        'Accept: */*\r\n'
        '\r\n'
    ).encode('ascii')

    sock = socket.create_connection((host, port))
    try:
        fh = sock.makefile('rwb')
    finally:
        # The file object holds its own reference to the socket, so the
        # descriptor is only really released once ``fh`` is closed as well.
        sock.close()
    try:
        fh.write(request)
        fh.flush()
        return fh, _read_content_length(fh)
    except BaseException:
        fh.close()
        raise


@method('socket-read')
def load_socket_read(url: str) -> bytes:
    """Download ``url`` over a raw socket with a single sized ``read()``."""
    fh, content_length = prepare_socket(url)
    with fh:
        return fh.read(content_length)


def main() -> None:
    """Time every registered method against the URL given on the command line."""
    parser = argparse.ArgumentParser()
    parser.add_argument('--passes', type=int, default=5)
    parser.add_argument('url')
    args = parser.parse_args()
    if args.passes < 1:
        parser.error('--passes must be at least 1')

    for name, func in METHODS.items():
        rates = []
        for _ in range(args.passes):
            start = time.perf_counter()
            data = func(args.url)
            elapsed = time.perf_counter() - start
            rates.append(len(data) / elapsed)
        mean = np.mean(rates)
        if args.passes > 1:
            # Standard error of the mean: sample std (ddof=1) over sqrt(n).
            std = np.std(rates, ddof=1) / np.sqrt(args.passes)
        else:
            # A single sample carries no spread information.
            std = float('nan')
        print('{}: {:.1f} ± {:.1f} MB/s'.format(name, mean / 1e6, std / 1e6))


if __name__ == '__main__':
    main()