"""A benchmark for mixing asyncio and GIL-holding background threads""" import argparse import asyncio import threading import time from concurrent.futures import ProcessPoolExecutor async def client(port, n_conns, msg_size): msg = b"x" * msg_size count = 0 running = True async def client_handler(reader, writer): nonlocal count while running: writer.write(msg) await reader.read(msg_size) count += 1 def stop(): nonlocal running running = False conns = [] try: # Open all connections for _ in range(n_conns): r, w = await asyncio.open_connection("127.0.0.1", port) conns.append((r, w)) # Start a timer to stop running asyncio.get_running_loop().call_later(5, stop) # Send requests until stopped tasks = [client_handler(r, w) for r, w in conns] await asyncio.gather(*tasks, return_exceptions=True) finally: # Cleanup connections for r, w in conns: w.close() await w.wait_closed() return count / 5 def client_main(port, n_conns, msg_size): return asyncio.run(client(port, n_conns, msg_size)) async def main(n_conns, msg_size, n_background_threads): async def server_handler(reader, writer): try: while data := await reader.read(msg_size): writer.write(data) finally: writer.close() await writer.wait_closed() # Start the server server = await asyncio.start_server(server_handler, "127.0.0.1", 0) port = server.sockets[0].getsockname()[1] async with server: loop = asyncio.get_running_loop() with ProcessPoolExecutor(1) as ex: # Create the background threads (if any) threads = [Spinner() for _ in range(n_background_threads)] rps = await loop.run_in_executor(ex, client_main, port, n_conns, msg_size) for t in threads: t.stop() print(f"{rps} RPS") class Spinner(threading.Thread): """A background thread that holds the GIL and spins a counter""" def __init__(self): self.count = 0 self.running = True super().__init__(daemon=True) self.start() def run(self): self.start_time = time.time() while self.running: self.count += 1 def stop(self): self.running = False duration = time.time() - self.start_time print(f"Spinner spun {self.count / duration:.3} cycles/second") if __name__ == "__main__": parser = argparse.ArgumentParser( description="Benchmark asyncio with GIL contention" ) parser.add_argument( "--clients", "-c", default=1, type=int, help="Number of clients" ) parser.add_argument( "--bytes", "-b", default=100, type=int, help="total payload size in bytes" ) parser.add_argument( "--background-threads", "-t", type=int, default=0, help="Number of background threads competing for the GIL", ) args = parser.parse_args() print( f"Benchmark: clients = {args.clients}, " f"msg-size = {args.bytes}, " f"background-threads = {args.background_threads}" ) asyncio.run(main(args.clients, args.bytes, args.background_threads))