Classification
Title: multiprocessing.Queue fails to get() very large objects
Type: behavior
Stage: resolved
Components: Documentation
Versions: Python 3.1, Python 3.2, Python 2.7

Process
Status: closed
Resolution: not a bug
Dependencies:
Superseder:
Assigned To: jnoller
Nosy List: Brian.Cain, Ian.Davis, Matt.Goodman, docs@python, jnoller, neologix, osvenskan, serhiy.storchaka, terry.reedy, vinay.sajip
Priority: low
Keywords:

Created on 2010-04-16 18:38 by Ian.Davis, last changed 2015-11-26 17:21 by serhiy.storchaka. This issue is now closed.

Files
multiproc.py (uploaded by Brian.Cain, 2010-12-09 00:01)
multiproc3.py (uploaded by Brian.Cain, 2010-12-09 06:16): py3k version of test case
gdb_stack_trace.txt (uploaded by Brian.Cain, 2010-12-09 06:18): stack trace of failure
Messages (17)
msg103349 - Author: Ian Davis (Ian.Davis) Date: 2010-04-16 18:38
I'm trying to parallelize some scientific computing jobs using multiprocessing.Pool.  I've also tried rolling my own Pool equivalent using Queues.  In trying to return very large result objects from Pool.map()/imap() or via Queue.put(), I've noticed that multiprocessing seems to hang on the receiving end.  On Cygwin 1.7.1/Python 2.5.2 it hangs with no CPU activity.  On Centos 5.2/Python 2.6.2 it hangs with 100% CPU.  cPickle is perfectly capable of pickling these objects, although they may be hundreds of MB, so I think it's the communication.  There's also some asymmetry in the error depending on whether it's the parent or the child putting the large object.  The put does appear to succeed; it's the get() on the other end that hangs forever.

Example code:
-----
from multiprocessing import *

def child(task_q, result_q):
    while True:
        print "  Getting task..."
        task = task_q.get()
        print "  Got task", task[:10]
        task = task * 100000000
        print "  Putting result", task[:10]
        result_q.put(task)
        print "  Done putting result", task[:10]
        task_q.task_done()

def parent():
    task_q = JoinableQueue()
    result_q = JoinableQueue()
    worker = Process(target=child, args=(task_q,result_q))
    worker.daemon = True
    worker.start()
    #tasks = ["foo", "bar", "ABC" * 100000000, "baz"]
    tasks = ["foo", "bar", "ABC", "baz"]
    for task in tasks:
        print "Putting task", task[:10], "..."
        task_q.put(task)
        print "Done putting task", task[:10]
    task_q.join()
    for task in tasks:
        print "Getting result..."
        print "Got result", result_q.get()[:10]

if __name__ == '__main__':
    parent()
-----

If run as-is, I get (note that output from the parent and child processes is interleaved):
Traceback (most recent call last):
  File "/usr/lib/python2.5/site-packages/multiprocessing-2.6.2.1-py2.5-cygwin-1.7.1-i686.egg/multiprocessing/queues.py", line 242, in _feed
    send(obj)
MemoryError: out of memory
(*** hangs, I hit ^C ***)
Got result
Traceback (most recent call last):
Process Process-1:
Traceback (most recent call last):
  File "cygwin_multiprocessing_queue.py", line 32, in <module>
  File "/usr/lib/python2.5/site-packages/multiprocessing-2.6.2.1-py2.5-cygwin-1.7.1-i686.egg/multiprocessing/process.py", line 237, in _bootstrap
    parent()
  File "cygwin_multiprocessing_queue.py", line 29, in parent
    print "Got result", result_q.get()[:10]
    self.run()
  File "/usr/lib/python2.5/site-packages/multiprocessing-2.6.2.1-py2.5-cygwin-1.7.1-i686.egg/multiprocessing/process.py", line 93, in run
  File "/usr/lib/python2.5/site-packages/multiprocessing-2.6.2.1-py2.5-cygwin-1.7.1-i686.egg/multiprocessing/queues.py", line 91, in get
    self._target(*self._args, **self._kwargs)
  File "cygwin_multiprocessing_queue.py", line 6, in child
    res = self._recv()
KeyboardInterrupt
    task = task_q.get()
  File "/usr/lib/python2.5/site-packages/multiprocessing-2.6.2.1-py2.5-cygwin-1.7.1-i686.egg/multiprocessing/queues.py", line 91, in get
    res = self._recv()
KeyboardInterrupt


If instead I comment out the multiplication in child() and uncomment the large task in parent(), then I get
  Getting task...
Putting task foo ...
Done putting task foo
Putting task bar ...
  Got task foo
  Putting result foo
Done putting task bar
Putting task ABCABCABCA ...
Done putting task ABCABCABCA
Putting task baz ...
  Done putting result foo
  Getting task...
  Got task bar
  Putting result bar
  Done putting result bar
  Getting task...
Done putting task baz
(*** hangs, I hit ^C ***)
Traceback (most recent call last):
  File "cygwin_multiprocessing_queue.py", line 32, in <module>
    parent()
  File "cygwin_multiprocessing_queue.py", line 26, in parent
    task_q.join()
  File "/usr/lib/python2.5/site-packages/multiprocessing-2.6.2.1-py2.5-cygwin-1.7.1-i686.egg/multiprocessing/queues.py", line 303, in join
    self._cond.wait()
  File "/usr/lib/python2.5/site-packages/multiprocessing-2.6.2.1-py2.5-cygwin-1.7.1-i686.egg/multiprocessing/synchronize.py", line 212, in wait
    self._wait_semaphore.acquire(True, timeout)
KeyboardInterrupt
msg113031 - Author: Terry J. Reedy (terry.reedy) (Python committer) Date: 2010-08-05 19:46
By 'crash', do you actually mean 'hang'? 
Jesse, is it reasonable to stuff pickles of 100s of megabytes through the connections?
msg113033 - Author: Jesse Noller (jnoller) (Python committer) Date: 2010-08-05 19:51
I don't know that it's unreasonable to send that much data, but it would certainly be slow, and I would not recommend it. Therefore, this is still on the list for when I have time.
msg123663 - Author: Brian Cain (Brian.Cain) Date: 2010-12-09 00:01
I don't think the problem is limited to when hundreds of megabytes are being transmitted.  I believe I am experiencing a problem with the same root cause whose symptoms are slightly different.  It seems like there's a threshold which causes not merely poor performance, but likely an unrecoverable fault.

Here's the output when I run my example on SLES11.1:

$ ./multiproc.py $((8*1024)) 2
on 2.6 (r26:66714, May  5 2010, 14:02:39) 
[GCC 4.3.4 [gcc-4_3-branch revision 152973]] - Linux linux 2.6.32.12-0.7-default #1 SMP 2010-05-20 11:14:20 +0200 x86_64 x86_64
0 entries in flight, join() took 5949.97 usec, get() did 0.000000 items/sec
2 entries in flight, join() took 1577.85 usec, get() did 42581.766497 items/sec
4 entries in flight, join() took 1966.00 usec, get() did 65536.000000 items/sec
6 entries in flight, join() took 1894.00 usec, get() did 105296.334728 items/sec
8 entries in flight, join() took 1420.02 usec, get() did 199728.761905 items/sec
10 entries in flight, join() took 1950.03 usec, get() did 163840.000000 items/sec
12 entries in flight, join() took 1241.92 usec, get() did 324720.309677 items/sec
...
7272 entries in flight, join() took 2516.03 usec, get() did 10983427.687432 items/sec
7274 entries in flight, join() took 1813.17 usec, get() did 10480717.037444 items/sec
7276 entries in flight, join() took 1979.11 usec, get() did 11421315.832335 items/sec
7278 entries in flight, join() took 2043.01 usec, get() did 11549808.744608 items/sec
^C7280 entries: join() ABORTED by user after 83.08 sec
...

I see similar results when I run this test with a larger step, I just wanted to get finer resolution on the failure point.
msg123666 - Author: Terry J. Reedy (terry.reedy) (Python committer) Date: 2010-12-09 02:28
2.6.6 was the last 2.6 bugfix release.
msg123671 - Author: Brian Cain (Brian.Cain) Date: 2010-12-09 06:16
I was able to reproduce the problem on a more recent release.

7279 entries fails, 7278 entries succeeds.


$ ./multiproc3.py 
on 3.1.2 (r312:79147, Apr 15 2010, 12:35:07) 
[GCC 4.4.3] - Linux mini 2.6.32-26-generic #47-Ubuntu SMP Wed Nov 17 15:59:05 UTC 2010 i686 
7278 entries in flight, join() took 12517.93 usec, get() did 413756.736588 items/sec
7278 entries in flight, join() took 19458.06 usec, get() did 345568.562217 items/sec
7278 entries in flight, join() took 21326.07 usec, get() did 382006.563784 items/sec
7278 entries in flight, join() took 14937.16 usec, get() did 404244.835554 items/sec
7278 entries in flight, join() took 18877.98 usec, get() did 354435.878968 items/sec
7278 entries in flight, join() took 20811.08 usec, get() did 408343.738456 items/sec
7278 entries in flight, join() took 14394.04 usec, get() did 423727.055218 items/sec
7278 entries in flight, join() took 18940.21 usec, get() did 361012.624762 items/sec
7278 entries in flight, join() took 19073.96 usec, get() did 367559.024118 items/sec
7278 entries in flight, join() took 16229.87 usec, get() did 424764.763755 items/sec
7278 entries in flight, join() took 18527.03 usec, get() did 355546.367937 items/sec
7278 entries in flight, join() took 21500.11 usec, get() did 390429.802164 items/sec
7278 entries in flight, join() took 13646.84 usec, get() did 410468.669903 items/sec
7278 entries in flight, join() took 18921.14 usec, get() did 355873.819767 items/sec
7278 entries in flight, join() took 13582.94 usec, get() did 287553.877353 items/sec
7278 entries in flight, join() took 21958.11 usec, get() did 405549.873285 items/sec
^C7279 entries: join() ABORTED by user after 5.54 sec
^CError in atexit._run_exitfuncs:
Segmentation fault
msg123672 - Author: Brian Cain (Brian.Cain) Date: 2010-12-09 06:18
Detailed stack trace when the failure occurs (gdb_stack_trace.txt)
msg129229 - Author: Charles-François Natali (neologix) (Python committer) Date: 2011-02-23 23:05
Alright, it's normal behaviour, but since it doesn't seem to be documented, it can be quite surprising.
A queue works like this:
- when you call queue.put(data), the data is added to a deque, which can grow and shrink forever
- then a thread pops elements from the deque, and sends them so that the other process can receive them through a pipe or a Unix socket (created via socketpair). But, and that's the important point, both pipes and Unix sockets have a limited capacity (it used to be 4k, one page, on older Linux kernels for pipes; now it's 64k, and between 64k and 120k for Unix sockets, depending on tunable sysctls).
- when you do queue.get(), you just do a read on the pipe/socket

In multiproc3.py, the items are first appended to the queue, then the sender process is waited on. But when size = 7279, the data submitted reaches 64k, so the writing thread blocks on the write syscall.
And since a join is performed before dequeuing the item, you just deadlock: the join waits for the sending thread to complete, and the write can't complete since the pipe/socket is full!
If you dequeue the item before waiting on the submitter process, everything works fine:


    t0 = time.time()
    get_t0 = time.time()
    vals = q.get(timeout=3.)
    get_duration = time.time() - get_t0

    s.join()

Now, for the initial report, the problem is related:

def child(task_q, result_q):
    while True:
        print "  Getting task..."
        task = task_q.get()
        print "  Got task", task[:10]
        task = task * 100000000
        print "  Putting result", task[:10]
        result_q.put(task)
        print "  Done putting result", task[:10]
        task_q.task_done()



    tasks = ["foo", "bar", "ABC", "baz"]
    for task in tasks:
        print "Putting task", task[:10], "..."
        task_q.put(task)
        print "Done putting task", task[:10]
    task_q.join()
    for task in tasks:
        print "Getting result..."
        print "Got result", result_q.get()[:10]

When the child puts results, since they're bigger than 64k, the underlying pipe/socket fills up. Thus, the sending thread blocks on the write and stops draining the queue's internal buffer, which keeps growing.
So you end up storing in result_q every object before starting to dequeue them, which represents roughly 4 * 3 * 1e8 = 1.2GB, which could explain the out-of-memory errors (and if they're Unicode strings, it's much more)...
So the moral is: don't put() too much data into a queue without dequeuing it in a concurrent process...
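For illustration, a minimal sketch of the safe ordering (my own example, not code from the attached files; the names worker and NITEMS are invented):
-----
from multiprocessing import Process, Queue

NITEMS = 1000
ITEM = b"x" * 1024  # 1 KB per item: ~1 MB total, well past a 64k pipe buffer

def worker(q):
    for _ in range(NITEMS):
        q.put(ITEM)

if __name__ == "__main__":
    q = Queue()
    p = Process(target=worker, args=(q,))
    p.start()
    # Drain the queue BEFORE joining: the child's feeder thread can then
    # always flush its buffer into the pipe, so p.join() cannot deadlock.
    for _ in range(NITEMS):
        q.get()
    p.join()
-----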
msg133654 - Author: Charles-François Natali (neologix) (Python committer) Date: 2011-04-13 08:51
It's documented in http://docs.python.org/library/multiprocessing.html#multiprocessing-programming :
"""
Joining processes that use queues

Bear in mind that a process that has put items in a queue will wait before terminating until all the buffered items are fed by the “feeder” thread to the underlying pipe. (The child process can call the Queue.cancel_join_thread() method of the queue to avoid this behaviour.)

This means that whenever you use a queue you need to make sure that all items which have been put on the queue will eventually be removed before the process is joined. Otherwise you cannot be sure that processes which have put items on the queue will terminate. Remember also that non-daemonic processes will automatically be joined.
"""

Suggesting to close.
msg134009 - Author: Brian Cain (Brian.Cain) Date: 2011-04-19 02:54
Please don't close the issue.

Joining aside, the basic point ("But when size = 7279, the data submitted
reaches 64k, so the writing thread blocks on the write syscall.") is not
clear from the docs, right?

IMO, it would be nice if I could ask my queue, "Just what is your capacity
(in bytes, not entries) anyways?  I want to know how much I can put in here
without worrying about whether the remote side is dequeueing."  I guess I'd
settle for explicit documentation that the bound exists.  But how should I
expect my code to be portable?  Are there platforms which provide less than
64k?  Less than 1k?  Less than 256 bytes?
msg134010 - Author: Terry J. Reedy (terry.reedy) (Python committer) Date: 2011-04-19 03:38
Please do not send html responses, as they result in a spurious 'unnamed' file being attached.

Please do suggest a specific change.

Should this be changed to a doc issue?
msg134082 - Author: Charles-François Natali (neologix) (Python committer) Date: 2011-04-19 17:17
> IMO, it would be nice if I could ask my queue, "Just what is your capacity
> (in bytes, not entries) anyways?  I want to know how much I can put in here
> without worrying about whether the remote side is dequeueing."  I guess I'd
> settle for explicit documentation that the bound exists.

It is documented.
See the comment about the "underlying pipe".

> But how should I
> expect my code to be portable?  Are there platforms which provide less than
> 64k?  Less than 1k?  Less than 256 bytes?

It depends :-)
If the implementation is using pipes, under Linux before 2.6.9 (I think), a pipe was limited by the size of a page, i.e. 4K on x86.
Now, it's 64K.
If it's a Unix socket (via socketpair), the maximum size can be set through sysctl, etc.
So you basically can't state a limit, and IMHO, you shouldn't be concerned with that if you want your code to be portable.
I find the warning explicit enough, but that's maybe because I'm familiar with these low-level details.
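There is no portable way to ask a Queue for its capacity in bytes, but on Linux the underlying pipe buffer size can at least be inspected. A sketch (my own, Linux-only; fcntl.F_GETPIPE_SZ is exposed by Python 3.10+, older versions can pass the raw constant 1032):
-----
import fcntl
import os

r, w = os.pipe()
try:
    # F_GETPIPE_SZ asks the kernel for this pipe's buffer capacity.
    size = fcntl.fcntl(w, fcntl.F_GETPIPE_SZ)
    print("pipe capacity:", size, "bytes")  # typically 65536
finally:
    os.close(r)
    os.close(w)
-----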
msg134369 - Author: Matt Goodman (Matt.Goodman) Date: 2011-04-25 02:30
You cannot pickle individual objects larger than 2**31.  This failure is not handled cleanly in the core module, and I suspect it is masked by the processes above.

Try piping "a"*(2**31) through your pipe, or pickling it to disk...
msg135232 - Author: Charles-François Natali (neologix) (Python committer) Date: 2011-05-05 19:08
> You can not pickle individual objects larger than 2**31.

Indeed, but that's not what's happening here, the failure occurs with much smaller objects (also, note the OP's "cPickle is perfectly capable of pickling these objects").
The problem is really with the underlying pipe/unix socket filling up.
msg143075 - Author: Vinay Sajip (vinay.sajip) (Python committer) Date: 2011-08-27 15:13
I think it's just a documentation issue. The problem with documenting limits is that they are system-specific and, even if the current limits that Charles-François has mentioned are documented, these could become outdated. Perhaps a suggestion could be added to the documentation:

"Avoid sending very large amounts of data via queues, as you could come up against system-dependent limits according to the operating system and whether pipes or sockets are used. You could consider an alternative strategy, such as writing large data blocks to temporary files and sending just the temporary file names via queues, relying on the consumer to delete the temporary files after processing."
msg143081 - Author: Charles-François Natali (neologix) (Python committer) Date: 2011-08-27 15:46
> "Avoid sending very large amounts of data via queues, as you could come up against system-dependent limits according to the operating system and whether pipes or sockets are used. You could consider an alternative strategy, such as writing large data blocks to temporary files and sending just the temporary file names via queues, relying on the consumer to delete the temporary files after processing."

There's a misunderstanding here: there is absolutely no limit on the
size of objects that can be put through a queue (apart from the host's
memory and the 32-bit limit): the problem is really that you can't
just put an arbitrary bunch of data into a queue, and then join it before
making sure other processes will *eventually* pop all the data from
the queue.
I.e., you can't do:

q = Queue()
for i in range(1000000):
    q.put(<big obj>)
q.join()

for i in range(1000000):
    q.get()

That's because join() will wait until the feeder thread has managed to
write all the items to the underlying pipe/Unix socket, and this might
hang if the underlying pipe/socket is full (which will happen after
one has put around 128K without having popped any item).
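
A small experiment (my own illustration, not one of the attached scripts) makes the blocking visible:
-----
from multiprocessing import Process, Queue

def child(q):
    q.put(b"x" * (1 << 20))  # ~1 MB: far more than the pipe/socket can buffer

if __name__ == "__main__":
    q = Queue()
    p = Process(target=child, args=(q,))
    p.start()
    p.join(timeout=2)
    print("still alive?", p.is_alive())  # True: the feeder thread is stuck writing
    data = q.get()  # draining the pipe unblocks the feeder...
    p.join()        # ...so the child can now flush and exit
    print("received", len(data), "bytes")
-----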

That's what's explained in
http://docs.python.org/library/multiprocessing.html#multiprocessing-programming :
"""
Joining processes that use queues

Bear in mind that a process that has put items in a queue will wait
before terminating until all the buffered items are fed by the
“feeder” thread to the underlying pipe. (The child process can call
the Queue.cancel_join_thread() method of the queue to avoid this
behaviour.)

This means that whenever you use a queue you need to make sure that
all items which have been put on the queue will eventually be removed
before the process is joined. Otherwise you cannot be sure that
processes which have put items on the queue will terminate. Remember
also that non-daemonic processes will automatically be joined.
"""

I find this wording really clear, but if someone comes up with a
better, i.e. less technical, wording, go ahead.
msg254550 - Author: Serhiy Storchaka (serhiy.storchaka) (Python committer) Date: 2015-11-12 20:09
I agree with Charles-François and think this issue should be closed. There is no bug, and the behavior is documented.
History
Date User Action Args
2015-11-26 17:21:29  serhiy.storchaka  set  status: pending -> closed; resolution: not a bug; stage: resolved
2015-11-12 20:09:35  serhiy.storchaka  set  status: open -> pending; nosy: + serhiy.storchaka; messages: + msg254550
2011-09-16 07:59:19  neologix  link  issue8237 superseder
2011-08-28 16:32:13  neologix  set  priority: normal -> low; nosy: + docs@python; components: + Documentation, - Library (Lib)
2011-08-27 15:47:00  neologix  set  messages: + msg143081
2011-08-27 15:13:03  vinay.sajip  set  nosy: + vinay.sajip; messages: + msg143075
2011-05-10 01:11:02  osvenskan  set  nosy: + osvenskan
2011-05-05 19:08:29  neologix  set  messages: + msg135232
2011-04-25 02:30:40  Matt.Goodman  set  nosy: + Matt.Goodman; messages: + msg134369
2011-04-19 17:17:06  neologix  set  messages: + msg134082
2011-04-19 03:38:30  terry.reedy  set  messages: + msg134010
2011-04-19 03:35:40  terry.reedy  set  files: - unnamed
2011-04-19 02:54:10  Brian.Cain  set  files: + unnamed; messages: + msg134009
2011-04-13 08:51:48  neologix  set  messages: + msg133654
2011-02-23 23:05:44  neologix  set  nosy: + neologix; messages: + msg129229
2010-12-09 06:18:35  Brian.Cain  set  files: + gdb_stack_trace.txt; messages: + msg123672
2010-12-09 06:16:42  Brian.Cain  set  files: + multiproc3.py; messages: + msg123671
2010-12-09 02:28:14  terry.reedy  set  type: crash -> behavior; messages: + msg123666; versions: - Python 2.6
2010-12-09 00:01:39  Brian.Cain  set  files: + multiproc.py; versions: + Python 2.6; nosy: + Brian.Cain; messages: + msg123663
2010-08-05 19:51:22  jnoller  set  messages: + msg113033
2010-08-05 19:46:49  terry.reedy  set  nosy: + terry.reedy; messages: + msg113031; versions: - Python 2.6
2010-04-18 18:43:12  pitrou  set  priority: normal; assignee: jnoller; nosy: + jnoller; versions: + Python 3.1, Python 2.7, Python 3.2, - Python 2.5
2010-04-16 18:38:05  Ian.Davis  create