Issue8426
This issue tracker has been migrated to GitHub, and is currently read-only. For more information, see the GitHub FAQs in the Python Developer Guide.
Created on 2010-04-16 18:38 by Ian.Davis, last changed 2022-04-11 14:57 by admin. This issue is now closed.
Files | ||
---|---|---|
File name | Uploaded | Description |
multiproc.py | Brian.Cain, 2010-12-09 00:01 | |
multiproc3.py | Brian.Cain, 2010-12-09 06:16 | py3k version of test case |
gdb_stack_trace.txt | Brian.Cain, 2010-12-09 06:18 | stack trace of failure |
Messages (17) | |||
---|---|---|---|
msg103349 - (view) | Author: Ian Davis (Ian.Davis) | Date: 2010-04-16 18:38 | |
I'm trying to parallelize some scientific computing jobs using multiprocessing.Pool. I've also tried rolling my own Pool equivalent using Queues. In trying to return very large result objects from Pool.map()/imap() or via Queue.put(), I've noticed that multiprocessing seems to hang on the receiving end. On Cygwin 1.7.1/Python 2.5.2 it hangs with no CPU activity. On CentOS 5.2/Python 2.6.2 it hangs with 100% CPU. cPickle is perfectly capable of pickling these objects, although they may be hundreds of MB, so I think it's the communication. There's also some asymmetry in the error depending on whether it's the parent or the child putting the large object. The put does appear to succeed; it's the get() on the other end that hangs forever.

Example code:
-----
from multiprocessing import *

def child(task_q, result_q):
    while True:
        print " Getting task..."
        task = task_q.get()
        print " Got task", task[:10]
        task = task * 100000000
        print " Putting result", task[:10]
        result_q.put(task)
        print " Done putting result", task[:10]
        task_q.task_done()

def parent():
    task_q = JoinableQueue()
    result_q = JoinableQueue()
    worker = Process(target=child, args=(task_q,result_q))
    worker.daemon = True
    worker.start()
    #tasks = ["foo", "bar", "ABC" * 100000000, "baz"]
    tasks = ["foo", "bar", "ABC", "baz"]
    for task in tasks:
        print "Putting task", task[:10], "..."
        task_q.put(task)
        print "Done putting task", task[:10]
    task_q.join()
    for task in tasks:
        print "Getting result..."
        print "Got result", result_q.get()[:10]

if __name__ == '__main__':
    parent()
-----

If run as is, I get

Traceback (most recent call last):
  File "/usr/lib/python2.5/site-packages/multiprocessing-2.6.2.1-py2.5-cygwin-1.7.1-i686.egg/multiprocessing/queues.py", line 242, in _feed
    send(obj)
MemoryError: out of memory
(*** hangs, I hit ^C ***)
Got result
Traceback (most recent call last):
Process Process-1:
Traceback (most recent call last):
  File "cygwin_multiprocessing_queue.py", line 32, in <module>
  File "/usr/lib/python2.5/site-packages/multiprocessing-2.6.2.1-py2.5-cygwin-1.7.1-i686.egg/multiprocessing/process.py", line 237, in _bootstrap
    parent()
  File "cygwin_multiprocessing_queue.py", line 29, in parent
    print "Got result", result_q.get()[:10]
    self.run()
  File "/usr/lib/python2.5/site-packages/multiprocessing-2.6.2.1-py2.5-cygwin-1.7.1-i686.egg/multiprocessing/process.py", line 93, in run
  File "/usr/lib/python2.5/site-packages/multiprocessing-2.6.2.1-py2.5-cygwin-1.7.1-i686.egg/multiprocessing/queues.py", line 91, in get
    self._target(*self._args, **self._kwargs)
  File "cygwin_multiprocessing_queue.py", line 6, in child
    res = self._recv()
KeyboardInterrupt
    task = task_q.get()
  File "/usr/lib/python2.5/site-packages/multiprocessing-2.6.2.1-py2.5-cygwin-1.7.1-i686.egg/multiprocessing/queues.py", line 91, in get
    res = self._recv()
KeyboardInterrupt

If instead I comment out the multiplication in child() and uncomment the large task in parent(), then I get

 Getting task...
Putting task foo ...
Done putting task foo
Putting task bar ...
 Got task foo
 Putting result foo
Done putting task bar
Putting task ABCABCABCA ...
Done putting task ABCABCABCA
Putting task baz ...
 Done putting result foo
 Getting task...
 Got task bar
 Putting result bar
 Done putting result bar
 Getting task...
Done putting task baz
(*** hangs, I hit ^C ***)
Traceback (most recent call last):
  File "cygwin_multiprocessing_queue.py", line 32, in <module>
    parent()
  File "cygwin_multiprocessing_queue.py", line 26, in parent
    task_q.join()
  File "/usr/lib/python2.5/site-packages/multiprocessing-2.6.2.1-py2.5-cygwin-1.7.1-i686.egg/multiprocessing/queues.py", line 303, in join
    self._cond.wait()
  File "/usr/lib/python2.5/site-packages/multiprocessing-2.6.2.1-py2.5-cygwin-1.7.1-i686.egg/multiprocessing/synchronize.py", line 212, in wait
    self._wait_semaphore.acquire(True, timeout)
KeyboardInterrupt |
|||
msg113031 - (view) | Author: Terry J. Reedy (terry.reedy) * | Date: 2010-08-05 19:46 | |
By 'crash', do you actually mean 'hang'? Jesse, is it reasonable to stuff pickles of 100s of megabytes through the connections? |
|||
msg113033 - (view) | Author: Jesse Noller (jnoller) * | Date: 2010-08-05 19:51 | |
I don't know that it's unreasonable to send that much data, but it would certainly be slow, and I would not recommend it. Therefore, this is still on the list for when I have time |
|||
msg123663 - (view) | Author: Brian Cain (Brian.Cain) * | Date: 2010-12-09 00:01 | |
I don't think the problem is limited to when hundreds of megabytes are being transmitted. I believe I am experiencing a problem with the same root cause whose symptoms are slightly different. It seems like there's a threshold which causes not merely poor performance, but likely an unrecoverable fault. Here's the output when I run my example on SLES11.1:

$ ./multiproc.py $((8*1024)) 2
on 2.6 (r26:66714, May 5 2010, 14:02:39) [GCC 4.3.4 [gcc-4_3-branch revision 152973]] - Linux linux 2.6.32.12-0.7-default #1 SMP 2010-05-20 11:14:20 +0200 x86_64 x86_64
0 entries in flight, join() took 5949.97 usec, get() did 0.000000 items/sec
2 entries in flight, join() took 1577.85 usec, get() did 42581.766497 items/sec
4 entries in flight, join() took 1966.00 usec, get() did 65536.000000 items/sec
6 entries in flight, join() took 1894.00 usec, get() did 105296.334728 items/sec
8 entries in flight, join() took 1420.02 usec, get() did 199728.761905 items/sec
10 entries in flight, join() took 1950.03 usec, get() did 163840.000000 items/sec
12 entries in flight, join() took 1241.92 usec, get() did 324720.309677 items/sec
...
7272 entries in flight, join() took 2516.03 usec, get() did 10983427.687432 items/sec
7274 entries in flight, join() took 1813.17 usec, get() did 10480717.037444 items/sec
7276 entries in flight, join() took 1979.11 usec, get() did 11421315.832335 items/sec
7278 entries in flight, join() took 2043.01 usec, get() did 11549808.744608 items/sec
^C7280 entries: join() ABORTED by user after 83.08 sec
...

I see similar results when I run this test with a larger step; I just wanted to get finer resolution on the failure point. |
|||
msg123666 - (view) | Author: Terry J. Reedy (terry.reedy) * | Date: 2010-12-09 02:28 | |
2.6.6 was the last bugfix release |
|||
msg123671 - (view) | Author: Brian Cain (Brian.Cain) * | Date: 2010-12-09 06:16 | |
I was able to reproduce the problem on a more recent release. 7279 entries fails, 7278 entries succeeds.

$ ./multiproc3.py
on 3.1.2 (r312:79147, Apr 15 2010, 12:35:07) [GCC 4.4.3] - Linux mini 2.6.32-26-generic #47-Ubuntu SMP Wed Nov 17 15:59:05 UTC 2010 i686
7278 entries in flight, join() took 12517.93 usec, get() did 413756.736588 items/sec
7278 entries in flight, join() took 19458.06 usec, get() did 345568.562217 items/sec
7278 entries in flight, join() took 21326.07 usec, get() did 382006.563784 items/sec
7278 entries in flight, join() took 14937.16 usec, get() did 404244.835554 items/sec
7278 entries in flight, join() took 18877.98 usec, get() did 354435.878968 items/sec
7278 entries in flight, join() took 20811.08 usec, get() did 408343.738456 items/sec
7278 entries in flight, join() took 14394.04 usec, get() did 423727.055218 items/sec
7278 entries in flight, join() took 18940.21 usec, get() did 361012.624762 items/sec
7278 entries in flight, join() took 19073.96 usec, get() did 367559.024118 items/sec
7278 entries in flight, join() took 16229.87 usec, get() did 424764.763755 items/sec
7278 entries in flight, join() took 18527.03 usec, get() did 355546.367937 items/sec
7278 entries in flight, join() took 21500.11 usec, get() did 390429.802164 items/sec
7278 entries in flight, join() took 13646.84 usec, get() did 410468.669903 items/sec
7278 entries in flight, join() took 18921.14 usec, get() did 355873.819767 items/sec
7278 entries in flight, join() took 13582.94 usec, get() did 287553.877353 items/sec
7278 entries in flight, join() took 21958.11 usec, get() did 405549.873285 items/sec
^C7279 entries: join() ABORTED by user after 5.54 sec
^CError in atexit._run_exitfuncs:
Segmentation fault |
|||
msg123672 - (view) | Author: Brian Cain (Brian.Cain) * | Date: 2010-12-09 06:18 | |
Detailed stack trace when the failure occurs (gdb_stack_trace.txt) |
|||
msg129229 - (view) | Author: Charles-François Natali (neologix) * | Date: 2011-02-23 23:05 | |
Alright, it's normal behaviour, but since it doesn't seem to be documented, it can be quite surprising. A queue works like this:
- when you call queue.put(data), the data is added to a deque, which can grow and shrink forever
- then a thread pops elements from the deque, and sends them so that the other process can receive them through a pipe or a Unix socket (created via socketpair). But, and that's the important point, both pipes and Unix sockets have a limited capacity (for pipes it used to be 4k, the page size, on older Linux kernels and is now 64k; for Unix sockets it is between 64k and 120k, depending on tunable sysctls).
- when you do queue.get(), you just do a read on the pipe/socket

In multiproc3.py, the items are first appended to the queue, then the sender process is waited on. But when size = 7279, the data submitted reaches 64k, so the writing thread blocks on the write syscall. And since a join is performed before dequeuing the item, you just deadlock, since the join waits for the sending thread to complete, and the write can't complete since the pipe/socket is full! If you dequeue the item before waiting for the submitter process, everything works fine:

    t0 = time.time()
    try:
        get_t0 = time.time()
        vals = q.get(timeout=3.)
        get_duration = time.time() - get_t0
        s.join()

Now, for the initial report, the problem is related:

    def child(task_q, result_q):
        while True:
            print " Getting task..."
            task = task_q.get()
            print " Got task", task[:10]
            task = task * 100000000
            print " Putting result", task[:10]
            result_q.put(task)
            print " Done putting result", task[:10]
            task_q.task_done()

    tasks = ["foo", "bar", "ABC", "baz"]
    for task in tasks:
        print "Putting task", task[:10], "..."
        task_q.put(task)
        print "Done putting task", task[:10]
    task_q.join()
    for task in tasks:
        print "Getting result..."
        print "Got result", result_q.get()[:10]

When the child puts results, since they're bigger than 64k, the underlying pipe/socket fills up. Thus, the sending thread blocks on the write, and doesn't dequeue the result_q, which keeps growing. So you end up storing in the result_q every object before starting to dequeue them, which represents roughly 4 * 3 * 1e8 = 1.2GB, which could explain the out-of-memory errors (and if they're Unicode strings, it's even more)... So the moral is: don't put() too much data to a queue without dequeuing it in a concurrent process... |
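The ordering described above (dequeue first, join afterwards) can be sketched in Python 3 roughly as follows. This is only an illustrative sketch, not the code from multiproc3.py: the names `producer` and `drain_then_join` are hypothetical, and the "fork" start method is an assumption that limits it to POSIX systems.

```python
import multiprocessing as mp

# Assumption: a POSIX system where the "fork" start method is available.
ctx = mp.get_context("fork")


def producer(q, n_items, item_size):
    """Hypothetical worker: put n_items byte strings on the queue, then return."""
    for _ in range(n_items):
        q.put(b"x" * item_size)


def drain_then_join(n_items=64, item_size=64 * 1024):
    """Receive every item first, and only then join the producer."""
    q = ctx.Queue()
    p = ctx.Process(target=producer, args=(q, n_items, item_size))
    p.start()
    # Each get() reads from the pipe, so the producer's feeder thread
    # can keep writing instead of blocking on a full pipe.
    received = [q.get(timeout=30) for _ in range(n_items)]
    # Everything has been read, so the producer can flush and exit.
    p.join(timeout=30)
    return len(received), p.exitcode


if __name__ == "__main__":
    print(drain_then_join())
```

The timeouts are there only so that a mistake shows up as an exception or a non-zero result rather than as a silent hang.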
|||
msg133654 - (view) | Author: Charles-François Natali (neologix) * | Date: 2011-04-13 08:51 | |
It's documented in http://docs.python.org/library/multiprocessing.html#multiprocessing-programming :

"""
Joining processes that use queues

Bear in mind that a process that has put items in a queue will wait before terminating until all the buffered items are fed by the “feeder” thread to the underlying pipe. (The child process can call the Queue.cancel_join_thread() method of the queue to avoid this behaviour.)

This means that whenever you use a queue you need to make sure that all items which have been put on the queue will eventually be removed before the process is joined. Otherwise you cannot be sure that processes which have put items on the queue will terminate. Remember also that non-daemonic processes will be joined automatically.
"""

Suggesting to close. |
|||
msg134009 - (view) | Author: Brian Cain (Brian.Cain) * | Date: 2011-04-19 02:54 | |
Please don't close the issue. Joining aside, the basic point ("But when size = 7279, the data submitted reaches 64k, so the writing thread blocks on the write syscall.") is not clear from the docs, right? IMO, it would be nice if I could ask my queue, "Just what is your capacity (in bytes, not entries) anyways? I want to know how much I can put in here without worrying about whether the remote side is dequeueing." I guess I'd settle for explicit documentation that the bound exists. But how should I expect my code to be portable? Are there platforms which provide less than 64k? Less than 1k? Less than 256 bytes? |
|||
msg134010 - (view) | Author: Terry J. Reedy (terry.reedy) * | Date: 2011-04-19 03:38 | |
Please do not send html responses, as they result in a spurious 'unnamed' file being attached. Please do suggest a specific change. Should this be changed to a doc issue? |
|||
msg134082 - (view) | Author: Charles-François Natali (neologix) * | Date: 2011-04-19 17:17 | |
> IMO, it would be nice if I could ask my queue, "Just what is your capacity (in bytes, not entries) anyways? I want to know how much I can put in here without worrying about whether the remote side is dequeueing." I guess I'd settle for explicit documentation that the bound exists.

It is documented. See the comment about the "underlying pipe".

> But how should I expect my code to be portable? Are there platforms which provide less than 64k? Less than 1k? Less than 256 bytes?

It depends :-) If the implementation is using pipes, under Linux before 2.6.11, a pipe was limited by the size of a page, i.e. 4K on x86. Now, it's 64K. If it's a Unix socket (via socketpair), the maximum size can be set through sysctl, etc. So you basically can't state a limit, and IMHO, you shouldn't be concerned with that if you want your code to be portable. I find the warning explicit enough, but that's maybe because I'm familiar with these low-level details. |
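Since the limit is system-dependent, one way to see what a given machine provides is to fill an unread pipe until a write would block. The sketch below is only an illustration under assumptions: `pipe_capacity` is a hypothetical helper, it needs a POSIX system, and it measures a plain `os.pipe()`, not the Unix-socket case mentioned above.

```python
import os


def pipe_capacity(chunk=1024):
    """Count how many bytes fit into an unread pipe before a write would block."""
    r, w = os.pipe()
    os.set_blocking(w, False)  # a full pipe now raises instead of blocking
    total = 0
    try:
        while True:
            try:
                # Small chunks keep each write all-or-nothing on POSIX pipes.
                total += os.write(w, b"\0" * chunk)
            except BlockingIOError:
                return total
    finally:
        os.close(r)
        os.close(w)


if __name__ == "__main__":
    print("pipe capacity on this system:", pipe_capacity(), "bytes")
```

The number it prints is a property of the host, which is why portable code should not depend on it.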
|||
msg134369 - (view) | Author: Matt Goodman (Matt.Goodman) | Date: 2011-04-25 02:30 | |
You cannot pickle individual objects larger than 2**31 bytes. This failure is not handled cleanly in the core module, and I suspect it is masked by the processes above. Try piping "a"*(2**31) through your pipe, or pickling it to disk . . . |
|||
msg135232 - (view) | Author: Charles-François Natali (neologix) * | Date: 2011-05-05 19:08 | |
> You can not pickle individual objects larger than 2**31.

Indeed, but that's not what's happening here: the failure occurs with much smaller objects (also, note the OP's "cPickle is perfectly capable of pickling these objects"). The problem is really with the underlying pipe/Unix socket filling up. |
|||
msg143075 - (view) | Author: Vinay Sajip (vinay.sajip) * | Date: 2011-08-27 15:13 | |
I think it's just a documentation issue. The problem with documenting limits is that they are system-specific and, even if the current limits that Charles-François has mentioned are documented, these could become outdated. Perhaps a suggestion could be added to the documentation: "Avoid sending very large amounts of data via queues, as you could come up against system-dependent limits according to the operating system and whether pipes or sockets are used. You could consider an alternative strategy, such as writing large data blocks to temporary files and sending just the temporary file names via queues, relying on the consumer to delete the temporary files after processing." |
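A rough sketch of how the temporary-file strategy suggested above might look in Python 3. The names `worker` and `collect` are hypothetical, and the "fork" start method is an assumption (POSIX only); only the file path travels through the queue, and the consumer removes the file when done.

```python
import multiprocessing as mp
import os
import tempfile

# Assumption: a POSIX system where the "fork" start method is available.
ctx = mp.get_context("fork")


def worker(result_q, size):
    """Hypothetical worker: write a large result to disk, send only its path."""
    fd, path = tempfile.mkstemp(suffix=".bin")
    with os.fdopen(fd, "wb") as f:
        f.write(b"ABC" * size)
    result_q.put(path)  # a short string, far below any pipe capacity


def collect(size=1_000_000):
    """Receive the path, read the data back, and delete the temporary file."""
    result_q = ctx.Queue()
    p = ctx.Process(target=worker, args=(result_q, size))
    p.start()
    path = result_q.get(timeout=30)
    p.join(timeout=30)
    try:
        with open(path, "rb") as f:
            data = f.read()
    finally:
        os.remove(path)  # the consumer is responsible for cleanup
    return len(data), os.path.exists(path)


if __name__ == "__main__":
    print(collect())
```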
|||
msg143081 - (view) | Author: Charles-François Natali (neologix) * | Date: 2011-08-27 15:46 | |
> "Avoid sending very large amounts of data via queues, as you could come up against system-dependent limits according to the operating system and whether pipes or sockets are used. You could consider an alternative strategy, such as writing large data blocks to temporary files and sending just the temporary file names via queues, relying on the consumer to delete the temporary files after processing."

There's a misunderstanding here: there is absolutely no limit on the size of objects that can be put through a queue (apart from the host's memory and the 32-bit limit). The problem is really that you can't just put an arbitrary bunch of data to a queue, and then join it before making sure other processes will *eventually* pop all the data from the queue. I.e., you can't do:

    q = Queue()
    for i in range(1000000):
        q.put(<big obj>)
    q.join()
    for i in range(1000000):
        q.get()

That's because join() will wait until the feeder thread has managed to write all the items to the underlying pipe/Unix socket, and this might hang if the underlying pipe/socket is full (which will happen after one has put around 128K without having popped any item). That's what's explained in http://docs.python.org/library/multiprocessing.html#multiprocessing-programming :

"""
Joining processes that use queues

Bear in mind that a process that has put items in a queue will wait before terminating until all the buffered items are fed by the “feeder” thread to the underlying pipe. (The child process can call the Queue.cancel_join_thread() method of the queue to avoid this behaviour.)

This means that whenever you use a queue you need to make sure that all items which have been put on the queue will eventually be removed before the process is joined. Otherwise you cannot be sure that processes which have put items on the queue will terminate. Remember also that non-daemonic processes will be joined automatically.
"""

I find this wording really clear, but if someone comes up with a better - i.e. less technical - wording, go ahead. |
|||
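The join-before-get hang discussed in this thread can be observed without locking up the interpreter by giving join() a timeout. The following Python 3 sketch is an illustration under assumptions: `put_big_item` and `join_before_get` are hypothetical names, the "fork" start method restricts it to POSIX, and the 4 MiB default is simply chosen to exceed typical pipe capacities.

```python
import multiprocessing as mp

# Assumption: a POSIX system where the "fork" start method is available.
ctx = mp.get_context("fork")


def put_big_item(q, size):
    """Hypothetical worker: put one item much larger than the pipe capacity."""
    q.put(b"x" * size)


def join_before_get(size=4 * 1024 * 1024, patience=2.0):
    """Join first (with a timeout), then drain, then join again."""
    q = ctx.Queue()
    p = ctx.Process(target=put_big_item, args=(q, size))
    p.start()
    # Nothing has been read yet, so the child's feeder thread is blocked
    # writing to the full pipe and the child cannot terminate.
    p.join(timeout=patience)
    stuck = p.is_alive()
    # Reading the item lets the write complete, so the child can exit.
    item = q.get(timeout=30)
    p.join(timeout=30)
    return stuck, len(item), p.exitcode


if __name__ == "__main__":
    print(join_before_get())
```

Without the timeout on the first join(), the parent would wait forever, which matches the reports earlier in this issue.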
msg254550 - (view) | Author: Serhiy Storchaka (serhiy.storchaka) * | Date: 2015-11-12 20:09 | |
I agree with Charles-François and think this issue should be closed. There is no bug, and the behavior is documented. |
History | |||
---|---|---|---|
Date | User | Action | Args |
2022-04-11 14:57:00 | admin | set | github: 52673 |
2015-11-26 17:21:29 | serhiy.storchaka | set | status: pending -> closed resolution: not a bug stage: resolved |
2015-11-12 20:09:35 | serhiy.storchaka | set | status: open -> pending nosy: + serhiy.storchaka messages: + msg254550 |
2011-09-16 07:59:19 | neologix | link | issue8237 superseder |
2011-08-28 16:32:13 | neologix | set | priority: normal -> low nosy: + docs@python components: + Documentation, - Library (Lib) |
2011-08-27 15:47:00 | neologix | set | messages: + msg143081 |
2011-08-27 15:13:03 | vinay.sajip | set | nosy: + vinay.sajip messages: + msg143075 |
2011-05-10 01:11:02 | osvenskan | set | nosy: + osvenskan |
2011-05-05 19:08:29 | neologix | set | messages: + msg135232 |
2011-04-25 02:30:40 | Matt.Goodman | set | nosy: + Matt.Goodman messages: + msg134369 |
2011-04-19 17:17:06 | neologix | set | messages: + msg134082 |
2011-04-19 03:38:30 | terry.reedy | set | messages: + msg134010 |
2011-04-19 03:35:40 | terry.reedy | set | files: - unnamed |
2011-04-19 02:54:10 | Brian.Cain | set | files: + unnamed messages: + msg134009 |
2011-04-13 08:51:48 | neologix | set | messages: + msg133654 |
2011-02-23 23:05:44 | neologix | set | nosy: + neologix messages: + msg129229 |
2010-12-09 06:18:35 | Brian.Cain | set | files: + gdb_stack_trace.txt messages: + msg123672 |
2010-12-09 06:16:42 | Brian.Cain | set | files: + multiproc3.py messages: + msg123671 |
2010-12-09 02:28:14 | terry.reedy | set | type: crash -> behavior messages: + msg123666 versions: - Python 2.6 |
2010-12-09 00:01:39 | Brian.Cain | set | files: + multiproc.py versions: + Python 2.6 nosy: + Brian.Cain messages: + msg123663 |
2010-08-05 19:51:22 | jnoller | set | messages: + msg113033 |
2010-08-05 19:46:49 | terry.reedy | set | nosy: + terry.reedy messages: + msg113031 versions: - Python 2.6 |
2010-04-18 18:43:12 | pitrou | set | priority: normal assignee: jnoller nosy: + jnoller versions: + Python 3.1, Python 2.7, Python 3.2, - Python 2.5 |
2010-04-16 18:38:05 | Ian.Davis | create |