The multiprocessing module uses pickle to send data between processes.
If a blob fails to unpickle (a bad implementation of __setstate__, an invalid payload from __reduce__, a random crash in __init__), multiprocessing crashes inside the _handle_results worker thread, e.g.:
  File "lib\threading.py", line 932, in _bootstrap_inner
    self.run()
  File "lib\threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "lib\multiprocessing\pool.py", line 576, in _handle_results
    task = get()
  File "lib\multiprocessing\connection.py", line 251, in recv
    return _ForkingPickler.loads(buf.getbuffer())
TypeError: __init__() takes 1 positional argument but 4 were given
After this the handler thread is dead, and every task waiting for results from the pool will wait forever.
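The failure mode can be reproduced without a pool at all (the class name Bad is hypothetical, chosen only for illustration): an object whose __reduce__ serializes fine but hands the wrong constructor arguments to the unpickler raises the same TypeError the handler thread dies with.

```python
import pickle

class Bad:
    def __init__(self):
        pass

    def __reduce__(self):
        # Too many constructor args: dumps() succeeds, but loads()
        # calls Bad(1, 2, 3) and raises TypeError, as in the
        # traceback above.
        return (Bad, (1, 2, 3))

blob = pickle.dumps(Bad())   # worker side: serialization succeeds
try:
    pickle.loads(blob)       # parent side: this is where it blows up
except TypeError as exc:
    print("unpickle failed:", exc)
```

Inside a pool, the loads() call happens in _handle_results instead, where there is no except clause to catch it.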
There are two things I think should be fixed:
1. In _handle_results, capture all unrecognized errors and propagate them to the main thread. At that point at least one of the jobs' replies is lost forever, so there is little point in trying to log and resume.
2. Separate the result payload from the payload that contains the job index/id, so they are unpickled in two steps. The first step unpickles the data internal to multiprocessing, to learn which task the result refers to. The second step unpickles the return value (or exception) from the called function; if this object fails to unpickle, propagate the error to the main thread through the corresponding ApplyResult or IMapIterator instance.
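The two-step scheme in point 2 can be sketched outside the pool machinery. This is a minimal sketch, not the multiprocessing implementation: wrap_result, deliver, and Evil are hypothetical names. The envelope holding the job id contains only builtin types, so unpickling it cannot fail on user code; the user's value is pre-pickled into a bytes payload and unpickled separately, so a broken payload becomes a per-job error instead of killing the handler thread.

```python
import pickle

def wrap_result(job_id, value):
    # Worker side: pre-pickle the user's value, then pickle an
    # envelope of plain builtins (int + bytes) around it.
    return pickle.dumps((job_id, pickle.dumps(value)))

def deliver(blob, results):
    # Parent side, step 1: the envelope holds only builtin types,
    # so this loads() cannot crash on user-defined classes.
    job_id, payload = pickle.loads(blob)
    try:
        # Step 2: unpickling the user's value may raise; route the
        # error to the matching job instead of the handler thread.
        results[job_id] = ("ok", pickle.loads(payload))
    except Exception as exc:
        results[job_id] = ("error", exc)

class Evil:
    def __init__(self):
        pass

    def __reduce__(self):
        return (Evil, ("unexpected",))  # wrong arity: fails on loads

results = {}
deliver(wrap_result(1, [1, 2, 3]), results)  # healthy result
deliver(wrap_result(2, Evil()), results)     # broken result, isolated to job 2
```

In the real fix, deliver would live in _handle_results and the ("error", exc) branch would set the exception on the proper ApplyResult or IMapIterator, so only the caller of that one job sees the failure.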