classification
Title: multiprocessing.Queue.get() raises queue.Empty exception even if an item is available
Type: behavior Stage: resolved
Components: Library (Lib) Versions: Python 2.7
process
Status: closed Resolution: not a bug
Dependencies:
Superseder: multiprocessing.Queue.get is not getting all the items in the queue (issue23582)
Assigned To: sbt Nosy List: Alexei.Mozhaev, davin, sbt, torsten
Priority: normal Keywords: patch

Created on 2014-01-06 15:30 by torsten, last changed 2015-03-05 17:53 by davin. This issue is now closed.

Files
File name Uploaded Description
queue_timeout_0.diff torsten, 2014-01-06 15:30 Suggested patch including unit tests
queue_timeout_1.diff torsten, 2014-01-06 15:49 Trivial update that only calls _poll with timeout > 0
py_mult_queue_bug.py Alexei.Mozhaev, 2014-04-22 13:57 an example of the code which shows the bug
Messages (4)
msg207443 - Author: Torsten Landschoff (torsten) * Date: 2014-01-06 15:30
The behaviour of multiprocessing.Queue surprised me today in that Queue.get() may raise an exception even if an item is immediately available. I tried to flush entries without blocking by using the timeout=0 keyword argument:
$ /opt/python3/bin/python3
Python 3.4.0b1 (default:247f12fecf2b, Jan  6 2014, 14:50:23) 
[GCC 4.6.3] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> from multiprocessing import Queue
>>> q = Queue()
>>> q.put("hi")
>>> q.get(timeout=0)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/opt/python3/lib/python3.4/multiprocessing/queues.py", line 107, in get
    raise Empty
queue.Empty

Actually even passing a small non-zero timeout will not give me my queue entry:
>>> q.get(timeout=1e-6)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/torsten/opensrc/cpython/Lib/multiprocessing/queues.py", line 107, in get
    raise Empty
queue.Empty

Expected behaviour for me would be to return the item that is in the queue. I know that there is a kwarg *block* which gives me the desired behaviour:
>>> q.get(block=False)
'hi'
In my case the get call is embedded in my own module which does not currently expose the block parameter. My local solution is of course to update the wrapper:

if timeout == 0:
    timeout = None
    block = False
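
As a minimal sketch of that wrapper-side workaround (the function name get_item is hypothetical and not part of any library), a non-positive timeout can be mapped to a non-blocking call before delegating to the queue's own get(). It assumes the queue raises queue.Empty when nothing is available, as both queue.Queue and multiprocessing.Queue do in Python 3:

```python
import queue  # provides queue.Empty (the module is named Queue in Python 2)


def get_item(q, timeout=None):
    """Hypothetical wrapper: treat timeout <= 0 as a non-blocking get.

    timeout=None blocks until an item arrives; a positive timeout blocks
    for at most that many seconds; zero or negative never blocks.
    """
    if timeout is not None and timeout <= 0:
        # Bypass the deadline arithmetic entirely for the non-blocking case.
        return q.get(block=False)
    return q.get(block=True, timeout=timeout)
```

The wrapper only changes which code path get() takes; whether an item that was just put on a multiprocessing.Queue is already visible to the reader is a separate question.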

However I see a few smells here in the python standard library. First, everything else seems to accept timeout=0 as nonblocking:

>>> import threading
>>> lock = threading.Lock()
>>> lock.acquire(timeout=0)
True
>>> from queue import Queue
>>> q = Queue()
>>> q.put("hi")
>>> q.get(timeout=0)
'hi'
Of special note is that queue.Queue behaves as I would have expected. IMHO it should be consistent with multiprocessing.Queue.

Also note that queue.Queue.get() and queue.Queue.put() name their blocking flag "block", while everybody else uses "blocking".

As a side note, I think the current approach is flawed in computing the deadline. Basically it does the following:

    deadline = time.time() + timeout
    if not self._rlock.acquire(block, timeout):
        raise Empty
    timeout = deadline - time.time()
    if timeout < 0 or not self._poll(timeout):
        raise Empty

On my system, just taking the time twice and computing the delta takes 2 microseconds:

>>> import time
>>> t0 = time.time(); time.time() - t0
2.384185791015625e-06

Therefore calling Queue.get(block, timeout) with 0 < timeout < 2e-6 will never return anything from the queue even though Queue.get(block=False) would do that. This contradicts the idea that Queue.get(block=False) will return faster than with block=True with any timeout > 0.
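
The effect can be illustrated without a real queue. The following is only a model of the logic quoted above, not the library's code: lock_overhead is an assumed stand-in for the time that elapses between computing the deadline and reading the clock again, and both function names are hypothetical.

```python
def remaining_timeout(timeout, lock_overhead):
    """Model of the quoted deadline logic (not the actual library code).

    timeout:       value passed to get(), in seconds
    lock_overhead: assumed time elapsed between the two clock reads
    """
    start = 0.0                  # pretend time.time() returned 0 on entry
    deadline = start + timeout
    now = start + lock_overhead  # second clock read, after taking the lock
    return deadline - now


def would_raise_empty(timeout, lock_overhead=2e-6):
    # A negative remainder raises Empty before the queue is ever polled.
    return remaining_timeout(timeout, lock_overhead) < 0
```

With the 2 microsecond overhead measured above, would_raise_empty(0) and would_raise_empty(1e-6) are both true, while would_raise_empty(1.0) is false.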

Apart from that, as Python does not currently support waiting on multiple sources, we often check a queue with a small timeout while concurrently doing other work. If the system gets really loaded, I would expect this to cause problems because the updated timeout may fall below zero.

Suggested patch attached.
msg217004 - Author: Alexei Mozhaev (Alexei.Mozhaev) Date: 2014-04-22 13:57
We have a similar bug with Queue.get().

Queue.get(False) raises a Queue.Empty exception even when the queue is actually not empty!

An example of the code is attached and is listed below just in case:

----------------------
import multiprocessing
import Queue

class TestWorker(multiprocessing.Process):

    def __init__(self, inQueue):
        multiprocessing.Process.__init__(self)
        self.inQueue = inQueue

    def run(self):
        while True:
            try:
                task = self.inQueue.get(False)
            except Queue.Empty:
                # I suppose that Queue.Empty exception is about empty queue 
                # and self.inQueue.empty() must be true in this case
                # try to check it using assert
                assert self.inQueue.empty()
                break


def runTest():
    queue = multiprocessing.Queue()
    for _ in xrange(10**5):
        queue.put(1)
    workers = [TestWorker(queue) for _ in xrange(4)]
    map(lambda w: w.start(), workers)
    map(lambda w: w.join(), workers)


if __name__ == "__main__":
    runTest()
----------------------
msg219556 - Author: Alexei Mozhaev (Alexei.Mozhaev) Date: 2014-06-02 08:28
Hi! Are there any updates on the issue?
msg237279 - Author: Davin Potts (davin) * (Python committer) Date: 2015-03-05 17:49
This same issue came up recently in issue23582.  Really, it should have been addressed in this issue here first and issue23582 marked as a duplicate of this one but these things don't always happen in a synchronous or apparently-linear fashion.

Adding to what is captured in issue23582, specifically referring to the points raised here in this issue:
1. A call to put does not mean that the data put on the queue is instantly/atomically available for retrieval via get.  Situations where a call to put is immediately followed by a non-blocking call to get are asking for a race-condition -- this is a principal reason for having blocking calls with timeouts.
2. A call to get resulting in an Empty exception of course does not mean that the queue is forevermore empty, only that the queue is empty at the moment the call to get was made -- the facility for trapping the Empty and trying again to get more data off the queue provides welcome flexibility on top of the use of blocking/non-blocking calls with/without timeouts.
3. A call to empty is, as indicated in the documentation, not to be considered reliable because of the semantics in coordinating the queue's state and data between processes/threads.
4. Alexei's contributions to this issue are very nearly identical to what is discussed in issue23582 and are addressed well there.
5. As to using a timeout value too small to be effective (i.e. < 2e-6), really this is one example of the larger concern of choosing an appropriate timeout value.  In the proposed patch, ensuring that a call to self._poll is made no matter what might potentially buy additional time for the data to be synced and made available (admittedly a happy result, but a fragile, inadvertent win) but it does not address the rest of how get, put, and the others work nor will it necessarily solve the issue being raised here.

In Alexei's example, changing the call to get from a non-blocking call to a blocking call with a reasonably small timeout will reliably ensure that everything put on the queue can and will be gotten back by the rest of that code.
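
A sketch of that change, assuming a hypothetical helper named drain and an arbitrary timeout of 0.1 seconds (an appropriate value depends on the workload): each get blocks briefly, and only a get that times out is treated as the end of the data. The code is written for Python 3, where the exception lives in the queue module.

```python
import queue  # provides queue.Empty (the module is named Queue in Python 2)


def drain(q, timeout=0.1):
    """Collect items until a blocking get times out.

    Works with any object offering get(block, timeout) that raises
    queue.Empty, such as queue.Queue or multiprocessing.Queue.
    """
    items = []
    while True:
        try:
            # Block for up to `timeout` seconds instead of failing at once.
            items.append(q.get(block=True, timeout=timeout))
        except queue.Empty:
            # Nothing arrived within the timeout: treat the queue as drained.
            return items
```

In a worker's run() method the same try/except would wrap the per-task get, with the break placed in the except branch. The assert on empty() is best dropped, since empty() is not reliable across processes.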

In multiprocessing, we have queues to help us make data available to and across processes and threads alike -- we must recognize that coordinating data across distinct processes (especially) takes a non-zero amount of time -- hence we have the tools of blocking as well as non-blocking calls both with or without timeouts to properly implement robust code in these situations.
History
Date User Action Args
2015-03-05 17:53:26 davin set status: open -> closed
2015-03-05 17:49:23 davin set nosy: + davin; messages: + msg237279; resolution: not a bug; superseder: multiprocessing.Queue.get is not getting all the items in the queue; stage: resolved
2014-06-08 20:13:07 sbt set assignee: sbt
2014-06-02 08:28:39 Alexei.Mozhaev set messages: + msg219556
2014-04-22 17:47:58 ned.deily set nosy: + sbt
2014-04-22 13:57:12 Alexei.Mozhaev set files: + py_mult_queue_bug.py; versions: - Python 3.1, Python 3.2, Python 3.3, Python 3.4, Python 3.5; nosy: + Alexei.Mozhaev; messages: + msg217004
2014-01-06 15:49:54 torsten set files: + queue_timeout_1.diff
2014-01-06 15:30:10 torsten create