classification
Title: Imap from ThreadPool behaves unexpectedly
Type: behavior
Stage: resolved
Components: Library (Lib)
Versions: Python 3.7, Python 3.6, Python 3.5
process
Status: closed
Resolution: fixed
Dependencies:
Superseder:
Assigned To: davin
Nosy List: davin, elias, fiete, lev-veshnyakov, xiang.zhang
Priority: normal
Keywords: patch

Created on 2016-11-15 18:22 by lev-veshnyakov, last changed 2017-03-29 04:55 by xiang.zhang. This issue is now closed.

Files
File name | Uploaded | Description
issue_28699_lacks_test_but_check_unsorted_py3x.patch | davin, 2016-11-17 02:53 | lacks proper test, preliminary patch
issue28699_try.patch | xiang.zhang, 2016-11-17 17:14 |
issue_28699_guards_iterable_still_lacks_formal_test_py3x.patch | davin, 2016-11-25 22:03 | lacks formal tests, preliminary patch replaces prior
issue_28699_repro.py | davin, 2016-11-25 22:05 | crude form of tests
Pull Requests
URL | Status | Linked
PR 693 | merged | xiang.zhang, 2017-03-17 11:21
PR 882 | merged | xiang.zhang, 2017-03-29 04:05
PR 884 | merged | xiang.zhang, 2017-03-29 04:19
Messages (17)
msg280872 - Author: Lev Veshnyakov (lev-veshnyakov) Date: 2016-11-15 18:22
Consider the following code:

from multiprocessing.pool import ThreadPool

pool = ThreadPool(10)

def gen():
    yield 1 + '1' # here is an error

print(list(pool.imap(str, gen()))) # prints []
print(list(pool.map(str, gen()))) # raises TypeError

The difference is that the imap line prints an empty list, while the map line raises an exception, as expected.

Now change the snippet above by adding another yield statement:

from multiprocessing.pool import ThreadPool

pool = ThreadPool(10)

def gen():
    yield 1
    yield 1 + '1' # here is an error

print(list(pool.imap(str, gen()))) # raises TypeError
print(list(pool.map(str, gen()))) # also would raise TypeError

Now both map and imap raise the exception, as expected. I therefore think the imap behavior shown in the first case is wrong.
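
For anyone who wants to check a particular interpreter, here is a small sketch that records what each call does instead of letting the exception propagate. The `outcome` helper is hypothetical and not part of multiprocessing. On an affected build the imap line should report an empty list; on a build that contains the fix, both lines should report TypeError.

```python
from multiprocessing.pool import ThreadPool


def gen():
    yield 1 + '1'  # TypeError is raised on the first next()


def outcome(call):
    """Hypothetical helper: return the result, or the name of the exception raised."""
    try:
        return call()
    except Exception as exc:
        return type(exc).__name__


with ThreadPool(2) as pool:
    imap_outcome = outcome(lambda: list(pool.imap(str, gen())))
    map_outcome = outcome(lambda: list(pool.map(str, gen())))

print("imap:", imap_outcome)
print("map: ", map_outcome)
```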
msg280958 - Author: Davin Potts (davin) * (Python committer) Date: 2016-11-16 14:54
To tie in the example given by @elias in issue28625, this inconsistency in behavior is not limited to ThreadPool -- it appears with a process Pool as well:


from multiprocessing import Pool


def double(x):
    return 2 * x

def get_numbers():
    raise Exception("oops")
    yield 1
    yield 2


>>> list(Pool(processes=2).imap(double, get_numbers()))  # returns list
[]
>>> list(Pool(processes=2).map(double, get_numbers()))
Traceback (most recent call last):
  ...
Exception: oops


def get_numbers_differently():
    yield 1
    raise Exception("oops")
    yield 2


>>> list(Pool(processes=2).imap(double, get_numbers_differently()))  # now we see exception
Traceback (most recent call last):
  ...
Exception: oops
msg280959 - Author: Davin Potts (davin) * (Python committer) Date: 2016-11-16 14:56
This inconsistent behavior in imap on both Pool and ThreadPool is not what I would expect.
msg281020 - Author: Davin Potts (davin) * (Python committer) Date: 2016-11-17 02:53
Though it still lacks a proper test, I'm attaching a preliminary patch to address the problematic behavior in 3.5/3.6/default in the hopes that others might help test it.
msg281029 - Author: Xiang Zhang (xiang.zhang) * (Python committer) Date: 2016-11-17 08:51
Hi Davin, could it be fixed like this?

diff -r 05a728e1da15 Lib/multiprocessing/pool.py
--- a/Lib/multiprocessing/pool.py	Wed Nov 16 16:35:53 2016 -0800
+++ b/Lib/multiprocessing/pool.py	Thu Nov 17 16:35:38 2016 +0800
@@ -398,7 +398,7 @@
             except Exception as ex:
                 job, ind = task[:2] if task else (0, 0)
                 if job in cache:
-                    cache[job]._set(ind + 1, (False, ex))
+                    cache[job]._set(ind + 1 if task else 0, (False, ex))
                 if set_length:
                     util.debug('doing set_length()')
                     set_length(i+1)

It seems to me the bug is that _handle_tasks doesn't treat the exception correctly when it occurs on the very first item. It always calls _set(ind + 1, ...) because, when an exception occurs, task is the previous task and the + 1 is needed. But if the exception occurs on the very first item, task is None and the + 1 is not needed.

I am not entirely sure, but the reported cases work correctly now:

list(Pool(processes=2).imap(double, get_numbers()))  # raises error now
list(pool.imap(str, gen()))  # raises error now
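
To illustrate why the + 1 matters, here is a single-threaded toy loosely modelled on how an ordered result iterator might track results. It is not the real IMapIterator; `ToyOrderedResults` and `first_item_fails` are made-up names, and locking and waiting are left out. It only shows that a failure stored at position 1 is never handed out when position 0 is the next one expected, while a failure stored at position 0 is.

```python
from collections import deque


class ToyOrderedResults:
    """Single-threaded toy; not the real IMapIterator."""

    def __init__(self):
        self.items = deque()   # results ready to hand out, in order
        self.unsorted = {}     # results that arrived ahead of their turn
        self.index = 0         # next position expected
        self.length = None     # total number of results, once known

    def set(self, i, obj):
        if i == self.index:
            self.items.append(obj)
            self.index += 1
            # Release any parked results that are now in order.
            while self.index in self.unsorted:
                self.items.append(self.unsorted.pop(self.index))
                self.index += 1
        else:
            self.unsorted[i] = obj

    def drain(self):
        """Hand out everything that is ready, in order."""
        ready = list(self.items)
        self.items.clear()
        return ready


def first_item_fails(slot):
    """Report a failure at position `slot` when no task was ever dispatched."""
    results = ToyOrderedResults()
    results.set(slot, (False, TypeError("boom")))
    results.length = 0
    return results.drain(), results.unsorted


lost, parked = first_item_fails(1)         # the ind + 1 behaviour
delivered, leftover = first_item_fails(0)  # the proposed behaviour
print(lost, parked)
print(delivered, leftover)
```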
msg281040 - Author: Xiang Zhang (xiang.zhang) * (Python committer) Date: 2016-11-17 13:43
What's more, this case seems to be non-reentrant. Since there is no task in this case, the job id is always taken to be 0, which is not correct. This means that after the first time, we cannot even set the exception.
msg281056 - Author: Xiang Zhang (xiang.zhang) * (Python committer) Date: 2016-11-17 17:14
Here is a patch which is just a try. I don't quite like the implementation, but I cannot figure out a better solution. The examples in this issue and #28696 seem to work, and no test fails currently.
msg281059 - Author: Davin Potts (davin) * (Python committer) Date: 2016-11-17 17:41
@xiang.zhang: Your patch looks to be introducing a number of changes to the structure of the data being passed around between threads and even monitored/indirectly shared across processes.  It's looking increasingly high risk to me.

We already have logic for handling exceptions arising during jobs, but the one situation overlooked in this logic is if the exception occurs "quickly in an unfortunate order", meaning the exception is encountered and reported before any of the other individual tasks can complete and respond with a result.  This oversight can be addressed in a few ways:
1.  Add a flag to our IMapIterator to indicate when any exception is encountered.
2.  Modify the tuples / data structures being maintained across IMapIterator's _cache, _items, _unsorted, _index, and _length.
3.  Under relevant conditions, check both _items and _unsorted (not only _items) before declaring that we truly have all results in.

I regard option 1 as being potentially a bit fragile and option 2 as introducing non-trivial complexity and risk.  With option 3, there's effectively no risk and no measurable cost in getting to the truth of what has actually happened.
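
As a rough illustration of option 3 only, the toy function below consults the out-of-order store before concluding that a job is finished. It is a sketch, not the attached patch: `next_result` is a made-up name, the arguments merely stand in for _items, _unsorted, _index and _length, and the waiting that real code would do is replaced by an error.

```python
from collections import deque


def next_result(items, unsorted, index, length):
    """Toy: check both stores before declaring that all results are in."""
    if items:
        return items.popleft()
    if index == length:
        if unsorted:
            # Something was reported out of order (for example an early
            # exception); surface it instead of stopping silently.
            return unsorted.pop(min(unsorted))
        raise StopIteration
    raise LookupError("nothing ready yet")  # real code would wait here


items = deque()
unsorted = {1: (False, ValueError("reported early"))}
recovered = next_result(items, unsorted, index=0, length=0)
print(recovered)
```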
msg281060 - Author: Xiang Zhang (xiang.zhang) * (Python committer) Date: 2016-11-17 18:01
> Your patch looks to be introducing a number of changes to the structure of the data being passed around between threads and even monitored/indirectly shared across processes. 

That's why I say that even I don't like it. To solve an edge case, some long-standing code has to be changed. Sigh.

> Under relevant conditions, check both _items and _unsorted (not only _items) before declaring that we truly have all results in.

I think you mean your patch. But how does it solve the reentrancy problem? Actually, it's not reentrant: if the problematic job is not the first job (the one with id 0), the exception won't be set.

With your patch applied, execute print(list(pool.imap(str, gen()))) repeatedly: only the first call raises an exception.
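
A toy sketch of the reentrancy problem, assuming job ids are handed out by a module-level counter and results are looked up in a cache keyed by job id. The helper functions are made up for illustration; only the fallback to job id 0 is taken from the discussion above.

```python
import itertools

job_counter = itertools.count()  # toy copy of a module-level job id counter
cache = {}                       # job id -> list of reported results


def start_job():
    job = next(job_counter)
    cache[job] = []
    return job


def finish_job(job):
    del cache[job]


def report_failure_without_task(exc):
    """Mimic the fallback to job id 0 when no task is available."""
    job = 0
    if job in cache:
        cache[job].append((False, exc))
        return True
    return False  # the failure is silently dropped


first = start_job()
first_reported = report_failure_without_task(TypeError("boom"))
finish_job(first)

second = start_job()
second_reported = report_failure_without_task(TypeError("boom"))
print(first, first_reported)
print(second, second_reported)
```

In this toy, only the very first job can ever receive the failure, which matches the observation that only the first call raises.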
msg281741 - Author: Davin Potts (davin) * (Python committer) Date: 2016-11-25 22:03
@xiang.zhang:  Nice catch -- thank you for pointing out the additional issue that arises when trying to provoke the issue twice in a row.

The module attribute `job_counter` helped, in this situation, point out what might have otherwise been overlooked.

I didn't like any of my previously suggested approaches, but I think there's a 4th option:
4.  Guard against misbehaving generators/iterables *before* they are put into the taskqueue.


Now when we provide a problematic generator/iterable to imap, the exception it triggers is caught and the resulting exception passed through the system to make use of the logic that is already in place.

This same issue can arise for imap_unordered() as well as imap() and can be addressed in the same manner.


Attaching another preliminary patch that still lacks formal tests but I'll attach crude versions of tests momentarily.

If we're still missing some use case or other logic path, now's the time to find it.
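
A minimal sketch of the idea behind option 4, under the assumption that tasks are tuples carrying a job id, an index, a function and its arguments. The names `guarded_tasks`, `reraise` and `run` are illustrative and not taken from the attached patch. If the iterable itself fails, the failure is turned into one last task that re-raises it, so the normal error path can report it.

```python
def reraise(exc):
    """Task body that simply raises the exception it is given."""
    raise exc


def guarded_tasks(job, func, iterable):
    """Yield (job, index, func, args); convert a failing iterable into a task."""
    i = -1
    try:
        for i, x in enumerate(iterable):
            yield (job, i, func, (x,))
    except Exception as exc:
        # The failure takes the next free index, so ordering is preserved.
        yield (job, i + 1, reraise, (exc,))


def run(task):
    """Run one task the way a worker might, returning (success, value)."""
    job, i, func, args = task
    try:
        return (True, func(*args))
    except Exception as exc:
        return (False, exc)


def broken():
    yield 1
    raise Exception("oops")


indexes = [task[1] for task in guarded_tasks(0, str, broken())]
results = [run(task) for task in guarded_tasks(0, str, broken())]
print(indexes)
print(results)
```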
msg281742 - Author: Davin Potts (davin) * (Python committer) Date: 2016-11-25 22:05
Attaching promised crude tests.
msg281847 - Author: Xiang Zhang (xiang.zhang) * (Python committer) Date: 2016-11-28 08:22
> 4.  Guard against misbehaving generators/iterables *before* they are put into the taskqueue.

This approach is good. Two points about the patch:

1. What about _map_async (used by map)? Does it need the same strategy? For an iterator that defines __len__, it seems to hit the same issue as here.

from multiprocessing import Pool
def double(x):
    return 2 * x
class buggy:
    def __iter__(self):
        return self
    def __next__(self):
        raise Exception('oops')
    def __len__(self):
        return 1
list(Pool(processes=2).map(double, buggy()))
list(Pool(processes=2).map(double, buggy()))  # hangs

2. The logic in _handle_tasks that deals with task and i could also be removed, I think. With _guarded_task_generation the for loop cannot fail, and that logic is itself buggy now.
msg282171 - Author: (fiete) Date: 2016-12-01 13:00
Since the only thing I know about the multiprocessing internals is what I just read in the source code trying to debug my imap_unordered call, I'll add the following example, not knowing whether this is already covered by everything you have until now.


import multiprocessing.pool

def gen():
    raise Exception('generator exception')
    yield 1
    yield 2

for i in range(3):
    with multiprocessing.pool.ThreadPool(3) as pool:
        try:
            print(list(pool.imap_unordered(lambda x: x*2, gen())))
        except Exception as e:
            print(e)


This only prints 'generator exception' once for the first iteration. For the following iterations imap_unordered returns an empty list. This is the case for both Pool and ThreadPool.
msg289754 - Author: Xiang Zhang (xiang.zhang) * (Python committer) Date: 2017-03-17 11:23
Davin, I have proposed a PR to solve this problem based on your patch. I hope you are willing to review it so we can finish this. ;-)
msg290764 - Author: Xiang Zhang (xiang.zhang) * (Python committer) Date: 2017-03-29 03:58
New changeset 794623bdb232eafd8925f76470209afcdcbcdcd2 by Xiang Zhang in branch 'master':
bpo-28699: fix abnormal behaviour of pools in multiprocessing.pool (GH-693)
https://github.com/python/cpython/commit/794623bdb232eafd8925f76470209afcdcbcdcd2
msg290765 - Author: Xiang Zhang (xiang.zhang) * (Python committer) Date: 2017-03-29 04:50
New changeset 346dcd65e6b832a35b4cfc15b7309b51a38e9ca2 by Xiang Zhang in branch '3.6':
bpo-28699: fix abnormal behaviour of pools in multiprocessing.pool (GH-882)
https://github.com/python/cpython/commit/346dcd65e6b832a35b4cfc15b7309b51a38e9ca2
msg290766 - Author: Xiang Zhang (xiang.zhang) * (Python committer) Date: 2017-03-29 04:51
New changeset 9f8e0904580cae3b99f8d343569b76e1be7e6092 by Xiang Zhang in branch '3.5':
bpo-28699: fix abnormal behaviour of pools in multiprocessing.pool (GH-884)
https://github.com/python/cpython/commit/9f8e0904580cae3b99f8d343569b76e1be7e6092
History
Date | User | Action | Args
2017-03-29 04:55:02 | xiang.zhang | set | status: open -> closed; resolution: fixed; stage: patch review -> resolved
2017-03-29 04:51:31 | xiang.zhang | set | messages: + msg290766
2017-03-29 04:50:30 | xiang.zhang | set | messages: + msg290765
2017-03-29 04:19:40 | xiang.zhang | set | pull_requests: + pull_request786
2017-03-29 04:05:16 | xiang.zhang | set | pull_requests: + pull_request784
2017-03-29 03:58:57 | xiang.zhang | set | messages: + msg290764
2017-03-17 11:23:08 | xiang.zhang | set | messages: + msg289754; stage: needs patch -> patch review
2017-03-17 11:21:16 | xiang.zhang | set | pull_requests: + pull_request566
2016-12-01 13:00:10 | fiete | set | nosy: + fiete; messages: + msg282171
2016-11-28 08:22:21 | xiang.zhang | set | messages: + msg281847
2016-11-25 22:05:27 | davin | set | files: + issue_28699_repro.py; messages: + msg281742
2016-11-25 22:03:33 | davin | set | files: + issue_28699_guards_iterable_still_lacks_formal_test_py3x.patch; messages: + msg281741
2016-11-17 18:01:51 | xiang.zhang | set | messages: + msg281060
2016-11-17 17:41:33 | davin | set | messages: + msg281059
2016-11-17 17:14:41 | xiang.zhang | set | files: + issue28699_try.patch; messages: + msg281056
2016-11-17 13:43:29 | xiang.zhang | set | messages: + msg281040
2016-11-17 08:51:09 | xiang.zhang | set | nosy: + xiang.zhang; messages: + msg281029
2016-11-17 02:53:28 | davin | set | files: + issue_28699_lacks_test_but_check_unsorted_py3x.patch; keywords: + patch; messages: + msg281020
2016-11-16 15:01:52 | davin | link | issue28696 superseder
2016-11-16 14:56:11 | davin | set | messages: + msg280959; versions: + Python 3.6, Python 3.7
2016-11-16 14:54:30 | davin | set | nosy: + elias; messages: + msg280958; assignee: davin; stage: needs patch
2016-11-16 14:48:38 | davin | link | issue28625 superseder
2016-11-15 18:29:44 | lev-veshnyakov | set | nosy: + davin
2016-11-15 18:22:19 | lev-veshnyakov | create |