Message254501
Perhaps I'm doing something really stupid, but I was able to reproduce the two issues I'm having with the following sample script. If you leave the monkey patch disabled, you get the InvalidStateError, if you enable it, you get the ServerDisconnect errors that I'm currently seeing which I work-around with retries. Ideas?
import asyncio
import aiohttp
import multiprocessing
import aiohttp.server
import logging
import traceback
# Monkey patching
import asyncio.selector_events
# http://bugs.python.org/issue25593
if False:
orig_sock_connect_cb = asyncio.selector_events.BaseSelectorEventLoop._sock_connect_cb
def _sock_connect_cb(self, fut, sock, address):
if fut.done(): return
return orig_sock_connect_cb(self, fut, sock, address)
asyncio.selector_events.BaseSelectorEventLoop._sock_connect_cb = _sock_connect_cb
class HttpRequestHandler(aiohttp.server.ServerHttpProtocol):
@asyncio.coroutine
def handle_request(self, message, payload):
response = aiohttp.Response(self.writer, 200, http_version=message.version)
response.add_header('Content-Type', 'text/html')
response.add_header('Content-Length', '18')
response.send_headers()
yield from asyncio.sleep(0.5)
response.write(b'<h1>It Works!</h1>')
yield from response.write_eof()
def process_worker(q):
loop = asyncio.get_event_loop()
#loop.set_debug(True)
connector = aiohttp.TCPConnector(force_close=False, keepalive_timeout=8, use_dns_cache=True)
session = aiohttp.ClientSession(connector=connector)
async_queue = asyncio.Queue(100)
@asyncio.coroutine
def async_worker(session, async_queue):
while True:
try:
print("blocking on asyncio queue get")
url = yield from async_queue.get()
print("unblocking on asyncio queue get")
print("get aqueue size:", async_queue.qsize())
response = yield from session.request('GET', url)
try:
data = yield from response.read()
print(data)
finally:
yield from response.wait_for_close()
except:
traceback.print_exc()
def producer(q):
print("blocking on multiprocessing queue get")
obj2 = q.get()
print("unblocking on multiprocessing queue get")
print("get qempty:", q.empty())
return obj2
def worker_done(f):
try:
f.result()
print("worker exited")
except:
traceback.print_exc()
workers = []
for i in range(100):
t = asyncio.ensure_future(async_worker(session, async_queue))
t.add_done_callback(worker_done)
workers.append(t)
@asyncio.coroutine
def doit():
print("start producer")
obj = yield from loop.run_in_executor(None, producer, q)
print("finish producer")
print("blocking on asyncio queue put")
yield from async_queue.put(obj)
print("unblocking on asyncio queue put")
print("put aqueue size:", async_queue.qsize())
while True:
loop.run_until_complete(doit())
def server():
loop = asyncio.get_event_loop()
#loop.set_debug(True)
f = loop.create_server(lambda: HttpRequestHandler(debug=True, keep_alive=75), '0.0.0.0', '8080')
srv = loop.run_until_complete(f)
loop.run_forever()
if __name__ == '__main__':
q = multiprocessing.Queue(100)
log_proc = multiprocessing.log_to_stderr()
log_proc.setLevel(logging.DEBUG)
p = multiprocessing.Process(target=process_worker, args=(q,))
p.start()
p2 = multiprocessing.Process(target=server)
p2.start()
while True:
print("blocking on multiprocessing queue put")
q.put("http://0.0.0.0:8080")
print("unblocking on multiprocessing queue put")
print("put qempty:", q.empty()) |
|
Date |
User |
Action |
Args |
2015-11-11 23:26:10 | thehesiod | set | recipients:
+ thehesiod, gvanrossum, vstinner, yselivanov, Justin Mayfield |
2015-11-11 23:26:10 | thehesiod | set | messageid: <1447284370.15.0.885887607456.issue25593@psf.upfronthosting.co.za> |
2015-11-11 23:26:10 | thehesiod | link | issue25593 messages |
2015-11-11 23:26:09 | thehesiod | create | |
|