Title: Shutting down consumer on a remote queue
Type: enhancement Stage: needs patch
Components: Versions: Python 3.6
Status: open Resolution:
Dependencies: Superseder:
Assigned To: davin Nosy List: Semi, aptrishu, davin, rhettinger
Priority: normal Keywords:

Created on 2017-02-05 16:18 by Semi, last changed 2017-03-04 15:45 by davin.

Messages (5)
msg287055 - (view) Author: Tom (Semi) Date: 2017-02-05 16:18
Using code adapted from the example in the docs (, if you create a remote queue server, a producer which puts items in the queue and a consumer which consumes elements from the queue. If the consumer gets killed and restarted again, it misses one item from the queue. For a reproducable example see the stackoverflow reference below.

Expected: items stay in the queue until a consumer consumes it
Happens: one item still gets consumed from the queue even if after a consumer gets killed

Version: Python 3.6.0
OS: Ubuntu 16.10

msg288942 - (view) Author: Rishav Kumar (aptrishu) Date: 2017-03-03 22:40
I'd like to work on this issue.
msg288973 - (view) Author: Raymond Hettinger (rhettinger) * (Python committer) Date: 2017-03-04 07:49
> Expected: items stay in the queue until a consumer consumes it

That would be my expectation as well.

Davin, do you know why the example in the docs uses queue.Queue() instead of multiprocessing.Queue()?  Would there be a difference?

Also, I'm curious about the best practice for message queues when a consumer is killed.  Even if the message getting is atomic and never loses a message, what do people normally do to resurrect a task that was already underway when the consumer is killed?  I presume there is no easy way to find-out whether the task had just started, was in-process and changed the state of the system, or mostly finished.  Is there some sort of coding pattern for begin_transaction, commit, and rollback?  ISTM, that killing consumers is a perilous business.

The only reliable pattern I can think of is for the consumer to send back messages through another queue to indicate that a task was received and underway, and to indicate that a task was completed.
msg288980 - (view) Author: Davin Potts (davin) * (Python committer) Date: 2017-03-04 15:32
My understanding is that example uses a queue.Queue() to demonstrate how to create a custom, remote service from scratch.  The implementation in this simple example lacks the sophistication of multiprocessing.Queue() for handling situations such as the one raised by the OP.  The example was not attempting to demonstrate a comprehensive replacement for multiprocessing.Queue(), rather it was attempting to demonstrate the mechanism for creating and consuming a callable service hosted by a remote manager.  The documentation currently does not introduce this example well nor describe the above motivation.

As to why this simplistic implementation of a distributed queue appears to lose an item when the client is killed, it works in the following way:
1.  Let's say a server is started to hold a queue.Queue() which is populated with 1 item.
2.  A client requests an item from the server.
3.  The server receives the request and performs a blocking q.get() (where q is the queue.Queue() object held by the server).
4.  When the q.get() releases and returns an item, q has had one item removed leaving a queue size of 0 in our scenario, and then that item is sent from the server to the client.
5.  A client requests another item from the server.
6.  The server receives the request and performs a blocking q.get() on the queue.  Because there's nothing left to grab from the queue, the server blocks and waits for something to magically appear in the queue.  We'll have a "producer" put something into the queue in a moment but for the time being the server is stuck waiting on the q.get() and likewise the client is waiting on a response from the server.
7.  That client is killed in an unexpected, horrible death because someone accidentally hits it with a Cntrl-C.
8.  A "producer" comes along and puts a new item into the server's queue.
9.  The server's blocking q.get() call releases, q has had one item removed leaving a queue size of 0 again, and then that item is sent from the server to the client only the client is dead and the transmission fails.
10. A "producer" comes along and puts another new item into the server's queue.
11. The someone who accidentally, horribly killed the client now frantically restarts the client; the client requests an item from the server and the server responds with a new item.  However, this is the item introduced in step 10 and not the item from step 8.  Hence the item from step 8 appears lost.

Note that in our simplistic example from the docs, there is no functionality to repopulate the queue object when communication of the item fails to complete.  In general, a multiprocessing.manager has no idea what a manager will contain and has no insight on what to do when a connection to a client is severed.

Augmenting the example in the docs to cover situations like this would significantly complicate the example but there are many others to consider on the way to building a comprehensive solution -- instead a person should choose multiprocessing.Queue() unless they have something particular in mind.

I think the example should be better introduced (the intro is terse) to explain its purpose and warn that it does not offer a comprehensive replacement for multiprocessing.Queue().  It does not need to go into all of the above explanation.
msg288981 - (view) Author: Davin Potts (davin) * (Python committer) Date: 2017-03-04 15:45
My understanding of other message queueing systems is that many are motivated by speed to the point that they will permit messages to be "lost" due to specific scenarios that would be overly costly to defend against.  Other message queueing systems adopt a philosophy that no message should ever be lost but as a compromise to speed do not promise that a message will be immediately recovered when caught in one of these problematic scenarios, only that it will eventually be recovered and processed fully.

It appears that the philosophy adopted or really the solution requirements lead to different best practices.
Date User Action Args
2017-03-04 15:45:10davinsetmessages: + msg288981
2017-03-04 15:36:20davinsettype: behavior -> enhancement
stage: needs patch
2017-03-04 15:32:52davinsetmessages: + msg288980
2017-03-04 07:49:00rhettingersetassignee: davin

messages: + msg288973
nosy: + rhettinger
2017-03-03 22:40:43aptrishusetnosy: + aptrishu
messages: + msg288942
2017-02-06 02:36:14xiang.zhangsetnosy: + davin
type: behavior
2017-02-05 16:18:38Semicreate