
Author Oren Milman
Recipients Oren Milman, Prof Plum, tlxxzj
Date 2017-10-05.16:59:35
Content
IIUC:
In Lang's example, doing `queue = None` destroyed the proxy for the shared
queue, which triggered a call to BaseProxy._decref() (in multiprocessing/managers.py),
which dispatched a decref request to the manager's server process.

Meanwhile, the pool's worker process (in the function worker() in multiprocessing/pool.py)
tries to retrieve a task from its task queue by calling inqueue.get().
The get() call unpickles the first pickled task in the queue, which is the
function and arguments that we passed to apply_async().
Unpickling the shared queue creates a new proxy object for it, so
BaseProxy.__init__() is called, which calls BaseProxy._incref(), which
dispatches an incref request to the manager's server process.
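
For reference, the scenario described above boils down to something like the
following (a reconstructed sketch, not Lang's exact code; use_queue is a
placeholder, and whether the KeyError actually shows up depends on which
request wins the race):

from multiprocessing import Pool, Manager

def use_queue(q):
    q.put('hello')

if __name__ == '__main__':
    with Manager() as manager:
        pool = Pool(1)
        queue = manager.Queue()
        # the proxy for the shared queue is pickled into the pool's task queue
        pool.apply_async(use_queue, (queue,))
        # drop our only reference to the proxy -> a decref request is dispatched
        queue = None
        pool.close()
        pool.join()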

Unfortunately, the decref request reaches the server before the incref request.
So when the server receives the decref request (in Server.handle_request())
and accordingly calls Server.decref(), the refcount of the shared queue in the
server is 1, so the refcount is decremented to 0, and the shared queue is
disposed of.
Then, when the server receives the incref request, it tries to increment the
refcount of the shared queue (in Server.incref()), but can't find it in its
refcount dict, so it raises the KeyError.
(If, for example, you added a sleep(0.5) before the call to dispatch() in
BaseProxy._decref(), the incref request would win the race, and the KeyError
wouldn't be raised.)
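
In other words, the server's refcount bookkeeping behaves roughly like this
toy model (a deliberate simplification, not the real Server code):

id_to_refcount = {'shared_queue': 1}

def handle_decref(ident):
    # roughly what Server.decref() does
    id_to_refcount[ident] -= 1
    if id_to_refcount[ident] == 0:
        del id_to_refcount[ident]   # the shared object is disposed of

def handle_incref(ident):
    # roughly what Server.incref() does
    id_to_refcount[ident] += 1      # KeyError if the entry is already gone

handle_decref('shared_queue')   # the decref request arrives first ...
handle_incref('shared_queue')   # ... so this raises KeyError: 'shared_queue'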


Should we fix this?
Or is it the responsibility of the user to not destroy shared objects too soon?
(In that case, maybe we should mention it in the docs?)


The situation in Prof Plum's example is similar.
Also, note that this issue is not specific to pool workers or to
Manager.Queue. For example, we get a similar error (for similar reasons) from
this code:

from multiprocessing import Process, Manager
from time import sleep

if __name__ == '__main__':
    with Manager() as manager:
        shared_list = manager.list()
        # the proxy for the shared list is pickled into the Process args
        p = Process(target=sorted, args=(shared_list,))
        p.start()
        # sleep(0.5)
        # drop our only reference to the proxy -> a decref request is dispatched
        shared_list = None
        p.join()
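
(Here, too, uncommenting the sleep(0.5) gives the child process time to
unpickle the proxy and send its incref request before `shared_list = None`
triggers the decref, so the KeyError doesn't occur.)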