Parent process hanging in multiprocessing if children terminate unexpectedly #53451
Comments
I have recently begun using multiprocessing for a variety of batch jobs. Attached is a patch to handle unexpected terminations of children. |
thanks greg; I'm going to take a look and think about this. I'd like to resolve bug 9207 first though |
Cool, thanks. I'll note that with this patch applied, using the test program from 9207 I consistently get the following exception:
"""
Exception in thread Thread-1 (most likely raised during interpreter shutdown):
Traceback (most recent call last):
File "/usr/lib/python2.6/threading.py", line 532, in __bootstrap_inner
File "/usr/lib/python2.6/threading.py", line 484, in run
File "/home/gdb/repositories/multiprocessing/pool.py", line 312, in _handle_workers
File "/home/gdb/repositories/multiprocessing/pool.py", line 190, in _maintain_pool
File "/home/gdb/repositories/multiprocessing/pool.py", line 158, in _join_exited_workers
<type 'exceptions.TypeError'>: 'NoneType' object is not callable
""" This is line 148 in the unpatched source, namely the 'reversed(range(len(self._pool)))' line of _join_exited_workers. Looks like the same issue, where instead reversed/range/len have been set to None. So I think by changing how much time the worker_handler spends in various functions, I've made it possible (or just more likely?) that if we lose the race with interpreter shutdown the worker_handler will be in the middle of _join_exited_workers. This may mean that someone should keep around a local reference to reversed/range/len... not sure if there's a better solution. |
Ugh. I'm going to have to think about the cleanest way of handling this case of functions vanishing out from under us, since this is going to be more widespread inside the code. Suggestions welcome. |
What about just catching the exception? See e.g. the attached patch. (Disclaimer: not heavily tested). |
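For reference, a minimal sketch of what such a "catch the exception" wrapper might look like, in the spirit of the _make_shutdown_safe helper mentioned later in this thread (the actual shutdown.patch may differ):

    import functools

    def _make_shutdown_safe(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            try:
                return func(*args, **kwargs)
            except (TypeError, AttributeError):
                # During interpreter shutdown, module globals (debug, reversed,
                # range, ...) may already have been replaced with None; there is
                # nothing useful left to do at that point, so give up quietly.
                pass
        return wrapper

The obvious trade-off, raised below, is that swallowing these errors in the result handler could mean silently losing results at shutdown.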
A+ for creativity; I wouldn't have thought of that ;) |
Regarding termination.patch: in the result handler you've added

    while cache and thread._state != TERMINATE and not failed

Why are you terminating the second pass after finding a failed process? Unpickleable errors and other errors occurring in the worker body are not exceptional cases, at least not now that the pool is supervised by _handle_workers. I think the result should be set also in this case, so the user can inspect the exception after the fact. I have some other suggestions too, so I will review this patch tomorrow.

For shutdown.patch: I thought this only happened in the worker handler, but you've enabled it for the result handler too? I don't care about the worker handler, but with the result handler I'm worried that I don't know what ignoring these exceptions actually means. For example, is there a possibility that we may lose results at shutdown? |
Thanks much for taking a look at this!
More generally, the chief assumption that went into this is that the unexpected death of a worker process is unrecoverable. It would be nice to have a better workaround than just aborting everything, but I couldn't see a way to do that.
|
Greg - I asked Ask to take a look - his celery package is a huge consumer of multiprocessing, and so I tend to run things past him as well. That said - to both of you - the fundamental problem the shutdown patch is trying to address is located in bpo-9207; Greg's termination patch just exposes the problem in 9207 a lot more.

Focusing specifically on the shutdown patch: our issue is that during interpreter shutdown, sys.modules is iterated and entries are set to None, so threads which "live on" well into that cycle can end up losing imported functions/modules/etc. The multiple daemon threads in the Pool code expose this, because code which calls imported functions (such as the debug() statement in _handle_workers) can fire after the pool has exited and the interpreter is shutting down.

We can work around the shutdown issue (really, bug 9207) by ignoring the exception as shutdown.patch does, or by passing in/adding references to the functions those methods need. Or (as Brett suggested) converting them to class methods and adding references to the class. Or passing them in via the signature like this: _handle_workers(arg, _debug=debug), etc. |
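For illustration, here is roughly what the "pass references in via the signature" idea could look like when applied to _join_exited_workers (a sketch based on the 2.6-era pool code, not the actual patch):

    def _join_exited_workers(self, _len=len, _range=range, _reversed=reversed):
        """Cleanup after any worker processes which have exited.

        The builtins the loop needs are bound as default arguments, so the
        daemon thread no longer depends on module globals that get replaced
        with None while the interpreter shuts down.
        """
        cleaned = False
        for i in _reversed(_range(_len(self._pool))):
            worker = self._pool[i]
            if worker.exitcode is not None:
                worker.join()
                cleaned = True
                del self._pool[i]
        return cleaned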
It would be a problem if the process simply disappeared, For processes disappearing (if that can at all happen), we could solve
It's lost now, but not if we handle the error... I guess that the worker handler works as a supervisor is a side effect,
I was already working on this issue last week actually, and I managed |
Anyway, so I guess the question that is forming in my mind is: what sorts of errors do we want to handle, and how do we want to handle them? My answer is that I'd like to handle all possible errors with some behavior that is not "hang forever". This includes handling child processes dying by signals or os._exit, unpickling errors being raised, etc. I believe my patch provides this functionality. By adding the extra mechanism that you've written/proposed, we can improve the error handling in specific recoverable cases (which probably constitute the vast majority of real-world cases). |
I think I misunderstood the purpose of the patch. This is about handling errors on get(), not on put() like I was working on. So sorry for that confusion. What kind of errors are you having that make the get() call fail? If the queue is not working, then I guess the only sensible approach is to shut down the pool as suggested. I'll open up another issue for unpickleable errors then. |
For reference I opened up a new issue for the put() case here: http://bugs.python.org/issue9244 |
I'll note that the particular issues that I've run into in practice are:
This AttributeError problem is one that I discovered while generating test cases for the patch. |
While looking at your patch in bpo-9244, I realized that my code fails to handle an unpickleable task, as in: |
There's one more thing:

    if exitcode is not None:
        cleaned = True
        if exitcode != 0 and not worker._termination_requested:
            abnormal.append((worker.pid, exitcode))

Instead of restarting crashed worker processes it will simply bring down the pool, right? If so, then I think it's important to decide whether we want to keep the supervisor functionality. Some alternatives are:

A) Any missing worker brings down the pool.
B) Missing workers will be replaced one-by-one. A maximum-restart-frequency decides when the supervisor should give up trying to recover.
C) Same as B, except that any process crashing when trying to get() will bring down the pool.

I think the supervisor is a good addition, so I would very much like to keep it. It's also a step closer to my goal of adding the enhancements added by Celery to multiprocessing.pool. Using C is only a few changes away from this patch, but B would also be possible in combination with my accept_callback patch. It does pose some overhead, so it depends on the level of recovery we want to support.

accept_callback: this is a callback that is triggered when the job is reserved by a worker process. The acks are sent to an additional Queue, with an additional thread processing the acks (hence the mentioned overhead). This enables us to keep track of what the worker processes are doing, and also to get the PID of the worker processing any given job (besides recovery, potential uses are monitoring and the ability to terminate a job (ApplyResult.terminate?)). See http://github.com/ask/celery/blob/master/celery/concurrency/processes/pool.py |
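As an illustration of the accept_callback bookkeeping described above (the names here are hypothetical; this is not the Celery implementation linked to): workers acknowledge each job over the extra Queue before running it, and the ack-handling thread records which PID owns which job, so a crashed worker can be mapped back to just the jobs it had accepted.

    import threading

    class JobTracker(object):
        """Record which worker PID has accepted which job."""

        def __init__(self):
            self._lock = threading.Lock()
            self._owner = {}                # job_id -> worker pid

        def accept_callback(self, job_id, pid):
            # Called by the ack-handling thread when a worker reports, over
            # the additional ack Queue, that it has reserved this job.
            with self._lock:
                self._owner[job_id] = pid

        def jobs_owned_by(self, pid):
            # When the supervisor sees that `pid` died, only the jobs it had
            # accepted need to be failed; the rest of the pool keeps working.
            with self._lock:
                return [job for job, owner in self._owner.items() if owner == pid]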
Jesse wrote,
Greg wrote,
I don't think _make_shutdown_safe should be added to the result handler. Jesse, how hard is it to fix the worker handler by passing the references? Note that _worker_handler is not important to complete shutdown at this point, but it may be in the future (it seems termination.patch already changes this) |
Passing the references seems to be a losing game; for _handle_workers - we only need 1 function (debug) - for others (say _join_exited_workers), we need references to reversed/range/len. A possible alternative is to make those threads non-daemon threads; but I'd have to test that. |
Before I forget, looks like we also need to deal with the result from a worker being un-unpickleable: This shouldn't require much more work, but I'll hold off on submitting a patch until we have a better idea of where we're going in this arena.
The idea of recording the mapping of tasks -> workers seems interesting. Getting all of the corner cases right could be hard (e.g. atomically removing a task from the queue while recording which worker removed it, or detecting whether a worker crashed while still holding the queue lock), and doing this would require extra mechanism. This feature does seem useful for pools running many different jobs, because that way a crashed worker need only terminate one job. Anyway, I'd be curious to know more about the kinds of crashes you've encountered from which you'd like to be able to recover. Is it just Unpickleable exceptions, or are there others? |
Greg,
This is what my patch in bug 9244 does...
Losing a task is not fun, but there may still be other tasks user-initiated interrupts, this is very important to recover from,
I think I may have an alternative solution. Instead of keeping track of what the workers are doing, we could simply change the result handler:

    while state != TERMINATE:
        result = get(timeout=1)
        if all_processes_dead():
            break
|
Ok. I implemented my suggestions in the attached patch. Greg, maybe we could keep the behavior in termination.patch as an option for map jobs? It is certainly a problem that map jobs won't terminate until the pool is joined. |
I'll take a look at your patch in a bit. Given our differing use cases, I do think it could make sense as a configuration option, but where it probably belongs is on the wait() call of ApplyResult. |
Just some small cosmetic changes to the patch. |
Your example is demonstrating the pickle error on put(), not on get().
Yeah, check this out:
/opt/devel/Python/trunk(master)$> patch -p1 < multiprocessing-trunk@82502-handle_worker_encoding_errors2.patch
patching file Lib/multiprocessing/pool.py
patching file Lib/test/test_multiprocessing.py
/opt/devel/Python/trunk(master)$> ./python.exe
Python 2.7 (unknown, Jul 13 2010, 13:28:35)
[GCC 4.2.1 (Apple Inc. build 5659)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
>>> import multiprocessing
>>> def foo():
...     return lambda: 42
...
>>> p = multiprocessing.Pool(2)
>>> p.apply_async(foo).get()
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/opt/devel/Python/trunk/Lib/multiprocessing/pool.py", line 518, in get
raise self._value
multiprocessing.pool.MaybeEncodingError: Error sending result: '<function <lambda> at 0x1005477d0>'. Reason: 'Can't pickle <type 'function'>: attribute lookup __builtin__.function failed'
>>> import operator
>>> p.apply_async(operator.add, (2, 2)).get()
4
In termination2.patch I handle BaseExceptions by exiting the worker process and then letting the _worker_handler replace the process. It's very useful, because then people can kill -INT the worker process
Indeed! This could be done by adding listeners for this type of error:

    pool.add_worker_missing_callback(fun)

So MapResults could install a callback like this:

    def __init__(self):
        ...
        _pool.add_worker_missing_callback(self._on_worker_missing)
        ...

    def _on_worker_missing(self):
        err = WorkerLostError(
            "Worker lost while running map job")
        self._set(None, (False, err))

What do you think about that? IMHO, even though the worker lost could be unrelated to the map job in |
Actually, the program you demonstrate is not equivalent to the one I posted. The one I posted pickles just fine because 'bar' is a global name, but doesn't unpickle because it doesn't exist in the parent's namespace. (See http://docs.python.org/library/pickle.html#what-can-be-pickled-and-unpickled.) Although, if you're able to run my test program verbatim, then it's entirely possible I'm just missing something. Anyway, I do think that adding a 'worker_missing_callback' could work. You'd still have to make sure the ApplyResult (or MapResult) can crash the pool if it deems necessary, though. |
Started looking at your patch. It seems to behave reasonably, although it still doesn't catch all of the failure cases. In particular, as you note, crashed jobs won't be noticed until the pool shuts down... but if you make a blocking call such as in the following program, you'll get a hang: The tests also occasionally hang in e.g. |
Yeah, and for that we could use the same approach as for the maps. But I've just implemented the accept callback approach, which should be superior. Maps/Apply fail instantly as soon as a worker process crashes, but the pool remains fully functional. Patch multiprocessing-trunk@82502-termination-trackjobs.patch added. There seem to be some race conditions left, because some of the tests break from time to time. Maybe you can pinpoint them before me. |
Here is a proof-of-concept patch that makes concurrent.futures able to detect killed processes. Works only under POSIX, and needs bpo-11743. |
Actually, it came to me that if a child process exits, the queues are not guaranteed to be in a consistent state anymore (the child could have terminated in the middle of a partial read or write). So it may be better to simply declare the ProcessPoolExecutor terminally broken when one of its children has exited. |
Under what circumstances do we expect a ProcessPoolExecutor child process to be killed outside of the control of the ProcessPoolExecutor? If the user kills a child, then maybe all we want to do, as a convenience, is raise an exception rather than deadlock. |
Killed by the user, or by an automatic device (such as the Linux OOM killer).
That's what the patch does, roughly. |
Crashed would be bad - it would indicate a bug in the
Right. But instead of trying to recover, it might be better to fail
It really depends on whether we view the situation as expected (and
Cheers, |
I meant a crash in Python itself, or any third-party extension module.
Yes, I think that's better (see my message about the internal state of the queues). |
Here is an updated patch. Much of it consists of changes in the Windows Connection implementation, where I had to use overlapped I/O in order to use WaitForMultipleObjects on named pipes. test_concurrent_futures sometimes blocks (under Windows); I'll try to debug. |
Ok, this patch seems fully debugged (under Windows and Linux). A couple of additional things come with it, such as removing the repeated polling in PipeConnection.poll() and _Popen.wait(). |
Hum, I get a strange skip on an XP buildbot:

    [224/354] test_multiprocessing

Yet _multiprocessing was compiled fine... Does anyone know what it means? http://www.python.org/dev/buildbot/all/builders/x86%20XP-5%20custom/builds/5/steps/test/logs/stdio |
Ok, the culprit is CancelIoEx(), which is only supported under Vista and later. |
Ok, here's a patch for the new approach. (Of course, all this is a lot of code just to have the desired WaitForMultipleObjects() semantics on a named pipe. Thank you, Windows.) |
Part of the patch submitted standalone in bpo-12040. |
Antoine, I've got a couple questions concerning your patch:
Finally, I might be missing something completely obvious, but I have the feeling that POSIX already provides something that could help solve this issue: process groups. |
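For context, a rough sketch of how the process-group idea could look on POSIX (illustrative only; as the replies below note, waitpid() has no timeout, which limits this approach):

    import os
    import signal

    def spawn_workers(target, n):
        """Fork n workers and put them all into one dedicated process group."""
        pids, pgid = [], None
        for _ in range(n):
            pid = os.fork()
            if pid == 0:                     # child
                os.setpgid(0, pgid or 0)     # join the group (or create it)
                target()
                os._exit(0)
            os.setpgid(pid, pgid or pid)     # parent sets it too, to avoid a race
            pgid = pgid or pid
            pids.append(pid)
        return pgid, pids

    def reap_any(pgid):
        # A negative pid means "any child whose process group id is abs(pid)";
        # WNOHANG makes this a poll, since waitpid() cannot take a timeout.
        return os.waitpid(-pgid, os.WNOHANG)

    def terminate_all(pgid):
        os.killpg(pgid, signal.SIGTERM)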
Not exactly. The select is done on the queue's pipe and on the workers'
No, but the implementation is not meant to be blazingly fast anyway
Not at that level. In concurrent.futures, a process exiting normally This approach could work for multiprocessing.Pool as well. However, the
waitpid() doesn't allow for a timeout, and it doesn't allow to check a |
You're right, I missed this part, it's perfectly safe. But I think there's a problem with the new implementation in Python:

    def _send_bytes(self, buf):
        # For wire compatibility with 3.2 and lower
        n = len(buf)
        self._send(struct.pack("=i", len(buf)))
        # The condition is necessary to avoid "broken pipe" errors
        # when sending a 0-length buffer if the other end closed the pipe.
        if n > 0:
            self._send(buf)

This is definitely not atomic. If two processes write objects of different sizes at the same time, the data can end up interleaved on the pipe. |
Indeed, it isn't; Pipe objects are not meant to be safe against multiple concurrent accesses. |
But if the write to the Pipe is not atomic, then the select isn't safe either. Suppose a worker process dies inside Connection.send(), after the size header has been written but before the payload. In the parent process, _poll() will return self._handle as readable, and _recv_bytes() will then run:

    def _recv_bytes(self, maxsize=None):
        buf = self._recv(4)
        size, = struct.unpack("=i", buf.getvalue())
        if maxsize is not None and size > maxsize:
            return None
        return self._recv(size)

The first _recv() will succeed, since the length is in the Pipe, but the second one can then block forever. |
That's true. We would need to insert a select() before each raw read(). |
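Roughly, the idea is that every low-level read first selects on both the pipe and the workers' sentinels, so a worker dying mid-message raises an error instead of blocking the parent forever. A simplified sketch (not the committed code):

    import os
    import select

    def read_exact(fd, n, sentinels):
        """Read exactly n bytes from fd, giving up if a sentinel fires first."""
        chunks = []
        while n > 0:
            ready, _, _ = select.select([fd] + list(sentinels), [], [])
            if fd not in ready:
                # Only a sentinel became ready: a worker died before it could
                # finish writing the message.
                raise EOFError("worker exited in the middle of a write")
            chunk = os.read(fd, n)
            if not chunk:
                raise EOFError("unexpected end of stream")
            chunks.append(chunk)
            n -= len(chunk)
        return b"".join(chunks)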
Ok, the dependencies are now committed. Here is a new patch addressing Charles-François' comments: select() is now called before each call to read() when sentinels are given, to avoid race conditions. |
The patch looks fine to me (but I didn't look at win32-specific code). |
New changeset 6d6099f7fe89 by Antoine Pitrou in branch 'default': |
So, concurrent.futures is fixed now. Unless someone wants to patch multiprocessing.Pool, I am closing this issue. |
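To illustrate the fixed behaviour: with this change, a worker that dies abruptly causes pending futures to fail with BrokenProcessPool instead of hanging the executor. A small self-contained example (Python 3.3+):

    import os
    from concurrent.futures import ProcessPoolExecutor
    from concurrent.futures.process import BrokenProcessPool

    def die():
        os._exit(1)    # simulate a worker being killed out from under the pool

    if __name__ == "__main__":
        with ProcessPoolExecutor(max_workers=2) as executor:
            future = executor.submit(die)
            try:
                future.result()
            except BrokenProcessPool as exc:
                print("pool is broken:", exc)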
test_multiprocessing crashes ~700 times on Mac OS X Tiger, a regression likely introduced by this issue (6d6099f7fe89). I created issue bpo-12310 for that. |
I realize I'm 3 years late on this, but I've put together a patch for multiprocessing.Pool. Should a process in a Pool unexpectedly exit (meaning, *not* because of hitting the maxtasksperchild limit), the Pool will be closed/terminated and all cached/running tasks will return a BrokenProcessPool. These changes also prevent the Pool from going into a bad state if the "initializer" function raises an exception (previously, the pool would end up infinitely starting new processes, which would immediately die because of the exception).

One concern with the patch: the way timings are altered with these changes, the Pool seems to be particularly susceptible to bpo-6721 in certain cases. If processes in the Pool are being restarted due to maxtasksperchild just as the worker is being closed or joined, there is a chance the worker will be forked while some of the debug logging inside of Pool is running (and holding locks on either sys.stdout or sys.stderr). When this happens, the worker deadlocks on startup, which will hang the whole program. I believe the current implementation is susceptible to this as well, but I could reproduce it much more consistently with this patch. I think it's rare enough in practice that it shouldn't prevent the patch from being accepted, but I thought I should point it out. (I do think bpo-6721 should be addressed, or at the very least internal I/O locks should always be reset after forking.) |
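Not part of the patch, but as an illustration of the "reset internal locks after forking" mitigation mentioned above: on Python 3.7+, os.register_at_fork() can give the child a fresh lock in place of one that another thread may have been holding at fork time.

    import os
    import threading

    _log_lock = threading.Lock()

    def _reinit_log_lock():
        # The child starts with only one thread, so it is safe to simply
        # replace the (possibly held) lock inherited from the parent.
        global _log_lock
        _log_lock = threading.Lock()

    os.register_at_fork(after_in_child=_reinit_log_lock)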
Is it possible to have this issue re-opened, so that the new patch is more likely to get attention? Or should I create a new issue for the multiprocessing patch? |
You should certainly create a new issue! |
Thanks, Antoine. I've opened bpo-22393. |