classification
Title: delicate behaviour of shared (managed) multiprocessing Queues
Type: behavior Stage:
Components: Library (Lib) Versions: Python 3.7, Python 3.5
process
Status: open Resolution:
Dependencies: Superseder:
Assigned To: Nosy List: Oren Milman, Prof Plum, davin, pitrou, tlxxzj
Priority: normal Keywords:

Created on 2017-07-31 20:05 by Prof Plum, last changed 2017-10-09 14:36 by pitrou.

Files
File name Uploaded Description Edit
bug_demo.py Prof Plum, 2017-07-31 20:05 reproduces race condition
Messages (9)
msg299582 - (view) Author: Prof Plum (Prof Plum) Date: 2017-07-31 20:05
So I was writing code that had multiple write-thread and read-thread "groups" in a single pool (in a group, a few write threads write to a queue that a read thread reads), and I ran into what I think is a race condition with the multiprocessing.Manager() class. It looks like managed queues are returned from Manager() before they are actually initialized and safe to use, but it is only noticeable when making many managed queues in quick succession. I've attached a simple demo script to reproduce the bug. The reason I believe this is a race condition is that when the sleep(0.5) line is commented out Python crashes, but when it's not, it doesn't.

Also I'm on windows 10 and using 64 bit Python 3.5.2
msg302742 - (view) Author: Lang (tlxxzj) Date: 2017-09-22 10:52
# code to reproduce the bug
# KeyError in lib\multiprocessing\managers.py, in incref()

import multiprocessing as mp
from time import sleep

def func(queue):
    pass

if __name__ == '__main__':
    manager = mp.Manager()

    pool = mp.Pool(1)

    queue = manager.Queue()
    r = pool.apply_async(func, args = [queue])
    # sleep(1)  # uncommenting this lets the worker's incref win the race
    queue = None   # drops the main process's reference -> decref dispatched

    pool.close()
    pool.join()
msg303776 - (view) Author: Oren Milman (Oren Milman) * Date: 2017-10-05 16:59
IIUC:
In Lang's example, doing `queue = None` caused the destruction of the shared
queue, which caused a call to BaseProxy._decref() (in multiprocessing/managers.py),
which dispatched a decref request to the manager's server process.

Meanwhile, the pool's worker process (in function worker() in multiprocessing/pool.py)
tries to retrieve a task from its task queue, by calling inqueue.get().
The get() method unpickles the first pickled task in the queue, which is the
function and arguments that we passed to apply_async().
The unpickling of the shared queue causes creating a proxy object for the
shared queue, in which BaseProxy.__init__() is called, which calls
BaseProxy._incref(), which dispatches an incref request to the manager's server
process.

Unfortunately, the decref request gets to the server before the incref request.
So when the server receives the decref request (in Server.handle_request()),
and accordingly calls Server.decref(), the refcount of the shared queue in the
server is 1, so the refcount is decremented to 0, and the shared queue is
disposed.
Then, when the server receives the incref request, it tries to increment the
refcount of the shared queue (in Server.incref()), but can't find it in its
refcount dict, so it raises the KeyError.
(If, for example, you added a 'sleep(0.5)' before the call to dispatch() in
BaseProxy._decref(), the incref request would win the race, and the KeyError
wouldn't be raised.)
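The server-side bookkeeping described above can be modelled in a few lines (a toy simplification with made-up names, not the real Server.incref()/Server.decref() code):

```python
# Toy model of the manager server's refcount dict (hypothetical names).
refcounts = {'queue-id': 1}  # one reference: the proxy in the main process

def decref(ident):
    refcounts[ident] -= 1
    if refcounts[ident] == 0:
        del refcounts[ident]  # the shared object is disposed of

def incref(ident):
    refcounts[ident] += 1     # KeyError if the object was already disposed

# Losing ordering: `queue = None` in the main process dispatches decref
# before the worker's unpickling dispatches incref.
decref('queue-id')
try:
    incref('queue-id')
except KeyError as e:
    print('KeyError:', e)
```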


Should we fix this?
Or is it the responsibility of the user to not destroy shared objects too soon?
(In that case, maybe we should mention it in the docs?)


The situation in the example of Prof Plum is similar.
Also, note that this issue is not specific to using pool workers or to
Manager.Queue. For example, we get a similar error (for similar reasons) in
this code:

from multiprocessing import Process, Manager
from time import sleep
if __name__ == '__main__':
    with Manager() as manager:
        shared_list = manager.list()
        p = Process(target=sorted, args=(shared_list,))
        p.start()
        # sleep(0.5)
        shared_list = None
        p.join()
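For comparison, here is a variant (my sketch, not part of the original report) that avoids the KeyError by keeping the proxy referenced until the child has exited:

```python
from multiprocessing import Process, Manager

def main():
    with Manager() as manager:
        shared_list = manager.list([3, 1, 2])
        p = Process(target=sorted, args=(shared_list,))
        p.start()
        p.join()            # the child has long since unpickled its proxy
        shared_list = None  # the decref can no longer race the child's incref
        return p.exitcode

if __name__ == '__main__':
    print(main())
```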
msg303778 - (view) Author: Oren Milman (Oren Milman) * Date: 2017-10-05 17:09
Prof Plum, I changed the type of the issue to 'behavior', because Lang and I
both got a KeyError. If your interpreter actually crashed, please change it
back to 'crash'.
msg303780 - (view) Author: Prof Plum (Prof Plum) Date: 2017-10-05 18:54
Oh I see, I thought an error that terminated the Python program was considered a "crash".

On the note of whether you should fix this, I think the answer is yes. When I call pool.apply_async() I expect it to return only once the worker process has been started and has finished its initialization (i.e. sending the incref request). That said, I could see people wanting the minor performance gain of having the worker start AND run asynchronously, so I think this option should be available via a boolean arg to apply_async(), but it should be off by default, because that is the safer and more intuitive behavior for apply_async().
msg303812 - (view) Author: Oren Milman (Oren Milman) * Date: 2017-10-06 09:52
Davin and Antoine, I added you to the nosy list because you are listed
as multiprocessing experts :)
msg303916 - (view) Author: Antoine Pitrou (pitrou) * (Python committer) Date: 2017-10-08 19:55
@Prof Plum

> When I call pool.apply_async() I expect it only to return when the worker process has been started and finished its initialization process

Well... it's called *async* for a reason, so I'm not sure why the behaviour would be partially synchronous.

@Oren

> Should we fix this?

I'm not sure how.  In mp.Pool we don't want to keep references to input objects longer than necessary.

> Or is it the responsibility of the user to not destroy shared objects too soon?  (In that case, maybe we should mention it in the docs?)

Yes to both questions, IMO.

(note: changing title to better reflect issue)
msg303921 - (view) Author: Prof Plum (Prof Plum) Date: 2017-10-08 21:22
@Antoine Pitrou

>Well... it's called *async* for a reason, so I'm not sure why the behaviour would be partially synchronous.

To avoid a race condition

>I'm not sure how.  In mp.Pool we don't want to keep references to input objects longer than necessary.

Like I said, you could just add some sort of "safe" flag to the apply_async() call: safe=True would mean the initialization of the worker is done synchronously, and safe=False would be the normal behavior. Even if you decide it's the user's responsibility not to delete the queue too soon, when the user's code is exiting a function that would basically amount to them calling sleep() for some guessed amount of time. With a safe flag they wouldn't have to guess the time or call sleep(), which is kinda ugly IMO. Also, if someone sees that apply_async() has a safe flag, they are more likely to look up what it does than to read the full docs for apply_async().
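Absent such a flag, one user-level approximation (my sketch; the `ready` event is something I added, not part of the proposal) is to have the worker signal once it has unpickled its arguments, since the incref is dispatched synchronously during unpickling:

```python
import multiprocessing as mp

def func(queue, ready):
    # By the time func runs, both proxies have been unpickled in the
    # worker, so their incref requests have already reached the server.
    ready.set()
    return queue.qsize()

def main():
    manager = mp.Manager()
    with mp.Pool(1) as pool:
        queue = manager.Queue()
        queue.put('task')
        ready = manager.Event()
        result = pool.apply_async(func, (queue, ready))
        ready.wait()   # block until the worker holds its own reference
        queue = None   # now safe to drop ours
        return result.get()

if __name__ == '__main__':
    print(main())
```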
msg303964 - (view) Author: Antoine Pitrou (pitrou) * (Python committer) Date: 2017-10-09 14:36
> To a avoid race condition

As Oren explained, the race condition is due to your use of the managed Queue.  If you keep the object alive in the main process until the tasks have finished, there shouldn't be any problem.  The question is: is there any reason you don't want to?
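Applied to Lang's repro above, keeping the object alive simply means dropping the reference only after the task has completed (my sketch, not from the original messages):

```python
import multiprocessing as mp

def func(queue):
    return queue.empty()

def main():
    manager = mp.Manager()
    with mp.Pool(1) as pool:
        queue = manager.Queue()
        r = pool.apply_async(func, args=[queue])
        value = r.get()  # once the result is in, the worker is done with it
        queue = None     # dropping the last reference is now harmless
        return value

if __name__ == '__main__':
    print(main())
```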
History
Date User Action Args
2017-10-09 14:36:56  pitrou       set  messages: + msg303964
2017-10-08 21:22:37  Prof Plum    set  messages: + msg303921
2017-10-08 19:55:26  pitrou       set  messages: + msg303916
                                       title: multiprocessing.Manager() race condition -> delicate behaviour of shared (managed) multiprocessing Queues
2017-10-06 09:52:36  Oren Milman  set  nosy: + pitrou, davin
                                       messages: + msg303812
2017-10-05 18:54:36  Prof Plum    set  messages: + msg303780
2017-10-05 17:09:39  Oren Milman  set  messages: + msg303778
2017-10-05 16:59:36  Oren Milman  set  versions: + Python 3.7
                                       nosy: + Oren Milman
                                       messages: + msg303776
                                       components: + Library (Lib)
                                       type: crash -> behavior
2017-09-22 10:52:57  tlxxzj       set  nosy: + tlxxzj
                                       messages: + msg302742
2017-09-21 23:32:48  Prof Plum    set  title: Potential multiprocessing.Manager() race condition -> multiprocessing.Manager() race condition
2017-07-31 20:05:27  Prof Plum    create