itertools.tee not thread-safe; can segfault interpreter when wrapped iterator releases GIL #78591
Comments
Hello, when I run the attached code, I encounter a segmentation fault. Thanks. |
I figured out that the problem is that itertools.tee does not use a multiprocessing.Manager proxied object for shared state. I was able to create a workaround tee as follows.

```python
import multiprocessing

def multiprocessing_tee(iterable, n=2):
    """A multiprocessing-safe itertools.tee."""
    it = iter(iterable)
    m = multiprocessing.Manager()
    lists = [m.list() for i in range(n)]
    def gen(local_list):
        keep_m_alive = m            # keep the Manager alive as long as the generator
        while True:
            if not local_list:      # when the local list is empty
                newval = next(it)   # fetch a new value and
                for l in lists:     # load it to all the lists
                    l.append(newval)
            yield local_list.pop(-1)
    return tuple(gen(l) for l in lists)
```
|
Okay, I needed to do `.pop(0)` instead of `.pop(-1)`, which is probably O(N). |
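For reference, `list.pop(0)` shifts every remaining element, so it costs O(n), while `collections.deque.popleft()` is O(1). A quick illustration with plain Python objects (the Manager proxies used above don't offer a deque, so this is only about the underlying data structures):

```python
from collections import deque

items = [1, 2, 3]
buf = deque(items)

# Both take from the front; deque does it in O(1),
# while list.pop(0) shifts the whole remaining tail.
assert items.pop(0) == buf.popleft() == 1
assert list(buf) == items == [2, 3]
```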
You'll also need to lock when modifying the manager's list. Does anyone know how to do this using the multiprocessing.Queues without deadlocking? |
Thanks for the script. I can reproduce this on master and on Python 3.6 too. Sometimes the attached script causes a timeout error. Running it under gdb gives me the backtrace below (the long dumps of remote objects are truncated):

```
[New Thread 0x18ab of process 10682]

Thread 2 received signal SIGSEGV, Segmentation fault.

Backtrace:
#0  0x000000010033e509 in teedataobject_getitem (tdo=0x100645640, i=1)
    (…intermediate frames truncated; ends at Python/ceval.c:3267)
#14 0x0000000100215277 in PyEval_EvalFrameEx (
    f=Frame 0x10308d050, for file /Users/karthikeyansingaravelan/stuff/python/cpython/Lib/threading.py,
    line 865, in run (…truncated), throwflag=0) at Python/ceval.c:536
#15 0x0000000100045d63 in function_code_fastcall (co=0x101c7aa00, args=0x10190a8f8, nargs=1,
    globals={'__name__': 'threading', …truncated}) at Objects/call.c:283
#16 0x0000000100045069 in _PyFunction_FastCallKeywords (func=<function at remote 0x101d9e890>,
    stack=0x10190a8f0, nargs=1, kwnames=0x0) at Objects/call.c:408
#17 0x0000000100233e50 in call_function (pp_stack=0x103b522b8, oparg=1, kwnames=0x0)
    at Python/ceval.c:4623
#18 0x000000010022bcf9 in _PyEval_EvalFrameDefault (
    f=Frame 0x10190a750, for file /Users/karthikeyansingaravelan/stuff/python/cpython/Lib/threading.py,
    line 917, in _bootstrap_inner (…truncated), throwflag=0) at Python/ceval.c:3186
#19 0x0000000100215277 in PyEval_EvalFrameEx (
    f=Frame 0x10190a750, for file /Users/karthikeyansingaravelan/stuff/python/cpython/Lib/threading.py,
    line 917, in _bootstrap_inner (…truncated), throwflag=0) at Python/ceval.c:536
```

Thanks |
Davin, is there anything itertools.tee() can do about this or is this a multiprocessing issue? |
I've actually written a few workarounds that should be considered multiprocessing-specific tee functions. I need feedback/critique on these. Hopefully we can all agree on one solution that's the best. It is unfortunate that the multiprocessing manager does not provide a deque. The first one I wrote uses a managed list.

```python
import itertools
import multiprocessing

def multiprocessing_tee(iterable, n=2):
    """A multiprocessing-safe itertools.tee."""
    it = iter(iterable)
    m = multiprocessing.Manager()
    mylock = m.Lock()
    lists = [m.list() for i in range(n)]
    def gen(local_list):
        for i in itertools.count():
            with mylock:
                if not local_list:      # when the local list is empty
                    newval = next(it)   # fetch a new value and
                    for l in lists:     # load it to all the lists
                        l.append(newval)
                retval = local_list.pop(0)  # pop inside the lock,
            yield retval                    # but yield outside it
    return tuple(gen(l) for l in lists)
```

The second two implementations use queues.

```python
import multiprocessing
import queue

def multiprocessing_tee_q(iterable, n=2):
    """A multiprocessing-safe itertools.tee."""
    it = iter(iterable)
    m = multiprocessing.Manager()
    lock = m.Lock()
    queues = [m.Queue(-1) for _ in range(n)]  # -1 means infinite maxsize (so puts won't block)
    def gen(myqueue):
        while True:
            with lock:  # no one else touches anything
                try:
                    newval = myqueue.get_nowait()
                except queue.Empty:
                    newval = next(it)
                    for q in queues:
                        q.put(newval)
                    newval = myqueue.get()
            yield newval
    return tuple(gen(q) for q in queues)
```

```python
import multiprocessing
import queue

class Sentinel(object):
    """Used as a queue sentinel."""

def multiprocessing_tee_q2(iterable, n=2):
    """A multiprocessing-safe itertools.tee."""
    it = iter(iterable)
    m = multiprocessing.Manager()
    lock = m.Lock()
    queues = [m.Queue(-1) for _ in range(n)]  # -1 means infinite maxsize (so puts won't block)
    def gen(myqueue):
        while True:
            try:
                retval = myqueue.get_nowait()
            except queue.Empty:
                # what happens if the other process puts the last item in my queue before I get the lock?
                with lock:  # no one else touches anything
                    try:
                        newval = next(it)
                    except StopIteration:
                        newval = Sentinel
                    for q in queues:
                        q.put(newval)
                    retval = myqueue.get()
            if retval is Sentinel:
                return  # PEP 479: a bare `raise StopIteration` inside a generator is an error
            yield retval
    return tuple(gen(q) for q in queues)
```

I'm just throwing out my sketches here. I'm hoping the more experienced here can weigh in on these implementations. |
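For the single-process, threads-only case, the same fan-out idea can be sketched without Manager proxies at all, using an ordinary `threading.Lock` and plain deques. This is an editor's sketch under the same design as the managed-list version above (one shared lock guarding both the source iterator and every per-consumer buffer), not code from the thread:

```python
import threading
from collections import deque

def locked_tee(iterable, n=2):
    """A thread-safe tee sketch: one lock serializes every fetch
    from the shared source iterator and every buffer access."""
    it = iter(iterable)
    lock = threading.Lock()
    buffers = [deque() for _ in range(n)]

    def gen(buf):
        while True:
            with lock:
                if not buf:                # my buffer is empty:
                    try:
                        val = next(it)     # advance the shared iterator
                    except StopIteration:
                        return             # source exhausted, nothing left for me
                    for b in buffers:      # fan the value out to every consumer
                        b.append(val)
                item = buf.popleft()       # take my copy while still locked
            yield item                     # yield outside the lock

    return tuple(gen(b) for b in buffers)
```

Because every value is appended to all n buffers before anyone pops it, each generator sees the full sequence in order no matter how the consuming threads interleave; the cost is that every fetch is serialized.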
It seems to me the problem is that tee objects might encounter race conditions while fetching values from the shared underlying iterator. As for multiprocessing.pool, there is a background task-handling thread consuming one tee object and the main thread consuming another one. The underlying iterator's __next__ is implemented in Python, so it can release the GIL mid-fetch. While trying, I find the following snippet would also crash:

```python
import threading
import itertools

class C:
    def __iter__(self):
        return self
    def __next__(self):
        return 1

def test(i):
    print(list(i))

i1, i2 = itertools.tee(C())
threading.Thread(target=test, args=(i1,)).start()
test(i2)
```

GDB shows it crashes in teedataobject_getitem. |
Ahh, the infinite iterator could also release the GIL in its Python-level __next__. |
If what you've said is correct, would it make the most sense to create a Manager method which returns a Proxy to a tee'd iterator? |
Carlo: The point of Xiang's post is that this is only tangentially related to multiprocessing; the real problem is that tee-ing an iterator implemented in Python (of which pool.imap_unordered is just one example) and then using the resulting tee-ed iterators in multiple threads (which pool.imap_unordered does implicitly, as there is a thread involved in dispatching work) is unsafe. The problem is *exposed* by multiprocessing.pool.imap_unordered, but it is entirely a problem with itertools.tee, and as Xiang's repro indicates, it can be triggered easily without the complexity of multiprocessing being involved. I've updated the bug title to reflect this. |
Xiang Zhang, would you like to submit a patch? |
I could. But currently I don't have a good idea how to fix it elegantly. If anyone else makes a PR, I'm willing to review it. |
This might be redundant, but I googled 'itertools tee thread safe' and came across a detailed SO answer from 2017 that explains the issue, along with a similar example that causes a segfault in the interpreter, but it seems it was not reported here. It also proposes a possible solution that fixes the iterator but not tee itself. SO answer: https://stackoverflow.com/a/44638570/2610955 A similar open issue that could be a duplicate: https://bugs.python.org/issue24482 Thanks |
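The "fix the iterator, not tee" idea amounts to wrapping the source so that every next() call is serialized. A hedged sketch (an editor's illustration, with a hypothetical name `LockedIterator`, not code from the SO answer or the stdlib; note that this protects direct concurrent use of the wrapped iterator itself, and the lock wrapper's __next__ is still Python-level):

```python
import threading

class LockedIterator:
    """Wrap an iterator so that concurrent next() calls are serialized.
    (Illustrative sketch; not part of any standard library module.)"""
    def __init__(self, it):
        self._lock = threading.Lock()
        self._it = iter(it)
    def __iter__(self):
        return self
    def __next__(self):
        with self._lock:            # one thread at a time advances the source
            return next(self._it)
```

With this wrapper, several threads can pull from one shared source directly and each value is delivered to exactly one consumer.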
Sharing across processes requires multiprocessing.Manager; the current problem is a tee-object thread-safety issue, as Xiang Zhang said. |
I take this. |
I tried to solve this issue myself, and figured out that it is not so simple. It is possible to keep tee() from crashing, but it is not possible to guarantee the order of the output without using a lock in tee(): you can get a sequence 1, 2, 4, 3, .... So we have the following options:
We can apply different solutions in the development and maintenance versions. I would not backport the options that use locks. |
Thanks for enumerating the options. I think 1 and 2 are the best combination. It is a reasonable restriction to not tee across threads. If someone still does, then detecting it, raising an exception, and not crashing seems like a reasonable response. |
I've applied the patch to "master". I considered backporting but am thinking that would be risky at this stage in the 3.8 release. |
The documentation changes should be backported. And I think we need a change like PR 9254, but raising a RuntimeError instead of adding the value to the queue, to be applied in older versions. It may be better to apply it even to the development version. There is nothing wrong with creating the tee iterator in one thread and using it in another thread, or with using the tee iterators with external locking. I'm afraid that PR 15567 can break legitimate code. |
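The "external locking" pattern mentioned above can be sketched like this (an editor's illustration, not code from the thread): a Python-level generator source, two tee branches consumed in different threads, and one shared lock guarding every advance, so the tee internals are never entered concurrently.

```python
import itertools
import threading

def source():
    # A Python-level generator: exactly the kind of iterator
    # whose __next__ can release the GIL mid-fetch.
    for i in range(1000):
        yield i

def drain(it, lock, out):
    # External locking: every next() on a tee'd iterator
    # happens under the shared lock.
    while True:
        with lock:
            try:
                out.append(next(it))
            except StopIteration:
                return

a, b = itertools.tee(source())
lock = threading.Lock()
ra, rb = [], []
t = threading.Thread(target=drain, args=(a, lock, ra))
t.start()
drain(b, lock, rb)
t.join()

# Each tee branch still yields the full sequence, in order.
assert ra == list(range(1000))
assert rb == list(range(1000))
```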
Go ahead and take this in any direction you want. |
As far as I am concerned, I would prefer that a change like PR 9254 be merged into the old versions, because it does not break legitimate code and can prevent the program from crashing. It can serve as a compatible solution for the old versions. |
+1 on this. I think it's better to also apply bpo-15625 to the master branch. I don't think we should prevent creating the iterator in one thread and using it in others. |
This seems resolved, can it be closed? |
The script (3.py) now gives a RuntimeError (as of 3.9), so I think the discussion about backporting is over and there is nothing more to do on this issue. If nobody objects, I will close this.
It would be nice to add a multi-thread-supporting version of tee(), or to add multi-thread support to tee() itself, but that is a different issue.