New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
multiprocesing.Queue silently ignore messages after exc in _feeder #74599
Comments
multiprocessing.Queue is running background thread feeder. Feeder serialize and sends buffered data to pipe. The issue is with exception handling, feeder is catching all exceptions but out of main loop, so after exception is handled feeder is not going back to loop - thread finish. If feeder thread is not running any Queue.put will execute without exceptions but message not gonna be delivered. Solution is to move exception handling inside main loop. I will provide PR. I have run performance tests (found: bpo-17025) and submitted patch do not affect performance. |
Can you expand on which exceptions you are getting in the feeder thread? |
Nevermind, I saw the PR and the test case. |
This is committed and pushed, thank you! |
This fix, while preventing the Queue to crash, does not give any way to programatically detect that the message was dropped. This is a problem as we can no longer assume that the Queue will not drop messages. For instance, we can no longer detect deadlocks in concurrent.futures.ProcessPoolExecutor as done in #1013 where the crashed QueueFeederThread was used to monitor the working state of the executor. We could either:
I'd be happy to work on a PR to implement any solution that you think is reasonable. |
Thomas, thanks for the heads up. I would suggest something like the following patch to multiprocessing.Pool: $ git diff
diff --git a/Lib/multiprocessing/queues.py b/Lib/multiprocessing/queues.py
index 7f77837..ebbb360 100644
--- a/Lib/multiprocessing/queues.py
+++ b/Lib/multiprocessing/queues.py
@@ -260,8 +260,16 @@ class Queue(object):
info('error in queue thread: %s', e)
return
else:
- import traceback
- traceback.print_exc()
+ self._on_queue_thread_error(e)
+
+ def _on_queue_thread_error(self, e):
+ """
+ Private API called when feeding data in the background thread
+ raises an exception. For overriding by concurrent.futures.
+ """
+ import traceback
+ traceback.print_exc()
+
_sentinel = object()
Then you can write your own Queue subclass in concurrent.futures to handle that error and clean up/restart whatever needs to be cleaned up or restarted. What do you think? |
I think this is a good solution as it let the user define easily the behavior it needs in other situation too. I would recommend adding the object responsible for the failure to the _on_queue_thread_error callback. This would simplify the error handling. @@ -260,8 +260,16 @@ class Queue(object):
info('error in queue thread: %s', e)
return
else:
- import traceback
- traceback.print_exc()
+ self._on_queue_thread_error(e, obj)
+
+ def _on_queue_thread_error(self, e, obj):
+ """
+ Private API called when feeding data in the background thread
+ raises an exception. For overriding by concurrent.futures.
+ """
+ import traceback
+ traceback.print_exc()
+ |
We started to get *random* failures of test_queue_feeder_donot_stop_onexc() of test_multiprocessing_spawn since a few days. I may be related to this change. Can you please take a look at bpo-30595? |
Note: these values reflect the state of the issue at the time it was migrated and might not reflect the current state.
Show more details
GitHub fields:
bugs.python.org fields:
The text was updated successfully, but these errors were encountered: