msg280872 - (view) |
Author: Lev Veshnyakov (lev-veshnyakov) |
Date: 2016-11-15 18:22 |
Consider the following code:
from multiprocessing.pool import ThreadPool
pool = ThreadPool(10)
def gen():
    yield 1 + '1' # here is an error
print(list(pool.imap(str, gen()))) # prints []
print(list(pool.map(str, gen()))) # raises TypeError
The difference is that the line with imap prints an empty list, while the line with map raises an exception, as expected.
Now change the snippet above by adding one more yield statement:
from multiprocessing.pool import ThreadPool
pool = ThreadPool(10)
def gen():
    yield 1
    yield 1 + '1' # here is an error
print(list(pool.imap(str, gen()))) # raises TypeError
print(list(pool.map(str, gen()))) # also would raise TypeError
So now both map and imap raise the exception, as expected. Therefore I suppose the behavior of imap shown in the first case is wrong.
|
msg280958 - (view) |
Author: Davin Potts (davin) *  |
Date: 2016-11-16 14:54 |
To tie in the example given by @elias in issue28625, this inconsistency in behavior is not limited to ThreadPool -- it appears with a process Pool as well:
from multiprocessing import Pool
def double(x):
    return 2 * x
def get_numbers():
    raise Exception("oops")
    yield 1
    yield 2
>>> list(Pool(processes=2).imap(double, get_numbers())) # returns list
[]
>>> list(Pool(processes=2).map(double, get_numbers()))
Traceback (most recent call last):
...
Exception: oops
def get_numbers_differently():
    yield 1
    raise Exception("oops")
    yield 2
>>> list(Pool(processes=2).imap(double, get_numbers_differently())) # now we see exception
Traceback (most recent call last):
...
Exception: oops
|
msg280959 - (view) |
Author: Davin Potts (davin) *  |
Date: 2016-11-16 14:56 |
This inconsistent behavior in imap on both Pool and ThreadPool is not what I would expect.
|
msg281020 - (view) |
Author: Davin Potts (davin) *  |
Date: 2016-11-17 02:53 |
Though it still lacks a proper test, I'm attaching a preliminary patch to address the problematic behavior in 3.5/3.6/default in the hopes that others might help test it.
|
msg281029 - (view) |
Author: Xiang Zhang (xiang.zhang) *  |
Date: 2016-11-17 08:51 |
Hi Davin, could it be fixed like this?
diff -r 05a728e1da15 Lib/multiprocessing/pool.py
--- a/Lib/multiprocessing/pool.py Wed Nov 16 16:35:53 2016 -0800
+++ b/Lib/multiprocessing/pool.py Thu Nov 17 16:35:38 2016 +0800
@@ -398,7 +398,7 @@
 except Exception as ex:
     job, ind = task[:2] if task else (0, 0)
     if job in cache:
-        cache[job]._set(ind + 1, (False, ex))
+        cache[job]._set(ind + 1 if task else 0, (False, ex))
     if set_length:
         util.debug('doing set_length()')
         set_length(i+1)
It seems to me the bug is that _handle_tasks doesn't handle the exception correctly when it is raised on the very first item. It always calls _set(ind + 1, ...), because when an exception occurs, task normally refers to the previous task, so the + 1 is needed. But if the exception occurs on the very first item, task is None and the + 1 is not needed.
I'm not entirely sure, but the reported cases now work correctly:
list(Pool(processes=2).imap(double, get_numbers())) # raises error now
list(pool.imap(str, gen())) # raises error now
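The index arithmetic in the diff above can be isolated as a small sketch. The function names error_slot_unpatched and error_slot_patched are hypothetical and exist only for illustration; the two expressions are copied from the removed and added lines of the diff, and task is assumed to be either None or a tuple whose first two fields are the job id and the task index.

```python
def error_slot_unpatched(task):
    """Where the exception is stored before the change (illustrative only)."""
    job, ind = task[:2] if task else (0, 0)
    return job, ind + 1


def error_slot_patched(task):
    """Where the exception is stored with the proposed one-line change."""
    job, ind = task[:2] if task else (0, 0)
    # Parsed as (ind + 1) if task else 0.
    return job, ind + 1 if task else 0


# No task was produced at all: the iterable failed on its first item.
first_item_failed = None
# Hypothetical previous task: job id 3, task index 4 (other fields omitted).
previous_task = (3, 4)

unpatched_first = error_slot_unpatched(first_item_failed)
patched_first = error_slot_patched(first_item_failed)
unpatched_later = error_slot_unpatched(previous_task)
patched_later = error_slot_patched(previous_task)
```

When there is no previous task, the unpatched expression selects slot 1 and the patched one selects slot 0; when a previous task exists, both select the slot right after it. In both variants the job id falls back to 0 whenever task is None.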
|
msg281040 - (view) |
Author: Xiang Zhang (xiang.zhang) *  |
Date: 2016-11-17 13:43 |
What's more, this case seems to be non-reentrant. Since there is no task in this case, the job id is always taken to be 0, which is not correct. This means that after the first time, we cannot even set the exception.
|
msg281056 - (view) |
Author: Xiang Zhang (xiang.zhang) *  |
Date: 2016-11-17 17:14 |
Here is a patch, which is just an attempt. I don't much like the implementation, but I cannot figure out a better solution. The examples in this issue and in #28696 seem to work, and no tests fail currently.
|
msg281059 - (view) |
Author: Davin Potts (davin) *  |
Date: 2016-11-17 17:41 |
@xiang.zhang: Your patch looks to be introducing a number of changes to the structure of the data being passed around between threads and even monitored/indirectly shared across processes. It's looking increasingly high risk to me.
We already have logic for handling exceptions arising during jobs, but the one situation this logic overlooks is when the exception occurs "quickly in an unfortunate order", meaning the exception is encountered and reported before any of the other individual tasks can complete and respond with a result. This oversight can be addressed in a few ways:
1. Add a flag to our IMapIterator to indicate when any exception is encountered.
2. Modify the tuples / data structures being maintained across IMapIterator's _cache, _items, _unsorted, _index, and _length.
3. Under relevant conditions, check both _items and _unsorted (not only _items) before declaring that we truly have all results in.
I regard option 1 as being potentially a bit fragile and option 2 as introducing non-trivial complexity and risk. With option 3, there's effectively no risk and no measurable cost getting to the truth of what has actually happened.
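To make the "unfortunate order" case concrete, here is a toy model of an ordered result collector. It is only a sketch: the class ToyOrderedResults and its methods are hypothetical stand-ins, loosely named after the _items, _unsorted and _index attributes mentioned above, and it does not reproduce the real IMapIterator (no locking, no waiting, and it assumes every producer has already finished). The check_unsorted flag is one possible reading of option 3, not the attached patch.

```python
from collections import deque


class ToyOrderedResults:
    """Hypothetical, single-threaded stand-in for an ordered result collector."""

    def __init__(self):
        self.items = deque()   # outcomes ready to be handed out, in order
        self.unsorted = {}     # outcomes that arrived ahead of their turn
        self.index = 0         # next position expected to arrive

    def set(self, i, outcome):
        """Record an outcome (success, value) for position i."""
        if i == self.index:
            self.items.append(outcome)
            self.index += 1
            # Flush anything that was waiting for this position.
            while self.index in self.unsorted:
                self.items.append(self.unsorted.pop(self.index))
                self.index += 1
        else:
            self.unsorted[i] = outcome

    def drain(self, check_unsorted=False):
        """Return all ready results in order, raising the first stored failure."""
        out = []
        while self.items:
            success, value = self.items.popleft()
            if not success:
                raise value
            out.append(value)
        if check_unsorted:
            # Assumed reading of option 3: inspect stranded outcomes as well.
            for i in sorted(self.unsorted):
                success, value = self.unsorted[i]
                if not success:
                    raise value
        return out


def stranded_failure():
    """A failure stored at position 1 although position 0 never arrives."""
    results = ToyOrderedResults()
    results.set(1, (False, TypeError("boom")))
    return results
```

In this model, stranded_failure().drain() returns an empty list because the failure never leaves unsorted, while stranded_failure().drain(check_unsorted=True) raises the stored TypeError.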
|
msg281060 - (view) |
Author: Xiang Zhang (xiang.zhang) *  |
Date: 2016-11-17 18:01 |
> Your patch looks to be introducing a number of changes to the structure of the data being passed around between threads and even monitored/indirectly shared across processes.
That's why I said that even I don't like it. To solve an edge case, some long-established code has to be changed. Sigh.
> Under relevant conditions, check both _items and _unsorted (not only _items) before declaring that we truly have all results in.
I think you mean your patch. But how does it solve the reentrancy problem? Actually, it's not reentrant: if the problematic job is not the first job, with id 0, then the exception won't be set.
With your patch, repeatedly execute print(list(pool.imap(str, gen()))): an exception is raised only the first time.
|
msg281741 - (view) |
Author: Davin Potts (davin) *  |
Date: 2016-11-25 22:03 |
@xiang.zhang: Nice catch -- thank you for pointing out the additional issue that arises when trying to provoke the issue twice in a row.
The module attribute `job_counter` helped, in this situation, point out what might have otherwise been overlooked.
I didn't like any of my previously suggested approaches, but I think there's a 4th option:
4. Guard against misbehaving generators/iterables *before* they are put into the taskqueue.
Now when we provide a problematic generator/iterable to imap, the exception it triggers is caught and the resulting exception passed through the system to make use of the logic that is already in place.
This same issue can arise for imap_unordered() as well as imap() and can be addressed in the same manner.
Attaching another preliminary patch that still lacks formal tests but I'll attach crude versions of tests momentarily.
If we're still missing some use case or other logic path, now's the time to find it.
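The idea behind option 4 can be sketched outside of the pool machinery. This is a minimal illustration under stated assumptions, not the attached patch: guarded_tasks, reraise and run_tasks are hypothetical names, the (index, callable, args) task tuple is a simplification, and run_tasks stands in for a worker by running every task inline.

```python
def reraise(ex):
    """Hypothetical helper: a task body that just re-raises a stored exception."""
    raise ex


def guarded_tasks(func, iterable):
    """Yield (index, callable, args) tasks without letting the iterable's error escape."""
    i = -1
    try:
        for i, x in enumerate(iterable):
            yield (i, func, (x,))
    except Exception as ex:
        # Turn the failure into one more task so it travels the normal result path.
        yield (i + 1, reraise, (ex,))


def run_tasks(tasks):
    """Stand-in for a worker: run each task and collect (index, success, value)."""
    outcomes = []
    for index, fn, args in tasks:
        try:
            outcomes.append((index, True, fn(*args)))
        except Exception as ex:
            outcomes.append((index, False, ex))
    return outcomes


def broken_at_start():
    raise ValueError("oops")
    yield 1


def broken_after_one():
    yield 1
    raise ValueError("oops")


at_start = run_tasks(guarded_tasks(str, broken_at_start()))
after_one = run_tasks(guarded_tasks(str, broken_after_one()))
```

In both cases the failure shows up as an ordinary failed task at the next index (0 for broken_at_start, 1 for broken_after_one), so whatever already reports task failures does not need a special case for a failing iterable.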
|
msg281742 - (view) |
Author: Davin Potts (davin) *  |
Date: 2016-11-25 22:05 |
Attaching promised crude tests.
|
msg281847 - (view) |
Author: Xiang Zhang (xiang.zhang) *  |
Date: 2016-11-28 08:22 |
> 4. Guard against misbehaving generators/iterables *before* they are put into the taskqueue.
This approach is good. 2 points about the patch:
1. How about _map_async (used by map)? Does it need the same strategy? For an iterator with __len__ defined, it seems to hit the same issue as here.
from multiprocessing import Pool
def double(x):
    return 2 * x
class buggy:
    def __iter__(self):
        return self
    def __next__(self):
        raise Exception('oops')
    def __len__(self):
        return 1
list(Pool(processes=2).map(double, buggy()))
list(Pool(processes=2).map(double, buggy())) # hangs
2. I think the logic in _handle_tasks that handles task and i could be removed. With _guarded_task_generation the for loop cannot fail, and that logic is itself buggy now.
|
msg282171 - (view) |
Author: (fiete) |
Date: 2016-12-01 13:00 |
Since the only thing I know about the multiprocessing internals is what I just read in the source code trying to debug my imap_unordered call, I'll add the following example, not knowing whether this is already covered by everything you have until now.
import multiprocessing.pool
def gen():
    raise Exception('generator exception')
    yield 1
    yield 2
for i in range(3):
    with multiprocessing.pool.ThreadPool(3) as pool:
        try:
            print(list(pool.imap_unordered(lambda x: x*2, gen())))
        except Exception as e:
            print(e)
This only prints 'generator exception' once for the first iteration. For the following iterations imap_unordered returns an empty list. This is the case for both Pool and ThreadPool.
|
msg289754 - (view) |
Author: Xiang Zhang (xiang.zhang) *  |
Date: 2017-03-17 11:23 |
Davin, I've proposed a PR to solve this problem based on your patch. I hope you are willing to review it so we can finish this. ;-)
|
msg290764 - (view) |
Author: Xiang Zhang (xiang.zhang) *  |
Date: 2017-03-29 03:58 |
New changeset 794623bdb232eafd8925f76470209afcdcbcdcd2 by Xiang Zhang in branch 'master':
bpo-28699: fix abnormal behaviour of pools in multiprocessing.pool (GH-693)
https://github.com/python/cpython/commit/794623bdb232eafd8925f76470209afcdcbcdcd2
|
msg290765 - (view) |
Author: Xiang Zhang (xiang.zhang) *  |
Date: 2017-03-29 04:50 |
New changeset 346dcd65e6b832a35b4cfc15b7309b51a38e9ca2 by Xiang Zhang in branch '3.6':
bpo-28699: fix abnormal behaviour of pools in multiprocessing.pool (GH-882)
https://github.com/python/cpython/commit/346dcd65e6b832a35b4cfc15b7309b51a38e9ca2
|
msg290766 - (view) |
Author: Xiang Zhang (xiang.zhang) *  |
Date: 2017-03-29 04:51 |
New changeset 9f8e0904580cae3b99f8d343569b76e1be7e6092 by Xiang Zhang in branch '3.5':
bpo-28699: fix abnormal behaviour of pools in multiprocessing.pool (GH-884)
https://github.com/python/cpython/commit/9f8e0904580cae3b99f8d343569b76e1be7e6092
|
|
Date | User | Action | Args |
2022-04-11 14:58:39 | admin | set | github: 72885 |
2017-03-29 04:55:02 | xiang.zhang | set | status: open -> closed; resolution: fixed; stage: patch review -> resolved |
2017-03-29 04:51:31 | xiang.zhang | set | messages: + msg290766 |
2017-03-29 04:50:30 | xiang.zhang | set | messages: + msg290765 |
2017-03-29 04:19:40 | xiang.zhang | set | pull_requests: + pull_request786 |
2017-03-29 04:05:16 | xiang.zhang | set | pull_requests: + pull_request784 |
2017-03-29 03:58:57 | xiang.zhang | set | messages: + msg290764 |
2017-03-17 11:23:08 | xiang.zhang | set | messages: + msg289754; stage: needs patch -> patch review |
2017-03-17 11:21:16 | xiang.zhang | set | pull_requests: + pull_request566 |
2016-12-01 13:00:10 | fiete | set | nosy: + fiete; messages: + msg282171 |
2016-11-28 08:22:21 | xiang.zhang | set | messages: + msg281847 |
2016-11-25 22:05:27 | davin | set | files: + issue_28699_repro.py; messages: + msg281742 |
2016-11-25 22:03:33 | davin | set | files: + issue_28699_guards_iterable_still_lacks_formal_test_py3x.patch; messages: + msg281741 |
2016-11-17 18:01:51 | xiang.zhang | set | messages: + msg281060 |
2016-11-17 17:41:33 | davin | set | messages: + msg281059 |
2016-11-17 17:14:41 | xiang.zhang | set | files: + issue28699_try.patch; messages: + msg281056 |
2016-11-17 13:43:29 | xiang.zhang | set | messages: + msg281040 |
2016-11-17 08:51:09 | xiang.zhang | set | nosy: + xiang.zhang; messages: + msg281029 |
2016-11-17 02:53:28 | davin | set | files: + issue_28699_lacks_test_but_check_unsorted_py3x.patch; keywords: + patch; messages: + msg281020 |
2016-11-16 15:01:52 | davin | link | issue28696 superseder |
2016-11-16 14:56:11 | davin | set | messages: + msg280959; versions: + Python 3.6, Python 3.7 |
2016-11-16 14:54:30 | davin | set | nosy: + elias; messages: + msg280958; assignee: davin; stage: needs patch |
2016-11-16 14:48:38 | davin | link | issue28625 superseder |
2016-11-15 18:29:44 | lev-veshnyakov | set | nosy: + davin |
2016-11-15 18:22:19 | lev-veshnyakov | create | |