Message110386
> Really? I could be misremembering, but I believe you deal
> with the case of the result being unpickleable. I.e. you
> deal with the put(result) failing, but not the get() in the
> result handler.
Your example is demonstrating the pickle error on put(), not on get().
> Does my sample program work with your patch applied?
Yeah, check this out:
/opt/devel/Python/trunk(master)$> patch -p1 < multiprocessing-trunk@82502-handle_worker_encoding_errors2.patch
patching file Lib/multiprocessing/pool.py
patching file Lib/test/test_multiprocessing.py
/opt/devel/Python/trunk(master)$> ./python.exe
Python 2.7 (unknown, Jul 13 2010, 13:28:35)
[GCC 4.2.1 (Apple Inc. build 5659)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
>>> import multiprocessing
>>> def foo():
... return lambda: 42
...
>>> p = multiprocessing.Pool(2)
>>> p.apply_async(foo).get()
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/opt/devel/Python/trunk/Lib/multiprocessing/pool.py", line 518, in get
raise self._value
multiprocessing.pool.MaybeEncodingError: Error sending result: '<function <lambda> at 0x1005477d0>'. Reason: 'Can't pickle <type 'function'>: attribute lookup __builtin__.function failed'
>>> import operator
>>> p.apply_async(operator.add, (2, 2)).get()
4
> To be clear, in this case I was thinking of KeyboardInterrupts.
In termination2.patch I handle BaseExceptions, by exiting the worker process, and then letting the _worker_handler replace the process.
It's very useful, because then people can kill -INT the worker process
if they want to cancel the job, and without breaking other jobs running.
> From our differing use-cases, I do think it could make sense as
> a configuration option, but where it probably belongs is on the
> wait() call of ApplyResult.
Indeed! This could be done by adding listeners for this type of errors.
pool.add_worker_missing_callback(fun)
So MapResults could install a callback like this:
def __init__():
...
_pool.add_worker_missing_callback(self._on_worker_missing)
...
def _on_worker_missing(self):
err = WorkerLostError(
"Worker lost while running map job")
self._set(None, (False, err))
What do you think about that?
IMHO, even though the worker lost could be unrelated to the map job in
question, it would still be a better alternative than crashing the whole pool. |
|
Date |
User |
Action |
Args |
2010-07-15 19:04:31 | asksol | set | recipients:
+ asksol, jnoller, gdb |
2010-07-15 19:04:31 | asksol | set | messageid: <1279220671.55.0.0176296542071.issue9205@psf.upfronthosting.co.za> |
2010-07-15 19:04:29 | asksol | link | issue9205 messages |
2010-07-15 19:04:27 | asksol | create | |
|