Author neologix
Recipients Brian.Cain, Ian.Davis, jnoller, neologix, terry.reedy
Date 2011-02-23.23:05:44
SpamBayes Score 3.02569e-12
Marked as misclassified No
Message-id <1298502345.35.0.0539326643712.issue8426@psf.upfronthosting.co.za>
In-reply-to
Content
Alright, it's normal behaviour, but since it doesn't seem to be documented, it can be quite surprising.
A queue works like this:
- when you call queue.put(data), the data is added to a deque, which can grow and shrink without bound
- then a feeder thread pops items from the deque and writes them to a pipe or a Unix socket (created via socketpair()), so that the other process can receive them. But, and that's the important point, both pipes and Unix sockets have a limited capacity: it used to be 4k (one page) for pipes on older Linux kernels, it's now 64k, and between 64k and 120k for Unix sockets, depending on tunable sysctls (you can check this yourself with the small sketch after this list)
- when you do queue.get(), you just do a read on the pipe/socket
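
The following is not part of the original scripts, just a minimal sketch (assuming a Unix platform where socketpair() is available) that fills one end of a socketpair until the kernel refuses to accept more, to make that limited capacity visible:

import errno
import socket

# create a socketpair (one of the channel types mentioned above) and make one
# end non-blocking, so send() fails instead of blocking once the buffer is full
a, b = socket.socketpair()
a.setblocking(False)

total = 0
try:
    while True:
        total += a.send('x' * 4096)
except socket.error as e:
    if e.errno not in (errno.EAGAIN, errno.EWOULDBLOCK):
        raise

print "socketpair buffer filled after roughly", total, "bytes"

The exact figure it prints depends on the kernel and on the sysctls mentioned above, but the point is that it is finite: once that many bytes are in flight, the next write blocks until the other side reads.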

In multiproc3.py, the items are first appended to the queue, then the sender process is waited on. But when size = 7279, the data submitted reaches 64k, so the writing thread blocks on the write syscall.
And since a join is performed before dequeuing the item, you just deadlock: the join waits for the sending thread to complete, and the write can't complete since the pipe/socket is full!
If you dequeue the item before waiting on the submitter process, everything works fine:


    t0 = time.time()
    try:
        get_t0 = time.time()
        # drain the queue first...
        vals = q.get(timeout=3.)
        get_duration = time.time() - get_t0
        # ...and only then wait for the submitter process
        s.join()
    except Queue.Empty:
        # (handler assumed for this excerpt; Queue.Empty, from the stdlib
        # Queue module, is what multiprocessing queues raise on timeout)
        pass
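
For reference, here is a self-contained sketch of the same pattern (this is not the actual multiproc3.py, just an illustration reusing the names q and s from the excerpt above): the child put()s more data than the pipe/socket buffer can hold, so get()ting before join()ing is what keeps it from hanging:

import multiprocessing

def submitter(q):
    # roughly 1 MB, well over the 64k pipe/socket buffer
    q.put('x' * (1 << 20))

if __name__ == '__main__':
    q = multiprocessing.Queue()
    s = multiprocessing.Process(target=submitter, args=(q,))
    s.start()
    vals = q.get(timeout=3.)   # drain the queue first...
    s.join()                   # ...then join: no deadlock
    print "received", len(vals), "bytes"

Swap the get() and the join() and the parent hangs, exactly as described above: the child can't terminate until its feeder thread has flushed the 1 MB into the pipe, and the feeder thread is stuck because nobody reads from the other end.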

Now, as for the initial report, the problem is related:

def child(task_q, result_q):
    while True:
        print "  Getting task..."
        task = task_q.get()
        print "  Got task", task[:10]
        task = task * 100000000
        print "  Putting result", task[:10]
        result_q.put(task)
        print "  Done putting result", task[:10]
        task_q.task_done()



    tasks = ["foo", "bar", "ABC", "baz"]
    for task in tasks:
        print "Putting task", task[:10], "..."
        task_q.put(task)
        print "Done putting task", task[:10]
    task_q.join()
    for task in tasks:
        print "Getting result..."
        print "Got result", result_q.get()[:10]

When the child puts its results, since they're bigger than 64k, the underlying pipe/socket fills up. Thus the sending thread blocks on the write and doesn't drain the result_q, which keeps growing.
So you end up storing every object in the result_q before starting to dequeue them, which represents roughly 4 * 3 * 1e8 = 1.2GB, which could explain the out-of-memory errors (and if they're Unicode strings it's even more)...
So the moral is: don't put() too much data into a queue without dequeuing it from a concurrent process...
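
To make that concrete, here is one way the report's pattern can be rearranged so the results are drained while the worker is still producing. This is only a sketch (with a much smaller multiplier than the original 100000000, so it runs comfortably); the ordering is what matters:

import multiprocessing

def child(task_q, result_q):
    while True:
        task = task_q.get()
        # keep the blow-up modest for this sketch (the report used * 100000000)
        result_q.put(task * 1000000)
        task_q.task_done()

if __name__ == '__main__':
    task_q = multiprocessing.JoinableQueue()
    result_q = multiprocessing.Queue()
    worker = multiprocessing.Process(target=child, args=(task_q, result_q))
    worker.daemon = True
    worker.start()

    tasks = ["foo", "bar", "ABC", "baz"]
    for task in tasks:
        task_q.put(task)
    # drain the results as they are produced, *before* joining the task queue,
    # so the child's result_q never has to buffer all the results at once
    for task in tasks:
        print "got result of length", len(result_q.get())
    task_q.join()

The only difference with the report's script is that the result_q.get() loop runs before task_q.join(), so the child's feeder thread can always make progress and the results never pile up in memory.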