
Parent process hanging in multiprocessing if children terminate unexpectedly #53451

Closed
gdb mannequin opened this issue Jul 8, 2010 · 78 comments
Labels
stdlib Python modules in the Lib dir type-bug An unexpected behavior, bug, or error

Comments

@gdb
Mannequin

gdb mannequin commented Jul 8, 2010

BPO 9205
Nosy @jcea, @brianquinlan, @pitrou, @vstinner, @vlasovskikh
Dependencies
  • bpo-11743: Rewrite PipeConnection and Connection in pure Python
  • bpo-12040: Expose a Process.sentinel property (and fix polling loop in Process.join())
  • Files
  • termination.patch: Patch to deal with unexpected termination of children
  • shutdown.patch: Patch to catch exceptions raised upon interpreter shutdown
  • pickling_error.patch: Inform an application that it has failed if a task is unpickleable
  • multiprocessing-trunk@82502-termination3.patch
  • multiprocessing-trunk@82502-termination-trackjobs.patch
  • assign-tasks.patch: Record the assignment of tasks to workers
  • multiprocessing-trunk@82502-termination-trackjobs2.patch
  • multiprocessing-trunk@82502-termination-trackjobs3.patch
  • sentinels.patch
  • sentinels2.patch
  • sentinels3.patch
  • sentinels4.patch
  • sentinels5.patch
  • multiproc_broken_pool.diff: Raise BrokenProcessPool if a process unexpectedly dies in a multiprocessing.Pool
  • Note: these values reflect the state of the issue at the time it was migrated and might not reflect the current state.


    GitHub fields:

    assignee = None
    closed_at = <Date 2011-06-08.15:25:30.134>
    created_at = <Date 2010-07-08.20:00:44.723>
    labels = ['type-bug', 'library']
    title = 'Parent process hanging in multiprocessing if children terminate unexpectedly'
    updated_at = <Date 2014-09-11.22:34:12.788>
    user = 'https://bugs.python.org/gdb'

    bugs.python.org fields:

    activity = <Date 2014-09-11.22:34:12.788>
    actor = 'dan.oreilly'
    assignee = 'jnoller'
    closed = True
    closed_date = <Date 2011-06-08.15:25:30.134>
    closer = 'pitrou'
    components = ['Library (Lib)']
    creation = <Date 2010-07-08.20:00:44.723>
    creator = 'gdb'
    dependencies = ['11743', '12040']
    files = ['17905', '17934', '17987', '18015', '18026', '18513', '18657', '18664', '21865', '21923', '21928', '21937', '22266', '36454']
    hgrepos = []
    issue_num = 9205
    keywords = ['patch']
    message_count = 78.0
    messages = ['109585', '109867', '109885', '109910', '109922', '109936', '110129', '110136', '110139', '110142', '110152', '110169', '110174', '110197', '110207', '110256', '110283', '110285', '110288', '110353', '110366', '110369', '110370', '110386', '110387', '110399', '110428', '110979', '111025', '111028', '111124', '111559', '111690', '111696', '111706', '111915', '113828', '114423', '114449', '115065', '115107', '115114', '115118', '115125', '115128', '132646', '132660', '132661', '132664', '132665', '132666', '135013', '135021', '135025', '135026', '135027', '135028', '135461', '135493', '135502', '135513', '135544', '135624', '135895', '135896', '135901', '135904', '135968', '136090', '137767', '137855', '137912', '137913', '138065', '225833', '226699', '226794', '226806']
    nosy_count = 16.0
    nosy_names = ['jcea', 'bquinlan', 'pitrou', 'vstinner', 'jnoller', 'hongqn', 'asksol', 'vlasovskikh', 'neologix', 'gdb', 'Albert.Strasheim', 'aljungberg', 'python-dev', 'sbt', 'gkcn', 'dan.oreilly']
    pr_nums = []
    priority = 'normal'
    resolution = 'fixed'
    stage = 'resolved'
    status = 'closed'
    superseder = None
    type = 'behavior'
    url = 'https://bugs.python.org/issue9205'
    versions = ['Python 3.3']

    @gdb
    Mannequin Author

    gdb mannequin commented Jul 8, 2010

    I have recently begun using multiprocessing for a variety of batch
    jobs. It's a great library, and it's been quite useful. However, I have been bitten several times by situations where a worker process in a Pool will unexpectedly die, leaving multiprocessing hanging in a wait. A simple example of this is produced by the following:
    """
    #!/usr/bin/env python
    import multiprocessing, sys
    def foo(x):
        sys.exit(1)
    multiprocessing.Pool(1).apply(foo, [1])
    """
    The child will exit and the parent will hang forever. A similar hang occurs if one presses C-c while a child process is running (this special case is noted in http://bugs.python.org/issue8296) or if a child is killed by a signal.

    Attached is a patch to handle unexpected terminations of child
    processes and prevent the parent process from hanging. A test case is included. (Developed and tested on 64-bit Ubuntu.) Please let me know what you think. Thanks!

    @gdb gdb mannequin added stdlib Python modules in the Lib dir type-bug An unexpected behavior, bug, or error labels Jul 8, 2010
    @jnoller
    Mannequin

    jnoller mannequin commented Jul 10, 2010

    thanks greg; I'm going to take a look and think about this. I'd like to resolve bug 9207 first though

    @jnoller jnoller mannequin self-assigned this Jul 10, 2010
    @gdb
    Mannequin Author

    gdb mannequin commented Jul 10, 2010

    Cool, thanks.  I'll note that with this patch applied, using the test program from 9207 I consistently get the following exception:
    """
    Exception in thread Thread-1 (most likely raised during interpreter shutdown):
    Traceback (most recent call last):
      File "/usr/lib/python2.6/threading.py", line 532, in __bootstrap_inner
      File "/usr/lib/python2.6/threading.py", line 484, in run
      File "/home/gdb/repositories/multiprocessing/pool.py", line 312, in _handle_workers
      File "/home/gdb/repositories/multiprocessing/pool.py", line 190, in _maintain_pool
      File "/home/gdb/repositories/multiprocessing/pool.py", line 158, in _join_exited_workers
    <type 'exceptions.TypeError'>: 'NoneType' object is not callable
    """

    This is line 148 in the unpatched source, namely the 'reversed(range(len(self._pool)))' line of _join_exited_workers. Looks like the same issue, where instead reversed/range/len have been set to None.

    So I think by changing how much time the worker_handler spends in various functions, I've made it possible (or just more likely?) that if we lose the race with interpreter shutdown the worker_handler will be in the middle of _join_exited_workers. This may mean that someone should keep around a local reference to reversed/range/len... not sure if there's a better solution.

    @jnoller
    Mannequin

    jnoller mannequin commented Jul 10, 2010

    Ugh. I'm going to have to think about the cleanest way of handling this case of functions vanishing from us since this is going to be more widespread inside the code. Suggestions welcome.

    @gdb
    Mannequin Author

    gdb mannequin commented Jul 10, 2010

    What about just catching the exception? See e.g. the attached patch. (Disclaimer: not heavily tested).
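
    For readers following along, here is a minimal sketch of the idea behind shutdown.patch (the name _make_shutdown_safe appears later in this thread; the body below is illustrative, not the actual patch). During interpreter shutdown, module globals referenced by daemon threads may already have been set to None, which typically surfaces as "'NoneType' object is not callable":

        import functools

        def _make_shutdown_safe(func):
            # Wrap a daemon-thread body so that the TypeError raised when a
            # module global has been replaced by None during interpreter
            # shutdown is swallowed instead of printing a spurious traceback.
            @functools.wraps(func)
            def wrapper(*args, **kwargs):
                try:
                    return func(*args, **kwargs)
                except TypeError:
                    # Most likely interpreter shutdown; nothing useful left to do.
                    return
            return wrapper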

    @jnoller
    Mannequin

    jnoller mannequin commented Jul 10, 2010

    A+ for creativity; I wouldn't have thought of that ;)

    @asksol
    Mannequin

    asksol mannequin commented Jul 12, 2010

    termination.patch, in the result handler you've added:

    while cache and thread._state != TERMINATE and not failed

    why are you terminating the second pass after finding a failed process?

    Unpickleable errors and other errors occurring in the worker body are not exceptional cases, at least not now that the pool is supervised by _handle_workers. I think the result should be set also in this case, so the user can inspect the exception after the fact.

    I have some other suggestions too, so I will review this patch tomorrow.

    For shutdown.patch, I thought this only happened in the worker handler, but you've enabled this for the result handler too? I don't care about the worker handler, but with the result handler I'm worried that I don't know what ignoring these exceptions actually means. For example, is there a possibility that we may lose results at shutdown?

    @gdb
    Mannequin Author

    gdb mannequin commented Jul 12, 2010

    Thanks much for taking a look at this!

    why are you terminating the second pass after finding a failed
    process?
    Unfortunately, if you've lost a worker, you are no longer guaranteed that cache will eventually be empty. In particular, you may have lost a task, which could result in an ApplyResult waiting forever for a _set call.

    More generally, my chief assumption that went into this is that the unexpected death of a worker process is unrecoverable. It would be nice to have a better workaround than just aborting everything, but I couldn't see a way to do that.

    Unpickleable errors and other errors occurring in the worker body are
    not exceptional cases, at least not now that the pool is supervised
    by _handle_workers.
    I could be wrong, but that's not what my experiments were indicating. In particular, if an unpickleable error occurs, then a task has been lost, which means that the relevant map, apply, etc. will wait forever for completion of the lost task.

    I think the result should be set also in this case, so the user can
    inspect the exception after the fact.
    That does sound useful. Although, how can you determine the job (and the value of i) if it's an unpickleable error? It would be nice to be able to retrieve job/i without having to unpickle the rest.

    For shutdown.patch, I thought this only happened in the worker
    handler, but you've enabled this for the result handler too? I don't
    care about the worker handler, but with the result handler I'm
    worried that I don't know what ignoring these exceptions actually
    means.
    You have a good point. I didn't think about the patch very hard. I've only seen these exceptions from the worker handler, but AFAICT there's no guarantee that bad luck with the scheduler wouldn't result in the same problem in the result handler. One option would be to narrow the breadth of the exceptions caught by _make_shutdown_safe (do we need to catch anything but TypeErrors?). Another option would be to enable only for the worker handler. I don't have a particularly great sense of what the Right Thing to do here is.

    @jnoller
    Mannequin

    jnoller mannequin commented Jul 12, 2010

    Greg - I asked Ask to take a look - his celery package is a huge consumer of multiprocessing, and so I tend to run things past him as well.

    That said - to both of you - the fundamental problem the shutdown patch is trying to address is located in bpo-9207 - greg's termination patch just exposes the problem in 9207 a lot more.

    Focusing specifically on the shutdown patch; our issue is that during interpreter shutdown, sys.modules is iterated and entries are set to None - threads which "live on" well into that cycle can end up losing the functions/modules/etc. they imported. The multiple daemon threads in the Pool code expose this: code which executes imported functions (such as the debug() statement in handle_workers) can fire after the pool has exited and the interpreter is shutting down.

    We can work around the shutdown issue (really, bug 9207) by ignoring the exception, as shutdown.patch does, or by passing in references/adding references to the functions those methods need. Or (as Brett suggested) converting them to class methods and adding references to the class. Or passing them in via the signature like this _handle_workers(arg, _debug=debug), etc.
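
    For illustration, a minimal, self-contained sketch of the "pass them in via the signature" workaround: default arguments are evaluated when the function is defined, so a daemon thread keeps usable references to the helpers it needs even after the interpreter starts clearing module globals. The names here are made up for the example and are not the actual pool.py code:

        import sys
        import threading
        import time

        def _debug(msg):
            sys.stderr.write(msg + "\n")

        def _handle_workers(state, _debug=_debug, _sleep=time.sleep):
            # _debug and _sleep are frozen as defaults at definition time,
            # so this daemon thread can still call them even if the module's
            # globals are rebound to None during interpreter shutdown.
            while state["running"]:
                _sleep(0.1)
            _debug("worker handler exiting")

        state = {"running": True}
        t = threading.Thread(target=_handle_workers, args=(state,))
        t.daemon = True
        t.start()
        state["running"] = False
        t.join()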

    @asksol
    Mannequin

    asksol mannequin commented Jul 12, 2010

    Unfortunately, if you've lost a worker, you are no
    longer guaranteed that cache will eventually be empty.
    In particular, you may have lost a task, which could
    result in an ApplyResult waiting forever for a _set call.

    More generally, my chief assumption that went into this
    is that the unexpected death of a worker process is
    unrecoverable. It would be nice to have a better workaround
    than just aborting everything, but I couldn't see a way
    to do that.

    It would be a problem if the process simply disappeared,
    but in this case you have the ability to put a result on the queue,
    so it doesn't have to wait forever.

    For processes disappearing (if that can at all happen), we could solve
    that by storing the jobs a process has accepted (started working on),
    so if a worker process is lost, we can mark them as failed too.

    I could be wrong, but that's not what my experiments
    were indicating. In particular, if an unpickleable error occurs,
    then a task has been lost, which means that the relevant map,
    apply, etc. will wait forever for completion of the lost task.

    It's lost now, but not if we handle the error...
    For a single map operation this behavior may make sense, but what about
    someone running the pool as a long-running service for users to submit map operations to? Errors in this context are expected to happen, even unpickleable errors.

    I guess the fact that the worker handler works as a supervisor is a side effect,
    as it was made for the maxtasksperchild feature, but for me it's a welcome one. With the supervisor in place, multiprocessing.pool is already stable enough to be used for this use case, and there's not much left to be done to make it solid (Celery has already been running for months without issue, unless there's a pickling error...)

    That does sound useful. Although, how can you determine the
    job (and the value of i) if it's an unpickleable error?
    It would be nice to be able to retrieve job/i without having
    to unpickle the rest.

    I was already working on this issue last week actually, and I managed
    to do that in a way that works well enough (at least for me):
    http://github.com/ask/celery/commit/eaa4d5ddc06b000576a21264f11e6004b418bda1#diff-1

    @gdb
    Mannequin Author

    gdb mannequin commented Jul 13, 2010

    For processes disappearing (if that can at all happen), we could solve
    that by storing the jobs a process has accepted (started working on),
    so if a worker process is lost, we can mark them as failed too.
    Sure, this would be reasonable behavior. I had considered it but decided it was a larger change than I wanted to make without consulting the devs.

    I was already working on this issue last week actually, and I managed
    to do that in a way that works well enough (at least for me):
    If I'm reading this right, you catch the exception upon pickling the result (at which point you have the job/i information already; totally reasonable). I'm worried about the case of unpickling the task failing. (Namely, the "task = get()" line of the "worker" method.) Try running the following:
    """
    #!/usr/bin/env python
    import multiprocessing
    p = multiprocessing.Pool(1)
    def foo(x):
        pass
    p.apply(foo, [1])
    """
    And if "task = get()" fails, then the worker doesn't know what the relevant job/i values are.

    Anyway, so I guess the question that is forming in my mind is, what sorts of errors do we want to handle, and how do we want to handle them? My answer is I'd like to handle all possible errors with some behavior that is not "hang forever". This includes handling children processes dying by signals or os._exit, raising unpickling errors, etc.

    I believe my patch provides this functionality. By adding the extra mechanism that you've written/proposed, we can improve the error handling in specific recoverable cases (which probably constitute the vast majority of real-world cases).

    @asksol
    Mannequin

    asksol mannequin commented Jul 13, 2010

    I think I misunderstood the purpose of the patch. This is about handling errors on get(), not on put() like I was working on. So sorry for that confusion.

    What kind of errors are you having that makes the get() call fail?

    If the queue is not working, then I guess the only sensible approach is to shutdown the pool like suggested. I'll open up another issue for unpickleable errors then.

    @asksol
    Mannequin

    asksol mannequin commented Jul 13, 2010

    For reference I opened up a new issue for the put() case here: http://bugs.python.org/issue9244

    @gdb
    Mannequin Author

    gdb mannequin commented Jul 13, 2010

    What kind of errors are you having that makes the get() call fail?
    Try running the script I posted. It will fail with an AttributeError (raised during unpickling) and hang.

    I'll note that the particular issues that I've run into in practice are:

    • OOM kill destroying my workers but leaving the parent silently waiting
    • KeyboardInterrupting the workers, and then having the parent hang

    This AttributeError problem is one that I discovered while generating test cases for the patch.

    @gdb
    Mannequin Author

    gdb mannequin commented Jul 13, 2010

    While looking at your patch in bpo-9244, I realized that my code fails to handle an unpickleable task, as in:
    """
    #!/usr/bin/env python
    import multiprocessing
    foo = lambda x: x
    p = multiprocessing.Pool(1)
    p.apply(foo, [1])
    """
    This should be fixed by the attached pickling_error.patch (independent of my other patches).

    @asksol
    Mannequin

    asksol mannequin commented Jul 14, 2010

    There's one more thing

        if exitcode is not None:
            cleaned = True
            if exitcode != 0 and not worker._termination_requested:
                abnormal.append((worker.pid, exitcode))

    Instead of restarting crashed worker processes it will simply bring down
    the pool, right?

    If so, then I think it's important to decide whether we want to keep
    the supervisor functionality, and if so decide on a recovery strategy.

    Some alternatives are:

    A) Any missing worker brings down the pool.

    B) Missing workers will be replaced one-by-one. A maximum-restart-frequency decides when the supervisor should give up trying to recover
    the pool, and crash it.

    C) Same as B, except that any process crashing when trying to get() will bring down the pool.

    I think the supervisor is a good addition, so I would very much like to keep it. It's also a step closer to my goal of adding the enhancements added by Celery to multiprocessing.pool.

    Using C is only a few changes away from this patch, but B would also be possible in combination with my accept_callback patch. It does pose some overhead, so it depends on the level of recovery we want to support.

    accept_callback: this is a callback that is triggered when the job is reserved by a worker process. The acks are sent to an additional Queue, with an additional thread processing the acks (hence the mentioned overhead). This enables us to keep track of what the worker processes are doing, and also to get the PID of the worker processing any given job (besides recovery, potential uses are monitoring and the ability to terminate a job (ApplyResult.terminate?)). See http://github.com/ask/celery/blob/master/celery/concurrency/processes/pool.py
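
    For concreteness, a rough sketch of the bookkeeping the accept_callback idea implies (hypothetical names, not the Celery implementation): each worker acknowledges a job on a separate queue before running it, an ack-handler thread records job -> pid, and when a pid disappears its in-flight jobs can be failed instead of waiting forever:

        import collections

        class JobTracker:
            """Maps worker pids to the jobs they have acknowledged, so a
            crashed worker's in-flight jobs can be marked as failed."""

            def __init__(self):
                self._by_pid = collections.defaultdict(set)

            def on_ack(self, job_id, pid):
                # Called by the ack-handler thread when a worker reports
                # that it has pulled job_id off the task queue.
                self._by_pid[pid].add(job_id)

            def on_done(self, job_id, pid):
                self._by_pid[pid].discard(job_id)

            def on_worker_lost(self, pid):
                # Returns the jobs that were in flight on the dead worker;
                # the caller can then set a WorkerLostError on each result.
                return self._by_pid.pop(pid, set())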

    @asksol
    Mannequin

    asksol mannequin commented Jul 14, 2010

    Jesse wrote,

    We can work around the shutdown issue (really, bug 9207) by
    ignoring the exception such as shutdown.patch does, or passing in
    references/adding references to the functions those methods need. Or (as Brett suggested) converting them to class methods and adding references to the class. Or passing them in via the signature like this _handle_workers(arg, _debug=debug), etc.

    Greg wrote,

    Another option would be to enable only for the worker handler. I
    don't have a particularly great sense of what the Right Thing to
    do here is.

    I don't think _make_shutdown_safe should be added to the result handler.
    If the error can indeed happen there, then we need to solve it in a way that enables it to finish the work.

    Jesse, how hard is it to fix the worker handler by passing the references? Note that _worker_handler is not important to complete shutdown at this point, but it may be in the future (it seems termination.patch already changes this)

    @jnoller
    Mannequin

    jnoller mannequin commented Jul 14, 2010

    Passing the references seems to be a losing game; for _handle_workers - we only need 1 function (debug) - for others (say _join_exited_workers), we need references to reversed/range/len.

    A possible alternative is to make those threads non-daemon threads; but I'd have to test that.

    @gdb
    Mannequin Author

    gdb mannequin commented Jul 14, 2010

    Before I forget, looks like we also need to deal with the result from a worker being un-unpickleable:
    """
    #!/usr/bin/env python
    import multiprocessing
    def foo(x):
        global bar
        def bar(x):
            pass
        return bar
    p = multiprocessing.Pool(1)
    p.apply(foo, [1])
    """

    This shouldn't require much more work, but I'll hold off on submitting a patch until we have a better idea of where we're going in this arena.

    Instead of restarting crashed worker processes it will simply bring down
    the pool, right?
    Yep. Again, as things stand, once you've lost a worker, you've lost a task, and you can't really do much about it. I guess that depends on your application though... is your use-case such that you can lose a task without it mattering? If tasks are idempotent, one could have the task handler resubmit them, etc. But really, thinking about the failure modes I've seen (OOM kills/user-initiated interrupt), I'm not sure under what circumstances I'd like the pool to try to recover.

    The idea of recording the mapping of tasks -> workers seems interesting. Getting all of the corner cases could be hard (e.g. making removing a task from the queue and recording which worker did the removing atomic, detecting if the worker crashed while still holding the queue lock) and doing this would require extra mechanism. This feature does seem to be useful for pools running many different jobs, because that way a crashed worker need only terminate one job.

    Anyway, I'd be curious to know more about the kinds of crashes you've encountered from which you'd like to be able to recover. Is it just Unpickleable exceptions, or are there others?

    @asksol
    Mannequin

    asksol mannequin commented Jul 15, 2010

    Greg,

    Before I forget, looks like we also need to deal with the
    result from a worker being un-unpickleable:

    This is what my patch in bug 9244 does...

    Yep. Again, as things stand, once you've lost a worker,
    you've lost a task, and you can't really do much about it.
    I guess that depends on your application though... is your
    use-case such that you can lose a task without it mattering?
    If tasks are idempotent, one could have the task handler
    resubmit them, etc.. But really, thinking about the failure
    modes I've seen (OOM kills/user-initiated interrupt) I'm not
    sure under what circumstances I'd like the pool to try to
    recover.

    Losing a task is not fun, but there may still be other tasks
    running that are just as important. I think you're thinking
    from a map_async perspective here.

    User-initiated interrupts are very important to recover from:
    think of some badly written library code suddenly raising SystemExit.
    This shouldn't terminate other jobs, and it's probably easy to recover from, so why shouldn't it try?

    The idea of recording the mapping of tasks -> workers
    seems interesting. Getting all of the corner cases could
    be hard (e.g. making removing a task from the queue and
    recording which worker did the removing atomic, detecting if the worker crashed while still holding the queue lock) and doing
    this would require extra mechanism. This feature does seem
    to be useful for pools running many different jobs, because
    that way a crashed worker need only terminate one job.

    I think I may have an alternative solution. Instead of keeping track of what the workers are doing, we could simply change the result handler
    so it gives up when there are no more alive processes.

        while state != TERMINATE:
            result = get(timeout=1)
            if all_processes_dead():
            break
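
    A minimal reading of the all_processes_dead() check used above, assuming the pool keeps its worker multiprocessing.Process objects in a list (the helper body is illustrative, not from a patch):

        def all_processes_dead(workers):
            # Process.exitcode stays None while a process is alive, so if
            # every worker has a non-None exitcode nothing is left that
            # could ever put another result on the queue.
            return all(p.exitcode is not None for p in workers)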

    @asksol
    Mannequin

    asksol mannequin commented Jul 15, 2010

    Ok. I implemented my suggestions in the patch attached
    (multiprocessing-trunk@82502-termination2.patch)
    What do you think?

    Greg, Maybe we could keep the behavior in termination.patch as an option for map jobs? It is certainly a problem that map jobs won't terminate until the pool is joined.

    @gdb
    Mannequin Author

    gdb mannequin commented Jul 15, 2010

    > Before I forget, looks like we also need to deal with the
    > result from a worker being un-unpickleable:
    This is what my patch in bug 9244 does...
    Really? I could be misremembering, but I believe you deal with the case of the result being unpickleable. I.e. you deal with the put(result) failing, but not the get() in the result handler. Does my sample program work with your patch applied?

        while state != TERMINATE:
            result = get(timeout=1)
            if all_processes_dead():
                break
    Will this sort of approach work with the supervisor, which continually respawns workers?

    user-initiated interrupts, this is very important to recover from,
    think of some badly written library code suddenly raising SystemExit,
    this shouldn't terminate other jobs, and it's probably easy to
    recover from, so why shouldn't it try?
    To be clear, in this case I was thinking of KeyboardInterrupts.

    I'll take a look at your patch in a bit. From our differing use-cases, I do think it could make sense as a configuration option, but where it probably belongs is on the wait() call of ApplyResult.

    @asksol
    Mannequin

    asksol mannequin commented Jul 15, 2010

    Just some small cosmetic changes to the patch.
    (added multiprocessing-trunk@82502-termination3.patch)

    @asksol
    Mannequin

    asksol mannequin commented Jul 15, 2010

    Really? I could be misremembering, but I believe you deal
    with the case of the result being unpickleable. I.e. you
    deal with the put(result) failing, but not the get() in the
    result handler.

    Your example is demonstrating the pickle error on put(), not on get().

    Does my sample program work with your patch applied?

    Yeah, check this out:

    /opt/devel/Python/trunk(master)$> patch -p1 < multiprocessing-trunk@82502-handle_worker_encoding_errors2.patch 
    patching file Lib/multiprocessing/pool.py
    patching file Lib/test/test_multiprocessing.py
    /opt/devel/Python/trunk(master)$> ./python.exe  
    Python 2.7 (unknown, Jul 13 2010, 13:28:35) 
    [GCC 4.2.1 (Apple Inc. build 5659)] on darwin
    Type "help", "copyright", "credits" or "license" for more information.
    >>> import multiprocessing
    >>> def foo():
    ...     return lambda: 42
    ... 
    >>> p = multiprocessing.Pool(2)
    >>> p.apply_async(foo).get()
    Traceback (most recent call last):
      File "<stdin>", line 1, in <module>
      File "/opt/devel/Python/trunk/Lib/multiprocessing/pool.py", line 518, in get
        raise self._value
    multiprocessing.pool.MaybeEncodingError: Error sending result: '<function <lambda> at 0x1005477d0>'. Reason: 'Can't pickle <type 'function'>: attribute lookup __builtin__.function failed'
    >>> import operator
    >>> p.apply_async(operator.add, (2, 2)).get()
    4

    To be clear, in this case I was thinking of KeyboardInterrupts.

    In termination2.patch I handle BaseExceptions, by exiting the worker process, and then letting the _worker_handler replace the process.

    It's very useful, because then people can kill -INT the worker process
    if they want to cancel the job, without breaking other running jobs.

    From our differing use-cases, I do think it could make sense as
    a configuration option, but where it probably belongs is on the
    wait() call of ApplyResult.

    Indeed! This could be done by adding listeners for this type of error.

        pool.add_worker_missing_callback(fun)

    So MapResults could install a callback like this:

        def __init__(self):
            ...
            _pool.add_worker_missing_callback(self._on_worker_missing)
            ...

        def _on_worker_missing(self):
            err = WorkerLostError(
                "Worker lost while running map job")
            self._set(None, (False, err))
    What do you think about that?

    IMHO, even though the worker lost could be unrelated to the map job in
    question, it would still be a better alternative than crashing the whole pool.

    @gdb
    Mannequin Author

    gdb mannequin commented Jul 15, 2010

    Actually, the program you demonstrate is nonequivalent to the one I posted. The one I posted pickles just fine because 'bar' is a global name, but doesn't unpickle because it doesn't exist in the parent's namespace. (See http://docs.python.org/library/pickle.html#what-can-be-pickled-and-unpickled.) Although, if you're able to run my test program verbatim, then it's entirely possible I'm just missing something.

    Anyway, I do think that adding a 'worker_missing_callback' could work. You'd still have to make sure the ApplyResult (or MapResult) can crash the pool if it deems necessary though.

    @gdb
    Mannequin Author

    gdb mannequin commented Jul 15, 2010

    Started looking at your patch. It seems to behave reasonably, although it still doesn't catch all of the failure cases. In particular, as you note, crashed jobs won't be noticed until the pool shuts down... but if you make a blocking call such as in the following program, you'll get a hang:
    """
    #!/usr/bin/env python
    import multiprocessing, os, signal
    def foo(x):
        os.kill(os.getpid(), signal.SIGKILL)
    multiprocessing.Pool(1).apply(foo, [1])
    """

    The tests also occasionally hang in e.g.
    test_job_killed_by_signal (__main__.WithProcessesTestPoolSupervisor) ...

    @asksol
    Mannequin

    asksol mannequin commented Jul 16, 2010

    but if you make a blocking call such as in the following program,
    you'll get a hang

    Yeah, and for that we could use the same approach as for the maps.

    But, I've just implemented the accept callback approach, which should be superior. Maps/Apply fails instantly as soon as a worker process crashes, but the pool remains fully functional. Patch multiprocessing-trunk@82502-termination-trackjobs.patch added.

    There seem to be some race conditions left, because some of the tests break from time to time. Maybe you can pinpoint it before me.

    @pitrou
    Member

    pitrou commented May 3, 2011

    Here is a proof-of-concept patch that makes concurrent.futures able to detect killed processes. Works only under POSIX, and needs bpo-11743.
    I'm not sure it's a good idea to change the multiprocessing public API (SimpleQueue.get()) for this.

    @pitrou
    Member

    pitrou commented May 3, 2011

    Actually, it came to me that if a child process exits, the queues are not guaranteed to be in a consistent state anymore (the child could have terminated in the middle of a partial read or write). So it may be better to simply declare the ProcessPoolExecutor terminally broken when one of its children has exited.

    @brianquinlan
    Contributor

    Under what circumstances do we expect a ProcessPoolExecutor child process to be killed outside of the control of the ProcessPoolExecutor?

    If the user kills a child then maybe all we want to do is raise an exception rather than deadlock as a convenience.

    @pitrou
    Member

    pitrou commented May 3, 2011

    Under what circumstances do we expect a ProcessPoolExecutor child
    process to be killed outside of the control of the
    ProcessPoolExecutor?

    Killed by the user, or by an automatic device (such as the Linux OOM
    killer), or crashed.

    If the user kills a child then maybe all we want to do is raise an
    exception rather than deadlock as a convenience.

    That's what the patch does, roughly.

    @brianquinlan
    Contributor

    Killed by the user, or by an automatic device (such as the Linux OOM
    killer), or crashed.

    Crashed would be bad - it would indicate a bug in the
    ProcessPoolExecutor code.

    > If the user kills a child then maybe all we want to do is raise an
    > exception rather than deadlock as a convenience.

    That's what the patch does, roughly.

    Right. But instead of trying to recover, it might be better to fail
    very loudly i.e.

    • fail every non-finished future
    • kill every child process in the ProcessPoolExecutor
    • set the ProcessPoolExecutor as shutdown so no new work can be
      scheduled
    • raise in wait(), as_completed(), etc.

    It really depends on whether we view the situation as expected (and
    try our best to let the user recover) or as an aberrant situation that
    the user can't reasonably recover from.

    Cheers,
    Brian

    @pitrou
    Member

    pitrou commented May 3, 2011

    > Killed by the user, or by an automatic device (such as the Linux OOM
    > killer), or crashed.

    Crashed would be bad - it would indicate a bug in the
    ProcessPoolExecutor code.

    I meant a crash in Python itself, or any third-party extension module.

    >> If the user kills a child then maybe all we want to do is raise an
    >> exception rather than deadlock as a convenience.
    >
    > That's what the patch does, roughly.

    Right. But instead of trying to recover, it might be better to fail
    very loudly i.e.

    • fail every non-finished future
    • kill every child process in the ProcessPoolExecutor
    • set the ProcessPoolExecutor as shutdown so no new work can be
      scheduled

    Yes, I think that's better (see my message about the internal state of
    queues).

    @pitrou
    Member

    pitrou commented May 7, 2011

    Here is an updated patch. Much of it consists of changes in the Windows Connection implementation, where I had to use overlapped I/O in order to use WaitForMultipleObjects on named pipes.

    test_concurrent_futures sometimes blocks (under Windows), I'll try to debug.

    @pitrou
    Member

    pitrou commented May 7, 2011

    Ok, this patch seems fully debugged (under Windows and Linux). A couple of improvements come along with it, such as removing repeated polling in PipeConnection.poll() and _Popen.wait().

    @pitrou
    Member

    pitrou commented May 7, 2011

    Hum, I get a strange skip on a XP buildbot:

    [224/354] test_multiprocessing
    test_multiprocessing skipped -- DLL load failed: The specified procedure could not be found.

    Yet _multiprocessing was compiled fine... Does anyone know what it means?

    http://www.python.org/dev/buildbot/all/builders/x86%20XP-5%20custom/builds/5/steps/test/logs/stdio

    @pitrou
    Member

    pitrou commented May 8, 2011

    Hum, I get a strange skip on a XP buildbot:

    [224/354] test_multiprocessing
    test_multiprocessing skipped -- DLL load failed: The specified
    procedure could not be found.

    Yet _multiprocessing was compiled fine... Does anyone know what it
    means?

    Ok, the culprit is CancelIoEx(), which is only supported under Vista and
    later (but, strangely enough, compiles fine under XP).
    I need to use CancelIo() instead, which will lead me to change the
    implementation strategy slightly (the fact that CancelIo() is
    thread-specific makes it unsuitable for a tp_dealloc).

    @pitrou
    Member

    pitrou commented May 8, 2011

    Ok, here's a patch for the new approach.
    CancelIoEx is loaded dynamically and, if unavailable, CancelIo is used instead. I take care to cancel or complete the I/O in the same method call where it is initiated, meaning there's no thread-specificity issues.

    (of course, all this is a lot of code just to have the desired WaitForMultipleObjects() semantics on a named pipe. Thank you, Windows)

    @pitrou
    Member

    pitrou commented May 9, 2011

    Part of the patch submitted standalone in bpo-12040.

    @neologix
    Mannequin

    neologix mannequin commented May 13, 2011

    Antoine, I've got a couple questions concerning your patch:

    • IIUC, the principle is to create a pipe for each worker process, so that when the child exits the read-end - sentinel - becomes readable (EOF) from the parent, so you know that a child exited. Then, before reading from the result queue, you perform a select on the list of sentinels to check that all workers are alive. Am I correct?
      If I am, then I have the following questions:
    • have you done some benchmarking to measure the performance impact of calling select at every get (I'm not saying it will necessarily be noticeable, I'm just curious)?
    • isn't there a race if a process exits between the time select returns and the get?
    • is there a distinction between a normal exit and an abnormal one? The reason I'm asking is because with multiprocessing.Pool, you can have a maxtasksperchild argument which will make workers exit after having processed a given number of tasks, so I'm wondering how that would be handled with the current patch (on the other hand, I think your patch only applies to concurrent.futures, not to raw Queues, right?).

    Finally, I might be missing something completely obvious, but I have the feeling that POSIX already provides something that could help solve this issue: process groups.
    We could create a new process group for a process pool, and checking whether children are still alive would be as simple as waitpid(-group, os.WNOHANG) (I don't know anything about Windows, but Google returned WaitForMultipleObjects which seems to work on multiple processes). You'd get the exit code for free.
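
    For reference, Python 3.3's multiprocessing ended up exposing exactly this sentinel mechanism: every Process has a sentinel handle that becomes ready when the process terminates, and multiprocessing.connection.wait() can wait on sentinels and a result pipe at the same time. A minimal sketch (assuming Python 3.3 or later; the worker function is illustrative):

        import multiprocessing as mp
        from multiprocessing.connection import wait

        def worker(conn):
            conn.send("done")
            conn.close()

        if __name__ == "__main__":
            # Pipe(duplex=False) returns (receive-end, send-end).
            recv_conn, send_conn = mp.Pipe(duplex=False)
            p = mp.Process(target=worker, args=(send_conn,))
            p.start()
            # Wait on the result pipe and the worker's sentinel together,
            # so a worker killed before sending anything is noticed
            # immediately instead of hanging the parent.
            ready = wait([recv_conn, p.sentinel])
            if recv_conn in ready:
                print(recv_conn.recv())
            else:
                print("worker died with exitcode", p.exitcode)
            p.join()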

    @pitrou
    Member

    pitrou commented May 13, 2011

    Antoine, I've got a couple questions concerning your patch:

    • IIUC, the principle is to create a pipe for each worker process, so
      that when the child exits the read-end - sentinel - becomes readable
      (EOF) from the parent, so you know that a child exited. Then, before
      reading from the result queue, you perform a select on the list of
      sentinels to check that all workers are alive. Am I correct?

    Not exactly. The select is done on the queue's pipe and on the workers'
    fds *at the same time*. Thus there's no race condition.

    • have you done some benchmarking to measure the performance impact of
      calling select at every get (I'm not saying it will necessarily be
      noticeable, I'm just curious)?

    No, but the implementation is not meant to be blazingly fast anyway
    (after all, it has just been rewritten in Python from C).

    • is there a distinction between a normal exit and an abnormal one?

    Not at that level. In concurrent.futures, a process exiting normally
    first sends its pid on the result queue. The parent then dequeues the
    pid and knows the process has ended cleanly.

    This approach could work for multiprocessing.Pool as well. However, the
    patch only covers concurrent.futures, indeed.

    Finally, I might be missing something completely obvious, but I have
    the feeling that POSIX already provides something that could help
    solve this issue: process groups.
    We could create a new process group for a process pool, and checking
    whether children are still alive would be as simple as waitpid(-group,
    os.WNOHANG)

    waitpid() doesn't allow for a timeout, and it doesn't allow checking a
    pipe concurrently, does it?

    @neologix
    Mannequin

    neologix mannequin commented May 13, 2011

    Not exactly. The select is done on the queue's pipe and on the workers'
    fds *at the same time*. Thus there's no race condition.

    You're right, I missed this part, it's perfectly safe.

    But I think there's a problem with the new implementation in Python.
    Writes to a pipe are guaranteed to be atomic if you write less than
    PIPE_BUF (4K on Linux, 512 by POSIX) at a time. Writes to a datagram
    Unix domain socket are also atomic.
    But Lib/multiprocessing/connection.py does:

        def _send_bytes(self, buf):
            # For wire compatibility with 3.2 and lower
            n = len(buf)
            self._send(struct.pack("=i", len(buf)))
            # The condition is necessary to avoid "broken pipe" errors
            # when sending a 0-length buffer if the other end closed the pipe.
            if n > 0:
                self._send(buf)

    This is definitely not atomic. If two processes write objects of
    different size at the same time, it can probably lead to trouble.
    Also, Pipe(duplex=True) should probably return a SOCK_DGRAM Unix
    socket for the same reason.
    If I missed something here, I promise to shut up ;-)

    @pitrou
    Member

    pitrou commented May 13, 2011

    But Lib/multiprocessing/connection.py does:

        def _send_bytes(self, buf):
            # For wire compatibility with 3.2 and lower
            n = len(buf)
            self._send(struct.pack("=i", len(buf)))
            # The condition is necessary to avoid "broken pipe" errors
            # when sending a 0-length buffer if the other end closed the pipe.
            if n > 0:
                self._send(buf)

    This is definitely not atomic.

    Indeed, it isn't, Pipe objects are not meant to be safe against multiple
    access. Queue objects (in multiprocessing/queues.py) use locks so they
    are safe.

    @neologix
    Mannequin

    neologix mannequin commented May 14, 2011

    Indeed, it isn't, Pipe objects are not meant to be safe against multiple
    access. Queue objects (in multiprocessing/queues.py) use locks so they
    are safe.

    But if the write to the Pipe is not atomic, then the select isn't safe.
    select will return as soon as some data is available for reading. So
    let's say this happens:
    parent process waiting inside Connection.recv() on poll():

        def recv(self, sentinels=None):
            if sentinels:
                self._poll(-1.0, sentinels)
            buf = self._recv_bytes()
            return pickle.loads(buf.getbuffer())

    worker process, inside Connection.send():

        def send_bytes(self, buf):
            n = len(buf)
            self._send(struct.pack("=i", len(buf)))
            [crash]
            # The condition is necessary to avoid "broken pipe" errors
            # when sending a 0-length buffer if the other end closed the pipe.
            if n > 0:
                self._send(buf)

    In the parent process, _poll() will return self._handle as readable as
    soon as len(buf) has been sent by the worker process. Thus,
    Connection.recv_bytes() will be called:

        def _recv_bytes(self, maxsize=None):
            buf = self._recv(4)
            size, = struct.unpack("=i", buf.getvalue())
            if maxsize is not None and size > maxsize:
                return None
            return self._recv(size)

    The first _recv() will succeed, since the length is in the Pipe.
    The second one, however, will remain stuck on the read from the
    pipe/unix socket, because there's no more data.
    This can be reproduced easily by adding a short sleep right after the
    sending of the length of the buffer inside send_bytes(), and then
    sending a SIGKILL to a worker process.
    (Also, I now remember why I made the comment about the handle being
    read after the select, I spotted this earlier but completely forgot
    about it afterwards...).

    @pitrou
    Member

    pitrou commented May 16, 2011

    Thus,
    Connection.recv_bytes() will be called:

        def _recv_bytes(self, maxsize=None):
            buf = self._recv(4)
            size, = struct.unpack("=i", buf.getvalue())
            if maxsize is not None and size > maxsize:
                return None
            return self._recv(size)

    The first _recv() will succeed, since the length is in the Pipe.
    The second one, however, will remain stuck on the read from the
    pipe/unix socket, because there's no more data.
    This can be reproduced easily by adding a short sleep right after the
    sending of the length of the buffer inside send_bytes(), and then
    sending a SIGKILL to a worker process.

    That's true. We would need to insert a select() before each raw read().
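
    A minimal sketch of what that looks like on the POSIX side (a hypothetical helper, not the stdlib code): every raw read is preceded by a select() on the data fd together with the worker sentinels, so a half-written message from a killed worker raises instead of blocking forever.

        import os
        import select

        def _read_exact(fd, n, sentinels):
            # Hypothetical helper: loop until n bytes have arrived, but bail
            # out if only the sentinels are readable - that means the writer
            # died before finishing the message.
            data = b""
            while len(data) < n:
                ready, _, _ = select.select([fd] + list(sentinels), [], [])
                if fd not in ready:
                    raise EOFError("worker process died mid-message")
                chunk = os.read(fd, n - len(data))
                if not chunk:
                    raise EOFError("unexpected end of stream")
                data += chunk
            return data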

    @pitrou
    Member

    pitrou commented Jun 6, 2011

    Ok, the dependencies are now committed. Here is a new patch addressing Charles-François' comments: select() is now called before each call to read() when sentinels are given, to avoid race conditions.

    @neologix
    Mannequin

    neologix mannequin commented Jun 7, 2011

    Ok, the dependencies are now committed. Here is a new patch addressing Charles-François' comments: select() is now called before each call to read() when sentinels are given, to avoid race conditions.

    The patch looks fine to me (but I didn't look at win32-specific code).

    @python-dev
    Mannequin

    python-dev mannequin commented Jun 8, 2011

    New changeset 6d6099f7fe89 by Antoine Pitrou in branch 'default':
    Issue bpo-9205: concurrent.futures.ProcessPoolExecutor now detects killed
    http://hg.python.org/cpython/rev/6d6099f7fe89

    @pitrou
    Member

    pitrou commented Jun 8, 2011

    So, concurrent.futures is fixed now. Unless someone wants to patch multiprocessing.Pool, I am closing this issue.

    @pitrou pitrou closed this as completed Jun 8, 2011
    @vstinner
    Member

    test_multiprocessing crashes ~700 times on Mac OS X Tiger, a regression likely introduced by this issue (6d6099f7fe89): I created issue bpo-12310 for that.

    @danoreilly
    Mannequin

    danoreilly mannequin commented Aug 24, 2014

    > So, concurrent.futures is fixed now. Unless someone wants to patch multiprocessing.Pool, I am closing this issue.

    I realize I'm 3 years late on this, but I've put together a patch for multiprocessing.Pool. Should a process in a Pool unexpectedly exit (meaning, *not* because of hitting the maxtasksperchild limit), the Pool will be closed/terminated and all cached/running tasks will return a BrokenProcessPool. These changes also prevent the Pool from going into a bad state if the "initializer" function raises an exception (previously, the pool would end up infinitely starting new processes, which would immediately die because of the exception).

    One concern with the patch: The way timings are altered with these changes, the Pool seems to be particularly susceptible to bpo-6721 in certain cases. If processes in the Pool are being restarted due to maxtasksperchild just as the worker is being closed or joined, there is a chance the worker will be forked while some of the debug logging inside of Pool is running (and holding locks on either sys.stdout or sys.stderr). When this happens, the worker deadlocks on startup, which will hang the whole program. I believe the current implementation is susceptible to this as well, but I could reproduce it much more consistently with this patch. I think its rare enough in practice that it shouldn't prevent the patch from being accepted, but thought I should point it out.

    (I do think bpo-6721 should be addressed, or at the very least internal I/O locks should always reset after forking.)
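
    For comparison, the behaviour being added here for multiprocessing.Pool resembles what concurrent.futures has done since the fix above landed in Python 3.3: a worker that dies unexpectedly marks the executor broken and pending futures raise BrokenProcessPool instead of hanging. A small POSIX-only demonstration (assuming Python 3.3 or later):

        import os
        import signal
        from concurrent.futures import ProcessPoolExecutor
        from concurrent.futures.process import BrokenProcessPool

        def suicidal_task(x):
            # Simulate an OOM kill or a user-initiated kill of the worker.
            os.kill(os.getpid(), signal.SIGKILL)

        if __name__ == "__main__":
            with ProcessPoolExecutor(max_workers=1) as executor:
                future = executor.submit(suicidal_task, 1)
                try:
                    future.result()
                except BrokenProcessPool:
                    print("pool noticed the killed worker instead of hanging")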

    @danoreilly
    Mannequin

    danoreilly mannequin commented Sep 10, 2014

    Is it possible to have this issue re-opened, so that the new patch is more likely to get attention? Or should I create a new issue for the multiprocessing patch?

    @pitrou
    Member

    pitrou commented Sep 11, 2014

    You should certainly create a new issue!

    @danoreilly
    Mannequin

    danoreilly mannequin commented Sep 11, 2014

    Thanks, Antoine. I've opened bpo-22393.
