Message254484
Sorry for being obscure before, it was hard to pinpoint. I think I just figured it out! I had code like this in a subprocess:
def worker():
while True:
obj = self.queue.get()
# do work with obj using asyncio http module
def producer():
nonlocal self
obj2 = self.queue.get()
return obj2
workers = []
for i in range(FILE_OP_WORKERS):
t = asyncio.ensure_future(worker())
t.add_done_callback(op_finished)
workers.append(t)
while True:
f = loop.run_in_executor(None, producer)
obj = loop.run_until_complete(f)
t = async_queue.put(obj)
loop.run_until_complete(t)
loop.run_until_complete(asyncio.wait(workers))
where self.queue is a multiprocessing.Queue, and async_queue is an asyncio queue. The idea is that I have a process populating a multiprocessing queue, and I want to transfer it to an syncio queue while letting the workers do their thing.
Without knowing the underlying behavior, my theory is that when python blocks on the multiprocessing queue lock, it releases socket events to the async http module's selectors, and then when the async loop gets to the selectors they're released again.
If I switch the producer to instead use a queue.get_nowait and busy wait with asyncio.sleep I don't get the error...however this is not ideal is we're busy waiting.
Thanks! |
|
Date |
User |
Action |
Args |
2015-11-11 09:09:51 | thehesiod | set | recipients:
+ thehesiod, gvanrossum, vstinner, yselivanov |
2015-11-11 09:09:51 | thehesiod | set | messageid: <1447232991.25.0.43847757486.issue25593@psf.upfronthosting.co.za> |
2015-11-11 09:09:51 | thehesiod | link | issue25593 messages |
2015-11-11 09:09:50 | thehesiod | create | |
|