classification
Title: multiprocessing.Queue silently ignores messages after exc in _feeder
Type: behavior Stage: resolved
Components: Library (Lib) Versions: Python 3.7, Python 3.6, Python 3.5, Python 2.7
process
Status: closed Resolution: fixed
Dependencies: Superseder:
Assigned To: Nosy List: davin, grzgrzgrz3, pitrou, tomMoral, vstinner, xiang.zhang
Priority: normal Keywords:

Created on 2017-05-20 18:26 by grzgrzgrz3, last changed 2017-06-09 12:29 by vstinner. This issue is now closed.

Pull Requests
URL Status Linked
PR 1683 merged grzgrzgrz3, 2017-05-20 18:34
PR 1815 merged pitrou, 2017-05-25 14:34
PR 1816 merged pitrou, 2017-05-25 14:35
PR 1817 merged pitrou, 2017-05-25 14:51
PR 1013 tomMoral, 2017-06-02 15:04
Messages (12)
msg294044 - Author: Grzegorz Grzywacz (grzgrzgrz3) * Date: 2017-05-20 18:26
multiprocessing.Queue runs a background feeder thread, which serializes buffered data and sends it to the pipe.

The issue is with exception handling: the feeder catches all exceptions, but outside its main loop. After an exception is handled, the feeder does not go back to the loop, so the thread finishes. Once the feeder thread is no longer running, every Queue.put() still executes without raising, but its message is never delivered.
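A minimal reproduction of the symptom, written for illustration. `NotPicklable` is a hypothetical class whose `__reduce__` always fails. Because `Queue.put()` only appends to an in-process buffer, the pickling error happens later, inside the feeder thread, so the caller never sees it. On a Python that includes this fix, the feeder reports the error (a traceback on stderr) and the second message should still arrive; on an affected version the `get()` would time out instead.

```python
import multiprocessing
import queue


class NotPicklable:
    """Hypothetical payload whose pickling always fails."""

    def __reduce__(self):
        raise TypeError("this object refuses to be pickled")


def main():
    q = multiprocessing.Queue()
    # put() only buffers the object; pickling happens later in the
    # background feeder thread, so this call does not raise.
    q.put(NotPicklable())
    # An ordinary message queued after the bad one.
    q.put("second message")
    try:
        # With the fix, the feeder survives the pickling error and sends
        # this message. On an affected version the feeder thread has
        # already exited, and this get() would time out.
        result = q.get(timeout=5)
    except queue.Empty:
        result = None
    q.close()
    q.join_thread()
    return result


if __name__ == "__main__":
    print(main())
```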

The solution is to move the exception handling inside the main loop. I will provide a PR.
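The shape of the fix can be illustrated with a simplified, hypothetical model of a feeder loop. This is not the real `Queue._feed` code: `send` stands in for "serialize and write to the pipe", and `on_error` for whatever reporting the queue does. Because the `try`/`except` sits inside `while True`, one failing object is reported and the loop moves on to the next one. Wrapping the whole loop in the `try` instead would end the thread at the first error, and every later item would be silently lost.

```python
import collections
import threading

_SENTINEL = object()  # tells the feeder to stop


def feed(buffer, not_empty, send, on_error):
    """Hypothetical, simplified feeder loop with per-item error handling."""
    while True:
        with not_empty:
            while not buffer:
                not_empty.wait()
            obj = buffer.popleft()
        if obj is _SENTINEL:
            return
        # Exception handling inside the loop: report the failure for this
        # object, then continue with the next one instead of exiting.
        try:
            send(obj)
        except Exception as e:
            on_error(e, obj)


def run_demo():
    buffer = collections.deque()
    not_empty = threading.Condition()
    delivered, failed = [], []

    def send(obj):
        if obj == "bad":
            raise ValueError("cannot serialize %r" % obj)
        delivered.append(obj)

    feeder = threading.Thread(
        target=feed,
        args=(buffer, not_empty, send, lambda e, obj: failed.append(obj)),
        daemon=True,
    )
    feeder.start()
    for item in ["a", "bad", "b", _SENTINEL]:
        with not_empty:  # same pattern as a producer calling put()
            buffer.append(item)
            not_empty.notify()
    feeder.join()
    return delivered, failed


if __name__ == "__main__":
    print(run_demo())  # (['a', 'b'], ['bad'])
```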

I have run the performance tests (from #17025), and the submitted patch does not affect performance.
msg294399 - Author: Antoine Pitrou (pitrou) * (Python committer) Date: 2017-05-24 21:35
Can you expand on which exceptions you are getting in the feeder thread?
msg294401 - Author: Antoine Pitrou (pitrou) * (Python committer) Date: 2017-05-24 21:38
Never mind, I saw the PR and the test case.
msg294481 - Author: Antoine Pitrou (pitrou) * (Python committer) Date: 2017-05-25 14:23
New changeset bc50f03db4f58c869b78e98468e374d7e61f1227 by Antoine Pitrou (grzgrzgrz3) in branch 'master':
bpo-30414: multiprocessing.Queue._feed do not break from main loop on exc (#1683)
https://github.com/python/cpython/commit/bc50f03db4f58c869b78e98468e374d7e61f1227
msg294485 - Author: Antoine Pitrou (pitrou) * (Python committer) Date: 2017-05-25 14:57
New changeset 2783cc42629b9445ea848ce36bbf213ef7789271 by Antoine Pitrou in branch '3.6':
[3.6] bpo-30414: multiprocessing.Queue._feed do not break from main loop on exc (GH-1683) (#1815)
https://github.com/python/cpython/commit/2783cc42629b9445ea848ce36bbf213ef7789271
msg294486 - Author: Antoine Pitrou (pitrou) * (Python committer) Date: 2017-05-25 15:19
New changeset 89004d761361332314beb08b443bff5b092ec36e by Antoine Pitrou in branch '3.5':
[3.5] bpo-30414: multiprocessing.Queue._feed do not break from main loop on exc (GH-1683) (#1816)
https://github.com/python/cpython/commit/89004d761361332314beb08b443bff5b092ec36e
msg294492 - Author: Antoine Pitrou (pitrou) * (Python committer) Date: 2017-05-25 15:53
New changeset bdd964710deffe8593063dcb63157e5b55a82c61 by Antoine Pitrou in branch '2.7':
[2.7] bpo-30414: multiprocessing.Queue._feed do not break from main loop on exc (GH-1683) (#1817)
https://github.com/python/cpython/commit/bdd964710deffe8593063dcb63157e5b55a82c61
msg294493 - Author: Antoine Pitrou (pitrou) * (Python committer) Date: 2017-05-25 15:54
This is committed and pushed, thank you!
msg295022 - Author: Thomas Moreau (tomMoral) * Date: 2017-06-02 15:01
This fix, while preventing the Queue from crashing, gives no way to programmatically detect that a message was dropped. This is a problem, as we can no longer assume that the Queue does not drop messages. For instance, we can no longer detect deadlocks in concurrent.futures.ProcessPoolExecutor as done in https://github.com/python/cpython/pull/1013, where the crashed QueueFeederThread was used to monitor the working state of the executor.

We could either:
- Put a flag highlighting the fact that some messages were dropped.
- Add an argument to the Queue to close on pickling errors.

I'd be happy to work on a PR to implement any solution that you think is reasonable.
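Option 1 (a flag) can be sketched on top of the private hook that recent CPython 3 releases provide, `multiprocessing.queues.Queue._on_queue_feeder_error(e, obj)`, which the feeder thread calls when sending an object fails. That hook did not exist when this thread was written, it is private, and it may change; `FlaggingQueue` and its `dropped` event are hypothetical names used only for illustration.

```python
import multiprocessing
import threading
from multiprocessing.queues import Queue


class FlaggingQueue(Queue):
    """Hypothetical Queue subclass that flags dropped messages.

    Assumes the private _on_queue_feeder_error(e, obj) hook of recent
    CPython 3 releases; it is not a public API.
    """

    def __init__(self, maxsize=0, *, ctx):
        super().__init__(maxsize, ctx=ctx)
        # Set from the feeder thread, can be polled from any thread.
        self.dropped = threading.Event()

    def _on_queue_feeder_error(self, e, obj):
        self.dropped.set()
        # Keep the default behavior (printing the traceback to stderr).
        super()._on_queue_feeder_error(e, obj)


def main():
    q = FlaggingQueue(ctx=multiprocessing.get_context())
    q.put(lambda: None)  # a lambda cannot be pickled, so it gets dropped
    q.put("still delivered")
    msg = q.get(timeout=5)
    flagged = q.dropped.is_set()
    q.close()
    q.join_thread()
    return msg, flagged


if __name__ == "__main__":
    print(main())
```

Because the feeder handles items in order, by the time the second message has been received the hook has already run for the first one, so reading the flag right after `get()` is reliable in this sketch.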
msg295040 - Author: Antoine Pitrou (pitrou) * (Python committer) Date: 2017-06-02 16:52
Thomas, thanks for the heads up.  I would suggest something like the following patch to multiprocessing.Queue:

$ git diff
diff --git a/Lib/multiprocessing/queues.py b/Lib/multiprocessing/queues.py
index 7f77837..ebbb360 100644
--- a/Lib/multiprocessing/queues.py
+++ b/Lib/multiprocessing/queues.py
@@ -260,8 +260,16 @@ class Queue(object):
                     info('error in queue thread: %s', e)
                     return
                 else:
-                    import traceback
-                    traceback.print_exc()
+                    self._on_queue_thread_error(e)
+
+    def _on_queue_thread_error(self, e):
+        """
+        Private API called when feeding data in the background thread
+        raises an exception.  For overriding by concurrent.futures.
+        """
+        import traceback
+        traceback.print_exc()
+
 
 _sentinel = object()
 


Then you can write your own Queue subclass in concurrent.futures to handle that error and clean up/restart whatever needs to be cleaned up or restarted.  What do you think?
msg295116 - Author: Thomas Moreau (tomMoral) * Date: 2017-06-04 08:59
I think this is a good solution, as it lets the user easily define the behavior they need in other situations too. I would recommend passing the object responsible for the failure to the _on_queue_thread_error callback. This would simplify the error handling.


@@ -260,8 +260,16 @@ class Queue(object):
                     info('error in queue thread: %s', e)
                     return
                 else:
-                    import traceback
-                    traceback.print_exc()
+                    self._on_queue_thread_error(e, obj)
+
+    def _on_queue_thread_error(self, e, obj):
+        """
+        Private API called when feeding data in the background thread
+        raises an exception.  For overriding by concurrent.futures.
+        """
+        import traceback
+        traceback.print_exc()
+
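Passing `obj` lets a subclass tell which item failed, which is the kind of information an executor would need to fail the matching job rather than lose it silently. A minimal sketch, assuming the private `_on_queue_feeder_error(e, obj)` hook of recent CPython 3 releases (the name that eventually shipped differs from the `_on_queue_thread_error` proposed in this thread); `JobQueue`, the `(job_id, payload)` item layout, and `failed_jobs` are hypothetical.

```python
import multiprocessing
from multiprocessing.queues import Queue


class JobQueue(Queue):
    """Hypothetical queue of (job_id, payload) items that records which
    job could not be sent. Assumes the private hook of recent CPython 3."""

    def __init__(self, maxsize=0, *, ctx):
        super().__init__(maxsize, ctx=ctx)
        self.failed_jobs = {}  # job_id -> exception, filled by the feeder thread

    def _on_queue_feeder_error(self, e, obj):
        if isinstance(obj, tuple) and len(obj) == 2:
            job_id, _payload = obj
            self.failed_jobs[job_id] = e
        else:
            # Unexpected item shape: fall back to the default reporting
            # rather than raising inside the feeder thread.
            super()._on_queue_feeder_error(e, obj)


def main():
    q = JobQueue(ctx=multiprocessing.get_context())
    q.put((1, "ok"))
    q.put((2, lambda: None))  # this payload cannot be pickled
    q.put((3, "also ok"))
    received = [q.get(timeout=5), q.get(timeout=5)]
    failed = sorted(q.failed_jobs)
    q.close()
    q.join_thread()
    return received, failed


if __name__ == "__main__":
    print(main())
```

The `isinstance` guard matters: an exception raised inside the hook itself would escape the feeder's handler and stop the thread, bringing back the original silent-drop behavior.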
msg295521 - Author: STINNER Victor (vstinner) * (Python committer) Date: 2017-06-09 12:29
We have been getting *random* failures of test_queue_feeder_donot_stop_onexc() in test_multiprocessing_spawn for a few days. It may be related to this change. Can you please take a look at bpo-30595?
History
Date                 User         Action  Args
2017-06-09 12:29:15  vstinner     set     nosy: + vstinner
                                          messages: + msg295521
2017-06-04 08:59:34  tomMoral     set     messages: + msg295116
2017-06-02 16:52:01  pitrou       set     messages: + msg295040
2017-06-02 15:04:06  tomMoral     set     pull_requests: + pull_request2001
2017-06-02 15:01:47  tomMoral     set     nosy: + tomMoral
                                          messages: + msg295022
2017-05-25 15:54:28  pitrou       set     status: open -> closed
                                          resolution: fixed
                                          messages: + msg294493
                                          stage: patch review -> resolved
2017-05-25 15:53:07  pitrou       set     messages: + msg294492
2017-05-25 15:19:13  pitrou       set     messages: + msg294486
2017-05-25 14:57:49  pitrou       set     messages: + msg294485
2017-05-25 14:51:34  pitrou       set     pull_requests: + pull_request1902
2017-05-25 14:35:12  pitrou       set     pull_requests: + pull_request1901
2017-05-25 14:34:39  pitrou       set     pull_requests: + pull_request1900
2017-05-25 14:23:00  pitrou       set     messages: + msg294481
2017-05-24 21:38:18  pitrou       set     messages: + msg294401
2017-05-24 21:35:56  pitrou       set     type: behavior
                                          stage: patch review
2017-05-24 21:35:45  pitrou       set     nosy: + pitrou
                                          messages: + msg294399
2017-05-21 15:37:07  xiang.zhang  set     nosy: + davin, xiang.zhang
2017-05-20 18:34:27  grzgrzgrz3   set     pull_requests: + pull_request1778
2017-05-20 18:26:50  grzgrzgrz3   create