classification
Title: multiprocessing.Queue's put() not atomic thread wise
Type: behavior Stage: resolved
Components: Library (Lib) Versions: Python 3.1, Python 2.6
process
Status: closed Resolution: duplicate
Dependencies: Superseder: Unhelpful backtrace for multiprocessing.Queue (issue 10886)
Assigned To: jnoller Nosy List: asksol, jnoller, meador.inge, neologix, sbt, vilnis.termanis
Priority: normal Keywords: patch

Created on 2010-03-01 22:20 by vilnis.termanis, last changed 2011-10-06 20:20 by neologix. This issue is now closed.

Files
File name Uploaded Description
queue_example.py vilnis.termanis, 2010-03-02 16:40 put() behaviour for multiprocessing.queue classes
queue_perf.py vilnis.termanis, 2010-03-02 21:12 Script which was used for comparing before/after performance (msg100307)
queues.diff vilnis.termanis, 2010-03-07 03:37 Patch against release26-maint, with test
patch_27maint.diff vilnis.termanis, 2010-11-02 23:08 Patch against release27-maint, with test
Messages (18)
msg100278 - Author: Vilnis Termanis (vilnis.termanis) * Date: 2010-03-01 22:20
If an object that has been put() on a multiprocessing.Queue is changed immediately after the put() call, the changed version may be what ends up in the queue, which I assume is not the expected behaviour:

>>> from multiprocessing import Queue
>>> q = Queue()
>>> obj = [[i for i in xrange(j * 10, (j * 10) + 10)] for j in xrange(0,10)]
>>> q.put(obj); obj[-1][-1] = None
>>> obj2 = q.get()
>>> print obj2[-1][-1]
None

Note: This also happens if the queue is used from a child process, as in the attached example.
msg100280 - Author: Vilnis Termanis (vilnis.termanis) * Date: 2010-03-01 22:28
More notes:
- The reason for the non-atomic behaviour is that the queue's _feed thread sends the object after the put() method has already returned. Hence changing the object straight after put() can result in the modified object being sent instead.
- Reproduced under Win32 and Ubuntu-64 with v2.6.4
- Not tried with v3.*, but judging by the tip-of-trunk code
  it might also be affected
- The attached patch makes put() atomic, but I am completely unsure whether this is the correct way to address the problem (if it's seen as one).

Regards,
VT
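The mechanism described in msg100280 can be reproduced deterministically with a toy model. This is a hypothetical Python 3 sketch, not the multiprocessing.queues source: `DeferredPickleQueue` and its `release` event are invented for illustration, the event only holds the feeder thread back so the race always goes the same way, and a `queue.Queue` of bytes stands in for the OS pipe.

```python
import pickle
import queue
import threading


class DeferredPickleQueue:
    """Hypothetical toy model: put() stores a reference; a feeder thread pickles it later."""

    def __init__(self):
        self._buffer = queue.Queue()      # holds references only, like the real queue's internal buffer
        self._pipe = queue.Queue()        # stands in for the OS pipe; carries pickled bytes
        self.release = threading.Event()  # hypothetical gate that delays the feeder thread
        threading.Thread(target=self._feed, daemon=True).start()

    def put(self, obj):
        self._buffer.put(obj)             # returns at once; nothing has been pickled yet

    def get(self, timeout=5):
        return pickle.loads(self._pipe.get(timeout=timeout))

    def _feed(self):
        while True:
            obj = self._buffer.get()
            self.release.wait()                 # the feeder thread gets scheduled "late"
            self._pipe.put(pickle.dumps(obj))   # the object's state is captured only here


q = DeferredPickleQueue()
obj = [[j * 10 + i for i in range(10)] for j in range(10)]
q.put(obj)
obj[-1][-1] = None      # mutate after put() has already returned
q.release.set()         # now let the feeder thread pickle and "send"
print(q.get()[-1][-1])  # None: the mutated state was sent
```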
msg100282 - Author: Vilnis Termanis (vilnis.termanis) * Date: 2010-03-02 00:50
Quick thought (last one, I promise):

Maybe put() could take an argument indicating whether it should block the calling thread (i.e. guarantee that the object is sent in its current state) or not. The current non-blocking mode gives better performance for the put()-calling process when many processes write to the queue at once, since it can carry on while the _feed thread waits for the internal Pipe to become available.
msg100285 - Author: Meador Inge (meador.inge) * (Python committer) Date: 2010-03-02 01:41
Could you please generate the patch as described in the Patch Submission Guidelines [1] ?  Also, please be sure to add unit tests that can reproduce the reported error.

[1] http://www.python.org/dev/patches/
msg100287 - Author: Vilnis Termanis (vilnis.termanis) * Date: 2010-03-02 02:02
Hi Meador,

I apologise: I really shouldn't have called it a patch. It's just to show one way that I've found fixes the problem (but presumably reduces performance in other cases, hence the suggestion & wait for feedback on the issue). I'm not really experienced with Python (nor am I an experienced / particularly good developer in general) so it's probably safer to treat me just as a regular user who stumbled upon a problem. :) (I'm definitely not shying away from "ticking all the boxes" but I don't see that there is any point in me trying to yet.)

Regards,
VT

PS: I've re-attached the example "fix" with the diff the right way round (sorry).
msg100302 - Author: Vilnis Termanis (vilnis.termanis) * Date: 2010-03-02 16:38
I couldn't see the wood for the trees:
If put() blocks the calling thread, multiprocessing.Queue is reduced to the same functionality as multiprocessing.queues.SimpleQueue, if I'm not mistaken. So maybe there should be a warning in the documentation that, for multiprocessing.[Joinable]Queue, modifying obj straight after calling put(obj) might enqueue the modified version.

To me at least this wasn't obvious until I looked at the multiprocessing.queues code.

I've modified the example for clarity and retracted the (unworthy) patch attempt.

Regards,
VT
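For comparison, a minimal Python 3 sketch using multiprocessing.SimpleQueue, which has no buffer or feeder thread. As far as I can tell from the standard library source, its put() pickles the object and writes it to the pipe before returning, so mutating the object afterwards does not change what get() returns. The helper name `simplequeue_snapshot` is hypothetical, and the object is kept small on purpose: put() writes straight into the pipe, so a payload larger than the OS pipe buffer could block while no reader is draining it.

```python
import multiprocessing


def simplequeue_snapshot():
    """Hypothetical helper: put a nested list, mutate it, then read it back."""
    sq = multiprocessing.SimpleQueue()
    obj = [[j * 10 + i for i in range(10)] for j in range(10)]
    sq.put(obj)             # pickled and written to the pipe before put() returns
    obj[-1][-1] = None      # too late to affect what was sent
    return sq.get()[-1][-1]


if __name__ == "__main__":
    print(simplequeue_snapshot())  # 99
```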
msg100303 - Author: Vilnis Termanis (vilnis.termanis) * Date: 2010-03-02 18:24
Alternative suggestion (since SimpleQueue doesn't provide buffering):

Add an option to force immediate pickling of the object being enqueued, i.e. pickle it when it is added to the internal buffer instead of as part of Connection.send() in the _feed thread. Does this sound like a reasonable solution? (If so, I'll do my best to write unit tests etc. as per the submission guidelines.)

Regards,
VT
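Without any library change, the effect of this proposal can be approximated from user code by pickling at put() time and unpickling after get(). A minimal Python 3 sketch; `put_snapshot` and `get_snapshot` are hypothetical helper names, not part of multiprocessing. Because the queue then carries an immutable bytes object, it no longer matters when the feeder thread gets round to sending it.

```python
import pickle
from multiprocessing import Queue


def put_snapshot(q, obj):
    """Hypothetical helper: serialize obj now so later mutations cannot reach the queue."""
    q.put(pickle.dumps(obj, pickle.HIGHEST_PROTOCOL))


def get_snapshot(q, timeout=5):
    """Hypothetical helper: counterpart of put_snapshot()."""
    return pickle.loads(q.get(timeout=timeout))


if __name__ == "__main__":
    q = Queue()
    obj = [[j * 10 + i for i in range(10)] for j in range(10)]
    put_snapshot(q, obj)
    obj[-1][-1] = None               # does not affect the bytes already queued
    print(get_snapshot(q)[-1][-1])   # 99
    q.close()
    q.join_thread()
```

The cost is that the feeder thread still pickles the bytes object once more when it sends it; the patch discussed later in this issue avoids that by sending the pre-pickled data with send_bytes().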
msg100307 - Author: Vilnis Termanis (vilnis.termanis) * Date: 2010-03-02 21:03
Performance comparison before/after the suggested change (putting and getting a 1000-item list 10000 times).

Inline
----
1.926 <class 'multiprocessing.queues.Queue'>
1.919 <class 'queues_pickled.Queue'>
1.907 <class 'queues_pickled.Queue'> (bufferPickled = True)

<class 'threading.Thread'>
----
2.138 <class 'multiprocessing.queues.Queue'>
2.379 <class 'queues_pickled.Queue'>
2.304 <class 'queues_pickled.Queue'> (bufferPickled = True)

<class 'multiprocessing.process.Process'>
----
1.158 <class 'multiprocessing.queues.Queue'>
1.151 <class 'queues_pickled.Queue'>
1.141 <class 'queues_pickled.Queue'> (bufferPickled = True)
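A rough way to take similar measurements is sketched below. It is not the attached queue_perf.py: it covers only the "Inline" case (producer and consumer in the same process), and `time_round_trips` and its `pre_pickle` flag are hypothetical. Absolute numbers depend on the machine and Python version, so the two runs are only meaningful relative to each other.

```python
import pickle
import time
from multiprocessing import Queue


def time_round_trips(n=1000, pre_pickle=False):
    """Hypothetical benchmark: put and get a 1000-item list n times; return elapsed seconds."""
    q = Queue()
    item = list(range(1000))
    start = time.perf_counter()
    for _ in range(n):
        if pre_pickle:
            q.put(pickle.dumps(item))   # pickled here; the feeder then pickles the bytes again
            pickle.loads(q.get())
        else:
            q.put(item)                 # pickled later by the feeder thread
            q.get()
    elapsed = time.perf_counter() - start
    q.close()
    q.join_thread()
    return elapsed


if __name__ == "__main__":
    for flag in (False, True):
        print(f"pre_pickle={flag}: {time_round_trips(pre_pickle=flag):.3f} s")
```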
msg100557 - Author: Vilnis Termanis (vilnis.termanis) * Date: 2010-03-07 03:15
Updated patch to include new test (now against release26-maint branch). Verified test_multiprocessing suite passes before (apart from new test) & after change. (Tested under Ubuntu 9.10 64-bit)
msg100558 - Author: Vilnis Termanis (vilnis.termanis) * Date: 2010-03-07 03:17
Updated patch to include new test (now against release26-maint branch). Verified test_multiprocessing suite passes before (apart from new test) & after change. (Tested under Ubuntu 9.10 64-bit)
msg119495 - Author: Ask Solem (asksol) (Python committer) Date: 2010-10-24 08:19
AFAICS the object will be pickled twice with this patch.
See Modules/_multiprocessing/connection.h: connection_send_obj.
msg119496 - Author: Ask Solem (asksol) (Python committer) Date: 2010-10-24 08:22
aha, no. I see now you use connection.send_bytes instead.
Then I can't think of any issues with this patch, but I don't know why
it was done this way in the first place.
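The distinction drawn here can be seen with a plain multiprocessing.Pipe (Python 3 sketch): Connection.send() pickles whatever it is given, while send_bytes() transmits a bytes buffer unchanged. Data pickled at put() time and sent with send_bytes() is therefore serialized only once.

```python
import pickle
from multiprocessing import Pipe

reader, writer = Pipe(duplex=False)   # reader can only receive, writer can only send
obj = {"ids": list(range(5))}

payload = pickle.dumps(obj)           # pickle once, up front
writer.send_bytes(payload)            # the raw bytes go over the pipe as-is
assert pickle.loads(reader.recv_bytes()) == obj

writer.send(payload)                  # send() pickles its argument, i.e. pickles the bytes again
assert reader.recv() == payload       # recv() unpickles back to the bytes object, not to obj
```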
msg119504 - Author: Vilnis Termanis (vilnis.termanis) * Date: 2010-10-24 10:38
The idea is to pickle the object immediately (i.e. when it is added) instead of later, when the feeder thread sends it. This means that any operations (even deletion) performed on the original object do not change the item in the queue before it has been dequeued.
Whether this is an acceptable solution I don't know; it's just my proposal. The attached queue_example.py script illustrates what I mean.

Regards,
VT
msg120232 - Author: Ask Solem (asksol) (Python committer) Date: 2010-11-02 15:40
Pickling on put makes sense to me.  I can't think of cases where this could break existing code either.  I think this may also resolve issue 8323
msg120284 - Author: Vilnis Termanis (vilnis.termanis) * Date: 2010-11-02 23:08
Please find attached an updated patch (including unit test) for the release27-maint branch. I've run the test_multiprocessing suite against svn revision 86129 of said branch.
msg143158 - Author: Richard Oudkerk (sbt) * (Python committer) Date: 2011-08-29 16:36
Modifying an object which is already on a traditional queue can also change what is received by the other thread (depending on timing).  So Queue.Queue's put() is not "atomic" either.  Therefore I do not believe this behaviour is a bug. 

However the solution proposed is a good one since it fixes Issue 10886.  In addition it prevents arbitrary code being run in the background thread by weakref callbacks or __del__ methods.  Such arbitrary code may cause inconsistent state in a forked process if the fork happens while the queue's thread is running -- see issue 6271.

I have submitted a patch for Issue 10886.  It is basically the same as
patch_27maint.diff, but it is against the default mercurial branch.  (Also, it is a bit simpler because it does not unnecessarily modify Queue.get().)

I would suggest closing this issue and letting Issue 10886 take its place.
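The first point in msg143158 can be checked with the thread-only queue (Queue.Queue in Python 2, queue.Queue in Python 3): put() stores a reference, so a consumer that reads after the mutation sees the mutated object. A minimal Python 3 sketch:

```python
import queue

q = queue.Queue()
obj = [1, 2, 3]
q.put(obj)
obj.append(4)            # mutate after put() has returned
received = q.get()
print(received is obj)   # True: the queue held a reference, not a copy
print(received)          # [1, 2, 3, 4]
```

Here the outcome is fixed because everything runs in one thread; with a separate consumer thread it depends on whether get() happens before or after the mutation, which is the timing dependence described above.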
msg143160 - Author: Richard Oudkerk (sbt) * (Python committer) Date: 2011-08-29 16:39
I meant Issue 6721 (Locks in python standard library should be sanitized on fork) not 6271.
msg145036 - Author: Charles-François Natali (neologix) * (Python committer) Date: 2011-10-06 20:20
> Modifying an object which is already on a traditional queue can also
> change what is received by the other thread (depending on timing). 
> So Queue.Queue's put() is not "atomic" either.  Therefore I do not
> believe this behaviour is a bug.

Agreed.

> However the solution proposed is a good one since it fixes Issue
> 10886.  In addition it prevents arbitrary code being run in the
> background thread by weakref callbacks or __del__ methods.  Such
> arbitrary code may cause inconsistent state in a forked process if
> the fork happens while the queue's thread is running -- see issue
> 6271.
[...]
> I would suggest closing this issue and letting Issue 10886 take its
> place.

Makes sense.
History
Date | User | Action | Args
2011-10-06 20:20:39 | neologix | set | status: open -> closed; superseder: Unhelpful backtrace for multiprocessing.Queue; nosy: + neologix; messages: + msg145036; resolution: duplicate; stage: test needed -> resolved
2011-08-29 16:39:17 | sbt | set | messages: + msg143160
2011-08-29 16:36:40 | sbt | set | nosy: + sbt; messages: + msg143158
2010-11-02 23:08:46 | vilnis.termanis | set | files: + patch_27maint.diff; messages: + msg120284
2010-11-02 15:40:15 | asksol | set | messages: + msg120232; stage: test needed
2010-10-24 10:38:13 | vilnis.termanis | set | messages: + msg119504
2010-10-24 08:22:06 | asksol | set | messages: + msg119496
2010-10-24 08:19:03 | asksol | set | nosy: + asksol; messages: + msg119495
2010-04-30 18:52:42 | vilnis.termanis | set | versions: + Python 3.1
2010-03-07 03:37:55 | vilnis.termanis | set | files: + queues.diff
2010-03-07 03:37:46 | vilnis.termanis | set | files: - queues.diff
2010-03-07 03:19:46 | vilnis.termanis | set | files: - queues.diff
2010-03-07 03:17:20 | vilnis.termanis | set | files: + queues.diff; messages: + msg100558
2010-03-07 03:15:18 | vilnis.termanis | set | files: + queues.diff; messages: + msg100557
2010-03-07 03:10:11 | vilnis.termanis | set | files: - pickle_suggestion_v2.patch
2010-03-02 21:12:54 | vilnis.termanis | set | files: + queue_perf.py
2010-03-02 21:03:52 | vilnis.termanis | set | files: - pickle_suggestion.patch
2010-03-02 21:03:29 | vilnis.termanis | set | files: + pickle_suggestion_v2.patch; messages: + msg100307
2010-03-02 18:24:18 | vilnis.termanis | set | files: + pickle_suggestion.patch; messages: + msg100303
2010-03-02 16:40:27 | vilnis.termanis | set | files: + queue_example.py
2010-03-02 16:39:04 | vilnis.termanis | set | files: - queue_example.py
2010-03-02 16:39:00 | vilnis.termanis | set | files: - lock_suggestion.patch
2010-03-02 16:38:46 | vilnis.termanis | set | messages: + msg100302
2010-03-02 02:02:47 | vilnis.termanis | set | files: + lock_suggestion.patch; messages: + msg100287
2010-03-02 01:49:26 | vilnis.termanis | set | files: - queues.patch
2010-03-02 01:41:36 | meador.inge | set | nosy: + meador.inge; messages: + msg100285
2010-03-02 00:50:19 | vilnis.termanis | set | messages: + msg100282
2010-03-01 22:34:19 | benjamin.peterson | set | assignee: jnoller; nosy: + jnoller
2010-03-01 22:28:32 | vilnis.termanis | set | files: + queues.patch; keywords: + patch; messages: + msg100280
2010-03-01 22:20:05 | vilnis.termanis | create |