Author thehesiod
Recipients Justin Mayfield, gvanrossum, thehesiod, vstinner, yselivanov
Date 2015-11-11.23:26:09
SpamBayes Score -1.0
Marked as misclassified Yes
Message-id <1447284370.15.0.885887607456.issue25593@psf.upfronthosting.co.za>
In-reply-to
Content
Perhaps I'm doing something really stupid, but I was able to reproduce the two issues I'm having with the following sample script. If you leave the monkey patch disabled, you get the InvalidStateError; if you enable it, you get the ServerDisconnect errors that I'm currently seeing, which I work around with retries. Any ideas?

import asyncio
import aiohttp
import multiprocessing
import aiohttp.server
import logging
import traceback

# Monkey patching
import asyncio.selector_events

# http://bugs.python.org/issue25593
if False:
    # Disabled by default.  Change the condition to True to wrap the event
    # loop's private connect callback so that it does nothing when the
    # future it is given is already done.
    orig_sock_connect_cb = (
        asyncio.selector_events.BaseSelectorEventLoop._sock_connect_cb
    )

    def _sock_connect_cb(self, fut, sock, address):
        """Delegate to the original callback unless *fut* is already done."""
        if not fut.done():
            return orig_sock_connect_cb(self, fut, sock, address)
        return None

    setattr(
        asyncio.selector_events.BaseSelectorEventLoop,
        '_sock_connect_cb',
        _sock_connect_cb,
    )


class HttpRequestHandler(aiohttp.server.ServerHttpProtocol):
    """Server protocol that replies to a handled request with a fixed HTML page."""

    @asyncio.coroutine
    def handle_request(self, message, payload):
        """Send a 200 response, sleeping 0.5s between the headers and the body.

        The response reuses the HTTP version of *message*; *payload* is not
        read.
        """
        body = b'<h1>It Works!</h1>'
        reply = aiohttp.Response(self.writer, 200, http_version=message.version)
        # Content-Length is the byte length of ``body``.
        for header_name, header_value in (
            ('Content-Type', 'text/html'),
            ('Content-Length', '18'),
        ):
            reply.add_header(header_name, header_value)
        reply.send_headers()
        # Pause after the headers have been sent and before the body is written.
        yield from asyncio.sleep(0.5)
        reply.write(body)
        yield from reply.write_eof()


def process_worker(q):
    """Consume URLs from *q* and fetch them with a pool of asyncio workers.

    Each item taken from *q* is forwarded to a bounded ``asyncio.Queue``.
    100 worker tasks take URLs from that queue and issue a GET for each one
    through a single shared ``aiohttp.ClientSession``.

    This function never returns: it keeps moving items from *q* to the
    asyncio queue in an endless loop.

    Args:
        q: Queue providing a blocking ``get()`` and an ``empty()`` method
            (a ``multiprocessing.Queue`` in this script) whose items are
            the URLs to request.
    """
    loop = asyncio.get_event_loop()
    #loop.set_debug(True)
    connector = aiohttp.TCPConnector(force_close=False, keepalive_timeout=8, use_dns_cache=True)
    session = aiohttp.ClientSession(connector=connector)
    async_queue = asyncio.Queue(100)

    @asyncio.coroutine
    def async_worker(session, async_queue):
        """Fetch URLs from *async_queue* forever, printing each response body.

        Request errors are printed and the loop continues with the next URL.
        """
        while True:
            try:
                print("blocking on asyncio queue get")
                url = yield from async_queue.get()
                print("unblocking on asyncio queue get")
                print("get aqueue size:", async_queue.qsize())
                response = yield from session.request('GET', url)
                try:
                    data = yield from response.read()
                    print(data)
                finally:
                    yield from response.wait_for_close()
            except asyncio.CancelledError:
                # Cancellation must end the worker; swallowing it here would
                # make the task impossible to cancel.  Listed first because
                # on older Python versions CancelledError is an Exception
                # subclass.
                raise
            except Exception:
                # Not a bare ``except``: GeneratorExit, KeyboardInterrupt and
                # SystemExit have to propagate out of this endless loop.
                traceback.print_exc()

    def producer(q):
        """Block until an item is available on *q* and return it."""
        print("blocking on multiprocessing queue get")
        obj2 = q.get()
        print("unblocking on multiprocessing queue get")
        print("get qempty:", q.empty())
        return obj2

    def worker_done(f):
        """Done-callback: report whether a worker task finished or failed."""
        try:
            f.result()
        except (Exception, asyncio.CancelledError):
            # CancelledError is named explicitly because it is not an
            # Exception subclass on newer Python versions.
            traceback.print_exc()
        else:
            print("worker exited")

    # Keep a reference to every task for the lifetime of this function.
    workers = []
    for _ in range(100):
        t = asyncio.ensure_future(async_worker(session, async_queue))
        t.add_done_callback(worker_done)
        workers.append(t)

    @asyncio.coroutine
    def doit():
        """Move one item from the multiprocessing queue to the asyncio queue."""
        print("start producer")
        # q.get() blocks, so it runs in the default executor rather than on
        # the event loop.
        obj = yield from loop.run_in_executor(None, producer, q)
        print("finish producer")

        print("blocking on asyncio queue put")
        yield from async_queue.put(obj)
        print("unblocking on asyncio queue put")
        print("put aqueue size:", async_queue.qsize())

    while True:
        loop.run_until_complete(doit())


def server():
    """Start the HTTP test server on 0.0.0.0:8080 and run the event loop forever."""
    def make_handler():
        # Protocol factory handed to create_server.
        return HttpRequestHandler(debug=True, keep_alive=75)

    event_loop = asyncio.get_event_loop()
    #event_loop.set_debug(True)

    startup = event_loop.create_server(make_handler, '0.0.0.0', '8080')

    # Hold on to the started server object while the loop is running.
    listening_server = event_loop.run_until_complete(startup)
    event_loop.run_forever()


if __name__ == '__main__':
    # Bounded queue carrying URLs from this process to the client process.
    q = multiprocessing.Queue(100)

    # Send multiprocessing's own log output to stderr at DEBUG level.
    mp_logger = multiprocessing.log_to_stderr()
    mp_logger.setLevel(logging.DEBUG)

    # The client process is started before the server process.
    client_proc = multiprocessing.Process(target=process_worker, args=(q,))
    client_proc.start()

    server_proc = multiprocessing.Process(target=server)
    server_proc.start()

    # Feed the same URL to the client process in an endless loop; put()
    # is called on the bounded queue created above.
    target_url = "http://0.0.0.0:8080"
    while True:
        print("blocking on multiprocessing queue put")
        q.put(target_url)
        print("unblocking on multiprocessing queue put")

        print("put qempty:", q.empty())
History
Date User Action Args
2015-11-11 23:26:10thehesiodsetrecipients: + thehesiod, gvanrossum, vstinner, yselivanov, Justin Mayfield
2015-11-11 23:26:10thehesiodsetmessageid: <1447284370.15.0.885887607456.issue25593@psf.upfronthosting.co.za>
2015-11-11 23:26:10thehesiodlinkissue25593 messages
2015-11-11 23:26:09thehesiodcreate