import asyncio
import http.server
import multiprocessing
import time
import traceback

import aiohttp

# Monkey patching
# http://bugs.python.org/issue25593
if True:
    import asyncio.selector_events

    orig_sock_connect_cb = (
        asyncio.selector_events.BaseSelectorEventLoop._sock_connect_cb
    )

    def _sock_connect_cb(self, fut, sock, address):
        """Delegate to the original callback unless ``fut`` is already done."""
        if fut.done():
            return
        return orig_sock_connect_cb(self, fut, sock, address)

    asyncio.selector_events.BaseSelectorEventLoop._sock_connect_cb = (
        _sock_connect_cb
    )


def process_worker(q):
    """Fetch every URL received on ``q`` over HTTP and print the body.

    Intended as a ``multiprocessing.Process`` target. Starts 100 asyncio
    worker tasks that share one ``aiohttp.ClientSession`` and consume URLs
    from a bounded ``asyncio.Queue``; the URLs are pulled from ``q`` one at
    a time and forwarded to that queue. This function never returns.

    NOTE: the coroutines are generator-based (``@asyncio.coroutine`` /
    ``yield from``); that decorator was removed in Python 3.11.

    Args:
        q: Queue-like object whose blocking ``get()`` returns the next URL.
    """
    loop = asyncio.get_event_loop()
    connector = aiohttp.TCPConnector(
        force_close=False,
        keepalive_timeout=8,
        use_dns_cache=True,
    )
    session = aiohttp.ClientSession(connector=connector)
    async_queue = asyncio.Queue(100)

    @asyncio.coroutine
    def async_worker(async_queue):
        """Loop forever: take a URL, GET it, print the response body."""
        while True:
            try:
                url = yield from async_queue.get()
                response = yield from session.request('GET', url)
                try:
                    data = yield from response.read()
                    print(data)
                finally:
                    # Runs even when reading the body fails.
                    yield from response.wait_for_close()
            except asyncio.CancelledError:
                # Cancellation must end the loop instead of being logged
                # and ignored, otherwise the task can never be cancelled.
                raise
            except Exception:
                # A failed request is reported and the worker keeps going.
                traceback.print_exc()

    def worker_done(f):
        """Report how a worker task finished (normally or with an error)."""
        try:
            f.result()
        except (Exception, asyncio.CancelledError):
            # CancelledError is listed explicitly because it is not an
            # Exception subclass on Python 3.8+.
            traceback.print_exc()
        else:
            print("worker exited")

    # Keep references to the tasks for the lifetime of this function.
    workers = []
    for _ in range(100):
        t = asyncio.ensure_future(async_worker(async_queue))
        t.add_done_callback(worker_done)
        workers.append(t)

    def producer(q):
        """Block until the next item is available on ``q`` and return it."""
        return q.get()

    @asyncio.coroutine
    def doit():
        """Move one item from ``q`` into the asyncio queue."""
        # q.get() blocks, so it runs in the default executor rather than
        # on the event loop thread.
        obj = yield from loop.run_in_executor(None, producer, q)
        yield from async_queue.put(obj)

    while True:
        loop.run_until_complete(doit())


class GetHandler(http.server.SimpleHTTPRequestHandler):
    """Static-file handler that pauses 0.5 s before sending each GET body."""

    def do_GET(self):
        """Send the headers, sleep 0.5 s, then copy the file to the client."""
        f = self.send_head()
        time.sleep(0.5)
        if f:
            try:
                self.copyfile(f, self.wfile)
            finally:
                f.close()


def run_server(server_class=http.server.HTTPServer, handler_class=GetHandler):
    """Serve HTTP on 0.0.0.0:8080 forever.

    Args:
        server_class: Server type, instantiated with
            ``(server_address, handler_class)``.
        handler_class: Request handler class passed to the server.
    """
    server_address = ('0.0.0.0', 8080)
    httpd = server_class(server_address, handler_class)
    httpd.serve_forever()


if __name__ == '__main__':
    q = multiprocessing.Queue(100)

    p = multiprocessing.Process(target=process_worker, args=(q,))
    p.start()

    p2 = multiprocessing.Process(target=run_server)
    p2.start()

    # Feed the client process the same URL forever; put() blocks while the
    # bounded queue is full.
    while True:
        q.put("http://0.0.0.0:8080")