This issue tracker has been migrated to GitHub, and is currently read-only.
For more information, see the GitHub FAQs in Python's Developer Guide.

classification
Title: itertools.tee not thread-safe; can segfault interpreter when wrapped iterator releases GIL
Type: crash Stage: resolved
Components: Library (Lib) Versions: Python 3.9, Python 3.8, Python 3.7, Python 2.7
process
Status: closed Resolution: fixed
Dependencies: Superseder:
Assigned To: serhiy.storchaka Nosy List: carlorosati, davin, hongweipeng, iritkatriel, josh.r, miss-islington, rhettinger, serhiy.storchaka, xiang.zhang, xtreak
Priority: normal Keywords: patch

Created on 2018-08-15 02:08 by carlorosati, last changed 2022-04-11 14:59 by admin. This issue is now closed.

Files
File name Uploaded Description Edit
3.py carlorosati, 2018-08-15 02:08
Pull Requests
URL Status Linked Edit
PR 9075 closed hongweipeng, 2018-09-06 03:31
PR 9254 closed hongweipeng, 2018-09-13 02:07
PR 15567 merged hongweipeng, 2019-08-29 02:32
PR 15625 merged serhiy.storchaka, 2019-08-30 22:56
PR 15736 merged serhiy.storchaka, 2019-09-09 07:37
PR 15737 merged miss-islington, 2019-09-09 08:47
PR 15738 merged miss-islington, 2019-09-09 08:47
PR 15740 merged serhiy.storchaka, 2019-09-09 09:06
Messages (31)
msg323546 - (view) Author: Carlo Rosati (carlorosati) Date: 2018-08-15 02:08
Hello, 

When I run the attached code, I encounter a segmentation fault.

Thanks,
Carlo
msg323587 - (view) Author: Carlo Rosati (carlorosati) Date: 2018-08-16 01:36
I figured out that the problem is itertools.tee does not use a multiprocessing.Manager proxied object for shared state. I was able to create a workaround tee as follows.

import multiprocessing

def multiprocessing_tee(iterable, n=2):
    """Write a multiprocessing safe itertools.tee"""
    it = iter(iterable)
    m = multiprocessing.Manager()
    lists = [m.list() for i in range(n)]
    def gen(local_list):
        keep_m_alive = m
        while True:
            if not local_list:         # when the local list is empty
                newval = next(it)      # fetch a new value and
                for l in lists:        # load it to all the lists
                    l.append(newval)
            yield local_list.pop(-1)
    return tuple(gen(l) for l in lists)
msg323588 - (view) Author: Carlo Rosati (carlorosati) Date: 2018-08-16 01:50
Okay, I needed to do .pop(0) instead of .pop(-1), which is probably O(N).
msg323592 - (view) Author: Carlo Rosati (carlorosati) Date: 2018-08-16 05:38
You'll also need to lock when modifying the manager's list. Does anyone know how to do this using the multiprocessing.Queues without deadlocking?
msg323593 - (view) Author: Karthikeyan Singaravelan (xtreak) * (Python committer) Date: 2018-08-16 05:40
Thanks for the script. I can reproduce this on master and Python 3.6 too. Sometimes the attached script causes a timeout error instead. Running it under gdb gives me the output below:

[New Thread 0x18ab of process 10682]
[New Thread 0x1903 of process 10682]
[New Thread 0x1a03 of process 10682]

Thread 2 received signal SIGSEGV, Segmentation fault.
[Switching to Thread 0x18ab of process 10682]
0x000000010033e509 in teedataobject_getitem (tdo=0x1014361c0, i=1)
    at ./Modules/itertoolsmodule.c:454
454	    Py_INCREF(value);


Backtrace : 

#0  0x000000010033e509 in teedataobject_getitem (tdo=0x100645640, i=1)
    at ./Modules/itertoolsmodule.c:454
#1  0x000000010033e290 in tee_next (to=0x10308a668) at ./Modules/itertoolsmodule.c:637
#2  0x0000000100059844 in enum_next (en=0x10308fc20) at Objects/enumobject.c:156
#3  0x00000001002297e1 in _PyEval_EvalFrameDefault (
    f=Frame 0x100646710, for file /Users/karthikeyansingaravelan/stuff/python/cpython/Lib/multiprocessing/pool.py, line 292, in _guarded_task_generation (self=<Pool(_ctx=<ForkContext at remote 0x101ddce90>, _inqueue=<SimpleQueue(_reader=<Connection(_handle=4, _readable=True, _writable=False) at remote 0x101c236b0>, _writer=<Connection(_handle=5, _readable=False, _writable=True) at remote 0x101c23c50>, _rlock=<Lock(_semlock=<_multiprocessing.SemLock at remote 0x101f53e10>, acquire=<built-in method acquire of _multiprocessing.SemLock object at remote 0x101f53e10>, release=<built-in method release of _multiprocessing.SemLock object at remote 0x101f53e10>) at remote 0x101f50050>, _poll=<method at remote 0x10186e2c0>, _wlock=<Lock(_semlock=<_multiprocessing.SemLock at remote 0x103078178>, acquire=<built-in method acquire of _multiprocessing.SemLock object at remote 0x103078178>, release=<built-in method release of _multiprocessing.SemLock object at remote 0x103078178>) at remote 0x101f54dd0>) at remote 0x101bf5cb0>, _ou...(truncated), throwflag=0) at Python/ceval.c:2905
#4  0x0000000100215277 in PyEval_EvalFrameEx (
    f=Frame 0x100646710, for file /Users/karthikeyansingaravelan/stuff/python/cpython/Lib/multiprocessing/pool.py, line 292, in _guarded_task_generation (self=<Pool(_ctx=<ForkContext at remote 0x101ddce90>, _inqueue=<SimpleQueue(_reader=<Connection(_handle=4, _readable=True, _writable=False) at remote 0x101c236b0>, _writer=<Connection(_handle=5, _readable=False, _writable=True) at remote 0x101c23c50>, _rlock=<Lock(_semlock=<_multiprocessing.SemLock at remote 0x101f53e10>, acquire=<built-in method acquire of _multiprocessing.SemLock object at remote 0x101f53e10>, release=<built-in method release of _multiprocessing.SemLock object at remote 0x101f53e10>) at remote 0x101f50050>, _poll=<method at remote 0x10186e2c0>, _wlock=<Lock(_semlock=<_multiprocessing.SemLock at remote 0x103078178>, acquire=<built-in method acquire of _multiprocessing.SemLock object at remote 0x103078178>, release=<built-in method release of _multiprocessing.SemLock object at remote 0x103078178>) at remote 0x101f54dd0>) at remote 0x101bf5cb0>, _ou...(truncated), throwflag=0) at Python/ceval.c:536
#5  0x000000010006a3e2 in gen_send_ex (gen=0x1030349b0, arg=0x0, exc=0, closing=0)
    at Objects/genobject.c:221
#6  0x000000010006ba1f in gen_iternext (gen=0x1030349b0) at Objects/genobject.c:542
#7  0x00000001002297e1 in _PyEval_EvalFrameDefault (
    f=Frame 0x101909730, for file /Users/karthikeyansingaravelan/stuff/python/cpython/Lib/multiprocessing/pool.py, line 426, in _handle_tasks (taskqueue=<_queue.SimpleQueue at remote 0x101f1daa0>, put=<method at remote 0x101db5ae0>, outqueue=<SimpleQueue(_reader=<Connection(_handle=8, _readable=True, _writable=False) at remote 0x10306d410>, _writer=<Connection(_handle=9, _readable=False, _writable=True) at remote 0x10306d9b0>, _rlock=<Lock(_semlock=<_multiprocessing.SemLock at remote 0x103078ba0>, acquire=<built-in method acquire of _multiprocessing.SemLock object at remote 0x103078ba0>, release=<built-in method release of _multiprocessing.SemLock object at remote 0x103078ba0>) at remote 0x10306de90>, _poll=<method at remote 0x1018eddb8>, _wlock=<Lock(_semlock=<_multiprocessing.SemLock at remote 0x103078c08>, acquire=<built-in method acquire of _multiprocessing.SemLock object at remote 0x103078c08>, release=<built-in method release of _multiprocessing.SemLock object at remote 0x103078c08>) at remote 0x10306df50>) a...(truncated), throwflag=0) at Python/ceval.c:2905
#8  0x0000000100215277 in PyEval_EvalFrameEx (
    f=Frame 0x101909730, for file /Users/karthikeyansingaravelan/stuff/python/cpython/Lib/multiprocessing/pool.py, line 426, in _handle_tasks (taskqueue=<_queue.SimpleQueue at remote 0x101f1daa0>, put=<method at remote 0x101db5ae0>, outqueue=<SimpleQueue(_reader=<Connection(_handle=8, _readable=True, _writable=False) at remote 0x10306d410>, _writer=<Connection(_handle=9, _readable=False, _writable=True) at remote 0x10306d9b0>, _rlock=<Lock(_semlock=<_multiprocessing.SemLock at remote 0x103078ba0>, acquire=<built-in method acquire of _multiprocessing.SemLock object at remote 0x103078ba0>, release=<built-in method release of _multiprocessing.SemLock object at remote 0x103078ba0>) at remote 0x10306de90>, _poll=<method at remote 0x1018eddb8>, _wlock=<Lock(_semlock=<_multiprocessing.SemLock at remote 0x103078c08>, acquire=<built-in method acquire of _multiprocessing.SemLock object at remote 0x103078c08>, release=<built-in method release of _multiprocessing.SemLock object at remote 0x103078c08>) at remote 0x10306df50>) a...(truncated), throwflag=0) at Python/ceval.c:536
#9  0x0000000100045d63 in function_code_fastcall (co=0x101de77c0, args=0x1030693a0, nargs=5,
    globals={'__name__': 'multiprocessing.pool', '__doc__': None, '__package__': 'multiprocessing', '__loader__': <SourceFileLoader(name='multiprocessing.pool', path='/Users/karthikeyansingaravelan/stuff/python/cpython/Lib/multiprocessing/pool.py') at remote 0x101c23b90>, '__spec__': <ModuleSpec(name='multiprocessing.pool', loader=<...>, origin='/Users/karthikeyansingaravelan/stuff/python/cpython/Lib/multiprocessing/pool.py', loader_state=None, submodule_search_locations=None, _set_fileattr=True, _cached='/Users/karthikeyansingaravelan/stuff/python/cpython/Lib/multiprocessing/__pycache__/pool.cpython-38.pyc', _initializing=False) at remote 0x101c23bf0>, '__file__': '/Users/karthikeyansingaravelan/stuff/python/cpython/Lib/multiprocessing/pool.py', '__cached__': '/Users/karthikeyansingaravelan/stuff/python/cpython/Lib/multiprocessing/__pycache__/pool.cpython-38.pyc', '__builtins__': {'__name__': 'builtins', '__doc__': "Built-in functions, exceptions, and other objects.\n\nNoteworthy: None is the `nil' object; Ellipsis repr...(truncated)) at Objects/call.c:283
#10 0x0000000100043d91 in _PyFunction_FastCallDict (func=<function at remote 0x101f41f70>,
    args=0x103069378, nargs=5, kwargs={}) at Objects/call.c:322
#11 0x0000000100045902 in PyObject_Call (callable=<function at remote 0x101f41f70>,
    args=(<_queue.SimpleQueue at remote 0x101f1daa0>, <method at remote 0x101db5ae0>, <SimpleQueue(_reader=<Connection(_handle=8, _readable=True, _writable=False) at remote 0x10306d410>, _writer=<Connection(_handle=9, _readable=False, _writable=True) at remote 0x10306d9b0>, _rlock=<Lock(_semlock=<_multiprocessing.SemLock at remote 0x103078ba0>, acquire=<built-in method acquire of _multiprocessing.SemLock object at remote 0x103078ba0>, release=<built-in method release of _multiprocessing.SemLock object at remote 0x103078ba0>) at remote 0x10306de90>, _poll=<method at remote 0x1018eddb8>, _wlock=<Lock(_semlock=<_multiprocessing.SemLock at remote 0x103078c08>, acquire=<built-in method acquire of _multiprocessing.SemLock object at remote 0x103078c08>, release=<built-in method release of _multiprocessing.SemLock object at remote 0x103078c08>) at remote 0x10306df50>) at remote 0x101f50530>, [<ForkProcess(_identity=(1,), _config={'authkey': <AuthenticationString at remote 0x101bfcdd0>, 'semprefix': '/mp', 'daemon': True}, _pa...(truncated), kwargs={}) at Objects/call.c:226
#12 0x00000001002342bc in do_call_core (func=<function at remote 0x101f41f70>,
    callargs=(<_queue.SimpleQueue at remote 0x101f1daa0>, <method at remote 0x101db5ae0>, <SimpleQueue(_reader=<Connection(_handle=8, _readable=True, _writable=False) at remote 0x10306d410>, _writer=<Connection(_handle=9, _readable=False, _writable=True) at remote 0x10306d9b0>, _rlock=<Lock(_semlock=<_multiprocessing.SemLock at remote 0x103078ba0>, acquire=<built-in method acquire of _multiprocessing.SemLock object at remote 0x103078ba0>, release=<built-in method release of _multiprocessing.SemLock object at remote 0x103078ba0>) at remote 0x10306de90>, _poll=<method at remote 0x1018eddb8>, _wlock=<Lock(_semlock=<_multiprocessing.SemLock at remote 0x103078c08>, acquire=<built-in method acquire of _multiprocessing.SemLock object at remote 0x103078c08>, release=<built-in method release of _multiprocessing.SemLock object at remote 0x103078c08>) at remote 0x10306df50>) at remote 0x101f50530>, [<ForkProcess(_identity=(1,), _config={'authkey': <AuthenticationString at remote 0x101bfcdd0>, 'semprefix': '/mp', 'daemon': True}, _pa...(truncated), kwdict={}) at Python/ceval.c:4652
#13 0x000000010022ca2b in _PyEval_EvalFrameDefault (
    f=Frame 0x10308d050, for file /Users/karthikeyansingaravelan/stuff/python/cpython/Lib/threading.py, line 865, in run (self=<Thread(_target=<function at remote 0x101f41f70>, _name='Thread-2', _args=(<_queue.SimpleQueue at remote 0x101f1daa0>, <method at remote 0x101db5ae0>, <SimpleQueue(_reader=<Connection(_handle=8, _readable=True, _writable=False) at remote 0x10306d410>, _writer=<Connection(_handle=9, _readable=False, _writable=True) at remote 0x10306d9b0>, _rlock=<Lock(_semlock=<_multiprocessing.SemLock at remote 0x103078ba0>, acquire=<built-in method acquire of _multiprocessing.SemLock object at remote 0x103078ba0>, release=<built-in method release of _multiprocessing.SemLock object at remote 0x103078ba0>) at remote 0x10306de90>, _poll=<method at remote 0x1018eddb8>, _wlock=<Lock(_semlock=<_multiprocessing.SemLock at remote 0x103078c08>, acquire=<built-in method acquire of _multiprocessing.SemLock object at remote 0x103078c08>, release=<built-in method release of _multiprocessing.SemLock object at remote 0x1...(truncated), throwflag=0) at Python/ceval.c:3267
#14 0x0000000100215277 in PyEval_EvalFrameEx (
    f=Frame 0x10308d050, for file /Users/karthikeyansingaravelan/stuff/python/cpython/Lib/threading.py, line 865, in run (self=<Thread(_target=<function at remote 0x101f41f70>, _name='Thread-2', _args=(<_queue.SimpleQueue at remote 0x101f1daa0>, <method at remote 0x101db5ae0>, <SimpleQueue(_reader=<Connection(_handle=8, _readable=True, _writable=False) at remote 0x10306d410>, _writer=<Connection(_handle=9, _readable=False, _writable=True) at remote 0x10306d9b0>, _rlock=<Lock(_semlock=<_multiprocessing.SemLock at remote 0x103078ba0>, acquire=<built-in method acquire of _multiprocessing.SemLock object at remote 0x103078ba0>, release=<built-in method release of _multiprocessing.SemLock object at remote 0x103078ba0>) at remote 0x10306de90>, _poll=<method at remote 0x1018eddb8>, _wlock=<Lock(_semlock=<_multiprocessing.SemLock at remote 0x103078c08>, acquire=<built-in method acquire of _multiprocessing.SemLock object at remote 0x103078c08>, release=<built-in method release of _multiprocessing.SemLock object at remote 0x1...(truncated), throwflag=0) at Python/ceval.c:536
#15 0x0000000100045d63 in function_code_fastcall (co=0x101c7aa00, args=0x10190a8f8, nargs=1,
    globals={'__name__': 'threading', '__doc__': "Thread module emulating a subset of Java's threading model.", '__package__': '', '__loader__': <SourceFileLoader(name='threading', path='/Users/karthikeyansingaravelan/stuff/python/cpython/Lib/threading.py') at remote 0x101c6a4d0>, '__spec__': <ModuleSpec(name='threading', loader=<...>, origin='/Users/karthikeyansingaravelan/stuff/python/cpython/Lib/threading.py', loader_state=None, submodule_search_locations=None, _set_fileattr=True, _cached='/Users/karthikeyansingaravelan/stuff/python/cpython/Lib/__pycache__/threading.cpython-38.pyc', _initializing=False) at remote 0x101c6a530>, '__file__': '/Users/karthikeyansingaravelan/stuff/python/cpython/Lib/threading.py', '__cached__': '/Users/karthikeyansingaravelan/stuff/python/cpython/Lib/__pycache__/threading.cpython-38.pyc', '__builtins__': {'__name__': 'builtins', '__doc__': "Built-in functions, exceptions, and other objects.\n\nNoteworthy: None is the `nil' object; Ellipsis represents `...' in slices.", '__package__': '', '...(truncated)) at Objects/call.c:283
#16 0x0000000100045069 in _PyFunction_FastCallKeywords (func=<function at remote 0x101d9e890>,
    stack=0x10190a8f0, nargs=1, kwnames=0x0) at Objects/call.c:408
#17 0x0000000100233e50 in call_function (pp_stack=0x103b522b8, oparg=1, kwnames=0x0)
    at Python/ceval.c:4623
#18 0x000000010022bcf9 in _PyEval_EvalFrameDefault (
    f=Frame 0x10190a750, for file /Users/karthikeyansingaravelan/stuff/python/cpython/Lib/threading.py, line 917, in _bootstrap_inner (self=<Thread(_target=<function at remote 0x101f41f70>, _name='Thread-2', _args=(<_queue.SimpleQueue at remote 0x101f1daa0>, <method at remote 0x101db5ae0>, <SimpleQueue(_reader=<Connection(_handle=8, _readable=True, _writable=False) at remote 0x10306d410>, _writer=<Connection(_handle=9, _readable=False, _writable=True) at remote 0x10306d9b0>, _rlock=<Lock(_semlock=<_multiprocessing.SemLock at remote 0x103078ba0>, acquire=<built-in method acquire of _multiprocessing.SemLock object at remote 0x103078ba0>, release=<built-in method release of _multiprocessing.SemLock object at remote 0x103078ba0>) at remote 0x10306de90>, _poll=<method at remote 0x1018eddb8>, _wlock=<Lock(_semlock=<_multiprocessing.SemLock at remote 0x103078c08>, acquire=<built-in method acquire of _multiprocessing.SemLock object at remote 0x103078c08>, release=<built-in method release of _multiprocessing.SemLock object ...(truncated), throwflag=0) at Python/ceval.c:3186
#19 0x0000000100215277 in PyEval_EvalFrameEx (
    f=Frame 0x10190a750, for file /Users/karthikeyansingaravelan/stuff/python/cpython/Lib/threading.py, line 917, in _bootstrap_inner (self=<Thread(_target=<function at remote 0x101f41f70>, _name='Thread-2', _args=(<_queue.SimpleQueue at remote 0x101f1daa0>, <method at remote 0x101db5ae0>, <SimpleQueue(_reader=<Connection(_handle=8, _readable=True, _writable=False) at remote 0x10306d410>, _writer=<Connection(_handle=9, _readable=False, _writable=True) at remote 0x10306d9b0>, _rlock=<Lock(_semlock=<_multiprocessing.SemLock at remote 0x103078ba0>, acquire=<built-in method acquire of _multiprocessing.SemLock object at remote 0x103078ba0>, release=<built-in method release of _multiprocessing.SemLock object at remote 0x103078ba0>) at remote 0x10306de90>, _poll=<method at remote 0x1018eddb8>, _wlock=<Lock(_semlock=<_multiprocessing.SemLock at remote 0x103078c08>, acquire=<built-in method acquire of _multiprocessing.SemLock object at remote 0x103078c08>, release=<built-in method release of _multiprocessing.SemLock object ...(truncated), throwflag=0) at Python/ceval.c:536
#20 0x0000000100045d63 in function_code_fastcall (co=0x101c7ad00, args=0x103086b68, nargs=1,
    globals={'__name__': 'threading', '__doc__': "Thread module emulating a subset of Java's threading model.", '__package__': '', '__loader__': <SourceFileLoader(name='threading', path='/Users/karthikeyansingaravelan/stuff/python/cpython/Lib/threading.py') at remote 0x101c6a4d0>, '__spec__': <ModuleSpec(name='threading', loader=<...>, origin='/Users/karthikeyansingaravelan/stuff/python/cpython/Lib/threading.py', loader_state=None, submodule_search_locations=None, _set_fileattr=True, _cached='/Users/karthikeyansingaravelan/stuff/python/cpython/Lib/__pycache__/threading.cpython-38.pyc', _initializing=False) at remote 0x101c6a530>, '__file__': '/Users/karthikeyansingaravelan/stuff/python/cpython/Lib/threading.py', '__cached__': '/Users/karthikeyansingaravelan/stuff/python/cpython/Lib/__pycache__/threading.cpython-38.pyc', '__builtins__': {'__name__': 'builtins', '__doc__': "Built-in functions, exceptions, and other objects.\n\nNoteworthy: None is the `nil' object; Ellipsis represents `...' in slices.", '__package__': '', '...(truncated)) at Objects/call.c:283
#21 0x0000000100045069 in _PyFunction_FastCallKeywords (func=<function at remote 0x101d9eb50>,
    stack=0x103086b60, nargs=1, kwnames=0x0) at Objects/call.c:408
#22 0x0000000100233e50 in call_function (pp_stack=0x103b54508, oparg=1, kwnames=0x0)
    at Python/ceval.c:4623
#23 0x000000010022bcf9 in _PyEval_EvalFrameDefault (
    f=Frame 0x1030869d8, for file /Users/karthikeyansingaravelan/stuff/python/cpython/Lib/threading.py, line 885, in _bootstrap (self=<Thread(_target=<function at remote 0x101f41f70>, _name='Thread-2', _args=(<_queue.SimpleQueue at remote 0x101f1daa0>, <method at remote 0x101db5ae0>, <SimpleQueue(_reader=<Connection(_handle=8, _readable=True, _writable=False) at remote 0x10306d410>, _writer=<Connection(_handle=9, _readable=False, _writable=True) at remote 0x10306d9b0>, _rlock=<Lock(_semlock=<_multiprocessing.SemLock at remote 0x103078ba0>, acquire=<built-in method acquire of _multiprocessing.SemLock object at remote 0x103078ba0>, release=<built-in method release of _multiprocessing.SemLock object at remote 0x103078ba0>) at remote 0x10306de90>, _poll=<method at remote 0x1018eddb8>, _wlock=<Lock(_semlock=<_multiprocessing.SemLock at remote 0x103078c08>, acquire=<built-in method acquire of _multiprocessing.SemLock object at remote 0x103078c08>, release=<built-in method release of _multiprocessing.SemLock object at rem...(truncated), throwflag=0) at Python/ceval.c:3186
#24 0x0000000100215277 in PyEval_EvalFrameEx (
    f=Frame 0x1030869d8, for file /Users/karthikeyansingaravelan/stuff/python/cpython/Lib/threading.py, line 885, in _bootstrap (self=<Thread(_target=<function at remote 0x101f41f70>, _name='Thread-2', _args=(<_queue.SimpleQueue at remote 0x101f1daa0>, <method at remote 0x101db5ae0>, <SimpleQueue(_reader=<Connection(_handle=8, _readable=True, _writable=False) at remote 0x10306d410>, _writer=<Connection(_handle=9, _readable=False, _writable=True) at remote 0x10306d9b0>, _rlock=<Lock(_semlock=<_multiprocessing.SemLock at remote 0x103078ba0>, acquire=<built-in method acquire of _multiprocessing.SemLock object at remote 0x103078ba0>, release=<built-in method release of _multiprocessing.SemLock object at remote 0x103078ba0>) at remote 0x10306de90>, _poll=<method at remote 0x1018eddb8>, _wlock=<Lock(_semlock=<_multiprocessing.SemLock at remote 0x103078c08>, acquire=<built-in method acquire of _multiprocessing.SemLock object at remote 0x103078c08>, release=<built-in method release of _multiprocessing.SemLock object at rem...(truncated), throwflag=0) at Python/ceval.c:536
#25 0x0000000100045d63 in function_code_fastcall (co=0x101c7aac0, args=0x103b55d68, nargs=1,
    globals={'__name__': 'threading', '__doc__': "Thread module emulating a subset of Java's threading model.", '__package__': '', '__loader__': <SourceFileLoader(name='threading', path='/Users/karthikeyansingaravelan/stuff/python/cpython/Lib/threading.py') at remote 0x101c6a4d0>, '__spec__': <ModuleSpec(name='threading', loader=<...>, origin='/Users/karthikeyansingaravelan/stuff/python/cpython/Lib/threading.py', loader_state=None, submodule_search_locations=None, _set_fileattr=True, _cached='/Users/karthikeyansingaravelan/stuff/python/cpython/Lib/__pycache__/threading.cpython-38.pyc', _initializing=False) at remote 0x101c6a530>, '__file__': '/Users/karthikeyansingaravelan/stuff/python/cpython/Lib/threading.py', '__cached__': '/Users/karthikeyansingaravelan/stuff/python/cpython/Lib/__pycache__/threading.cpython-38.pyc', '__builtins__': {'__name__': 'builtins', '__doc__': "Built-in functions, exceptions, and other objects.\n\nNoteworthy: None is the `nil' object; Ellipsis represents `...' in slices.", '__package__': '', '...(truncated)) at Objects/call.c:283
#26 0x0000000100043d91 in _PyFunction_FastCallDict (func=<function at remote 0x101d9e940>,
    args=0x103b55d60, nargs=1, kwargs=0x0) at Objects/call.c:322
#27 0x00000001000437e4 in _PyObject_FastCallDict (callable=<function at remote 0x101d9e940>,
    args=0x103b55d60, nargs=1, kwargs=0x0) at Objects/call.c:98
#28 0x0000000100047d29 in _PyObject_Call_Prepend (callable=<function at remote 0x101d9e940>,
    obj=<Thread(_target=<function at remote 0x101f41f70>, _name='Thread-2', _args=(<_queue.SimpleQueue at remote 0x101f1daa0>, <method at remote 0x101db5ae0>, <SimpleQueue(_reader=<Connection(_handle=8, _readable=True, _writable=False) at remote 0x10306d410>, _writer=<Connection(_handle=9, _readable=False, _writable=True) at remote 0x10306d9b0>, _rlock=<Lock(_semlock=<_multiprocessing.SemLock at remote 0x103078ba0>, acquire=<built-in method acquire of _multiprocessing.SemLock object at remote 0x103078ba0>, release=<built-in method release of _multiprocessing.SemLock object at remote 0x103078ba0>) at remote 0x10306de90>, _poll=<method at remote 0x1018eddb8>, _wlock=<Lock(_semlock=<_multiprocessing.SemLock at remote 0x103078c08>, acquire=<built-in method acquire of _multiprocessing.SemLock object at remote 0x103078c08>, release=<built-in method release of _multiprocessing.SemLock object at remote 0x103078c08>) at remote 0x10306df50>) at remote 0x101f50530>, [<ForkProcess(_identity=(1,), _config={'authkey': <Authenticat...(truncated), args=(), kwargs=0x0) at Objects/call.c:904
#29 0x000000010004b5bd in method_call (method=<method at remote 0x101ec2bb0>, args=(), kwargs=0x0)
    at Objects/classobject.c:306
#30 0x0000000100045a02 in PyObject_Call (callable=<method at remote 0x101ec2bb0>, args=(),
    kwargs=0x0) at Objects/call.c:245
#31 0x0000000100349e16 in t_bootstrap (boot_raw=0x10303dd30) at ./Modules/_threadmodule.c:992
#32 0x00007fff87a53268 in _pthread_body () from /usr/lib/system/libsystem_pthread.dylib
#33 0x00007fff87a531e5 in _pthread_start () from /usr/lib/system/libsystem_pthread.dylib
#34 0x00007fff87a5141d in thread_start () from /usr/lib/system/libsystem_pthread.dylib
#35 0x0000000000000000 in ?? ()



Thanks
msg323613 - (view) Author: Raymond Hettinger (rhettinger) * (Python committer) Date: 2018-08-16 18:27
Davin, is there anything itertools.tee() can do about this or is this a multiprocessing issue?
msg323632 - (view) Author: Carlo Rosati (carlorosati) Date: 2018-08-17 02:18
I've actually written a few workarounds that should be considered multiprocessing-specific tee functions. I need feedback/critique on these. Hopefully we can all agree on the one solution that's best. It is unfortunate that the multiprocessing manager does not provide a deque.

The first one I wrote uses a managed list. 

import itertools
import multiprocessing

def multiprocessing_tee(iterable, n=2):
    """Write a multiprocessing safe itertools.tee"""
    it = iter(iterable)
    m = multiprocessing.Manager()
    mylock = m.Lock()
    lists = [m.list() for i in range(n)]
    def gen(local_list):
        for i in itertools.count():
            with mylock:
                if not local_list:         # when the local list is empty
                    newval = next(it)      # fetch a new value and
                    for l in lists:        # load it to all the lists
                        l.append(newval)
            yield local_list.pop(0)
    return tuple(gen(l) for l in lists)

The other two implementations use queues.

import multiprocessing
import queue  # Python 3 name; the module is called Queue on Python 2

def multiprocessing_tee_q(iterable, n=2):
    """Write a multiprocessing safe itertools.tee"""
    it = iter(iterable)
    m = multiprocessing.Manager()
    lock = m.Lock()
    queues = [m.Queue(-1) for _ in range(n)] # -1 means infinite maxsize (so puts won't block)
    def gen(myqueue):
        while True:
            with lock: # no one else touches anything
                try:
                    newval = myqueue.get_nowait()
                except queue.Empty:
                    try:
                        newval = next(it)
                    except StopIteration:
                        return # end the generator cleanly (PEP 479)
                    for q in queues:
                        q.put(newval)
                    newval = myqueue.get()
            yield newval
    return tuple(gen(q) for q in queues)

class Sentinel(object):
    """used as Queue Sentinel"""

def multiprocessing_tee_q2(iterable, n=2):
    """Write a multiprocessing safe itertools.tee"""
    it = iter(iterable)
    m = multiprocessing.Manager()
    lock = m.Lock()
    queues = [m.Queue(-1) for _ in range(n)] # -1 means infinite maxsize (so puts won't block)
    def gen(myqueue):
        while True:
            try:
                retval = myqueue.get_nowait()
            except queue.Empty:
                # what happens if the other process puts last item in my queue before i get lock?
                with lock: # no one else touches anything
                    try:
                        newval = next(it)
                    except StopIteration:
                        newval = Sentinel
                    for q in queues:
                        q.put(newval)
                retval = myqueue.get()
            if retval is Sentinel:
                return # raising StopIteration here becomes RuntimeError under PEP 479
            yield retval
    return tuple(gen(q) for q in queues)

I'm just throwing out my sketches here. I'm hoping the more experienced here can weigh in on these implementations.
msg323633 - (view) Author: Carlo Rosati (carlorosati) Date: 2018-08-17 02:21
`for i in itertools.count()` in the first implementation I posted should be `while True`. I was using that for debugging.
msg323817 - (view) Author: Xiang Zhang (xiang.zhang) * (Python committer) Date: 2018-08-21 03:48
It seems to me the problem is that tee objects can hit a race condition when `PyIter_Next` in `teedataobject_getitem` releases the GIL. Other threads may then enter the same branch, since `tdo->numread` hasn't been updated yet. NULL slots are then generated: 2 objects are read from the underlying iterator and `tdo->numread` is updated twice while only one slot is set.

As for multiprocessing.pool, there is a background task-handling thread consuming one tee object and the main thread consuming the other. The underlying iterator is `IMapIterator`, whose `next` method blocks on a condition.

While experimenting, I found that the following snippet also crashes:

import threading
import itertools

class C:
    def __iter__(self):
        return self
    def __next__(self):
        return 1

def test(i):
    print(list(i))

i1, i2 = itertools.tee(C())
threading.Thread(target=test, args=(i1,)).start()
print(list(i2))

GDB shows it crashes in `teedataobject_dealloc` -> `teedataobject_clear`. I haven't understood what happened.
msg323828 - (view) Author: Xiang Zhang (xiang.zhang) * (Python committer) Date: 2018-08-21 09:18
Ahh, the infinite iterator could also release the GIL in `PyIter_Next`.
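
The interleaving described above can be modelled in pure Python. This is a rough sketch, not the C implementation: `TeeDataModel`, `ReenteringSource`, and the fixed-size `values` list are hypothetical names for illustration, `None` stands in for a NULL slot, and a nested call made from inside `__next__` stands in for a second thread that runs while `PyIter_Next` has released the GIL.

```python
NULL = None  # stands in for an unset (NULL) slot in the C array


class TeeDataModel:
    """Hypothetical pure-Python model of the buffer shared by tee iterators."""

    def __init__(self, source, size=4):
        self.source = source
        self.values = [NULL] * size
        self.numread = 0

    def getitem(self, i):
        if i < self.numread:
            return self.values[i]
        # Lead-iterator branch. In C this is where PyIter_Next() runs, and
        # another thread may get in before numread is updated.
        value = next(self.source)
        self.numread += 1
        self.values[i] = value
        return value


class ReenteringSource:
    """Source whose first __next__ lets a simulated second consumer run."""

    def __init__(self):
        self.data = None
        self.count = 0
        self.reentered = False

    def __iter__(self):
        return self

    def __next__(self):
        self.count += 1
        item = self.count
        if not self.reentered:
            self.reentered = True
            self.data.getitem(0)  # second consumer asks for the same index
        return item


source = ReenteringSource()
data = TeeDataModel(source)
source.data = data
data.getitem(0)

print(data.numread, data.values)
```

After the single outer call, two items have been pulled from the source and `numread` is 2, but only slot 0 was written (the second write overwrote the first) and slot 1 is still unset. In the C code, a later read of that slot would reach `Py_INCREF(value)` with a NULL pointer, which matches the crash site in the gdb output.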
msg323858 - (view) Author: Carlo Rosati (carlorosati) Date: 2018-08-22 01:35
If what you've said is correct, would it make the most sense to create a Manager method which returns a Proxy to a tee'd iterator?
msg323881 - (view) Author: Josh Rosenberg (josh.r) * (Python triager) Date: 2018-08-22 14:10
Carlo: The point of Xiang's post is that this is only tangentially related to multiprocessing; the real problem is that tee-ing an iterator implemented in Python (of which pool.imap_unordered is just one example) and using the resulting tee-ed iterators in multiple threads (which pool.imap_unordered does implicitly, as there is a thread involved in dispatching work) is not thread-safe.

The problem is *exposed* by multiprocessing.pool.imap_unordered, but it is entirely a problem with itertools.tee, and as Xiang's repro indicates, it can be triggered easily without the complexity of multiprocessing being involved.

I've updated the bug title to reflect this.
msg323917 - (view) Author: Raymond Hettinger (rhettinger) * (Python committer) Date: 2018-08-23 03:14
Xiang Zhang, would you like to submit a patch?
msg324146 - (view) Author: Xiang Zhang (xiang.zhang) * (Python committer) Date: 2018-08-27 06:37
I could. But currently I don't have a good idea how to fix it elegantly. If anyone else makes a PR, I'm willing to review it.
msg324237 - (view) Author: Karthikeyan Singaravelan (xtreak) * (Python committer) Date: 2018-08-28 11:00
This might be redundant, but I googled 'itertools tee thread safe' and came across a detailed SO answer from 2017 that explains the issue, along with a similar example that causes a segfault in the interpreter; it seems it was never reported here. It also proposes a possible fix for the iterator, but not for tee itself.

SO answer : https://stackoverflow.com/a/44638570/2610955

Similar open issue that could be a duplicate 

https://bugs.python.org/issue24482


Thanks
msg324658 - (view) Author: hongweipeng (hongweipeng) * Date: 2018-09-06 03:34
Sharing state between processes requires multiprocessing.Manager, but the current problem is a thread-safety issue in the tee object. As Xiang Zhang said, `PyIter_Next` in `teedataobject_getitem` releases the GIL. So a thread lock is necessary, and only the lead iterator needs to take it when it runs `PyIter_Next`. Can anyone help me review it?
msg324839 - (view) Author: Serhiy Storchaka (serhiy.storchaka) * (Python committer) Date: 2018-09-08 13:42
I'll take this.
msg350165 - (view) Author: Serhiy Storchaka (serhiy.storchaka) * (Python committer) Date: 2019-08-22 06:45
I tried to solve this issue myself, and figured out that it is not so simple. It is possible to make tee() not crash, but it is not possible to guarantee the order of the output without using a lock in tee(). You can get a sequence such as 1, 2, 4, 3, ...

So we have the following options:

1. Document that the tee() iterator cannot be consumed from different threads, even if the underlying iterator is thread-safe.
2. Detect the race condition and raise a RuntimeError (as is done when a dict is modified during iteration).
3. Fix the crash, but document that the tee() iterator can return items out of order if consumed from different threads (PR 9254 basically does this). Exceptions can be raised out of order too.
4. Add a non-reentrant lock in the tee() object and raise a RuntimeError if it is re-entered.
5. Add a reentrant lock in the tee() object.

We can apply different solutions in the development and maintenance versions. I would not backport the options with locks.
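
As an illustration of the detect-and-raise idea behind options 2 and 4, here is a minimal pure-Python sketch. The names `ReentryGuard` and `Reenter` are hypothetical and are not part of itertools; the real tee() is written in C, where a flag check of this kind is not interrupted by a thread switch. In pure Python the check-then-set below is not atomic across threads, so treat this only as a model of the logic, shown with a deterministic same-thread re-entry.

```python
class ReentryGuard:
    """Hypothetical wrapper that refuses re-entrant or overlapping next() calls."""

    def __init__(self, iterable):
        self._it = iter(iterable)
        self._running = False  # True while a next() call is in progress

    def __iter__(self):
        return self

    def __next__(self):
        if self._running:
            # Another next() call is still inside the wrapped iterator.
            raise RuntimeError("cannot re-enter the guarded iterator")
        self._running = True
        try:
            return next(self._it)
        finally:
            self._running = False


class Reenter:
    """Toy source whose __next__ calls back into the guard that wraps it."""

    def __init__(self):
        self.guard = None

    def __iter__(self):
        return self

    def __next__(self):
        return next(self.guard)


source = Reenter()
guard = ReentryGuard(source)
source.guard = guard

try:
    next(guard)
    outcome = "no error"
except RuntimeError as exc:
    outcome = str(exc)

print(outcome)
```

The guard reports the conflict instead of letting two calls run inside the wrapped iterator at once; it makes no attempt to serialize them, which is what the lock-based options would do.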
msg350174 - (view) Author: Raymond Hettinger (rhettinger) * (Python committer) Date: 2019-08-22 08:36
Thanks for enumerating the options.  I think 1 and 2 are the best combination.  It is a reasonable restriction to not tee across threads.  If someone still does, then detecting it, raising an exception, and not crashing seem like a reasonable response.
msg350705 - (view) Author: Raymond Hettinger (rhettinger) * (Python committer) Date: 2019-08-29 03:43
I've applied the patch to "master".  I considered backporting but am thinking that would be risky at this stage in the 3.8 release.
msg350750 - (view) Author: Serhiy Storchaka (serhiy.storchaka) * (Python committer) Date: 2019-08-29 08:11
The documentation changes should be backported.

And I think we need a change like PR 9254, but with raising a RuntimeError instead of adding the value to the queue, to be applied in older versions.

It may be better to apply it even to the development version. There is nothing wrong with creating the tee iterator in one thread and using it in another thread, or with using the tee iterators with external locking. I am afraid that PR 15567 can break legitimate code.
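
To make "external locking" concrete, here is a minimal sketch in which the tee iterators are created in the main thread and each one is consumed in a different worker thread. `LockedBranch`, `locked_tee`, `count_up`, and `consume` are hypothetical helper names, not itertools API. The assumption is that every access to the tee iterators goes through these wrappers, so a single shared lock serializes all calls into the underlying tee state.

```python
import itertools
import threading


class LockedBranch:
    """Hypothetical wrapper: one tee iterator guarded by a lock shared with its siblings."""

    def __init__(self, branch, lock):
        self._branch = branch
        self._lock = lock

    def __iter__(self):
        return self

    def __next__(self):
        # Only one thread at a time may advance any of the sibling iterators.
        with self._lock:
            return next(self._branch)


def locked_tee(iterable, n=2):
    """Like itertools.tee(), but every returned iterator shares one external lock."""
    lock = threading.Lock()
    return tuple(LockedBranch(b, lock) for b in itertools.tee(iterable, n))


def count_up(limit):
    """A source iterator implemented in Python (a generator)."""
    for i in range(limit):
        yield i


# Created in the main thread, consumed in worker threads.
branches = locked_tee(count_up(1000), n=3)
results = [[] for _ in branches]


def consume(branch, out):
    out.extend(branch)


threads = [
    threading.Thread(target=consume, args=(branch, out))
    for branch, out in zip(branches, results)
]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(all(out == list(range(1000)) for out in results))
```

Because the lock is not reentrant, this sketch would deadlock if the source iterator itself called back into one of the wrapped iterators; it is meant for the plain case of independent consumers.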
msg350754 - (view) Author: Raymond Hettinger (rhettinger) * (Python committer) Date: 2019-08-29 08:25
Go ahead and take this in any direction you want.
msg350907 - (view) Author: hongweipeng (hongweipeng) * Date: 2019-08-31 02:57
As far as I am concerned, I would prefer that something like PR 9254 be merged into the old versions, because it does not break legitimate code and it prevents the program from crashing. It can serve as a compatible solution for old versions.
msg351014 - (view) Author: Xiang Zhang (xiang.zhang) * (Python committer) Date: 2019-09-02 15:26
>It may be better to apply it even to the developed version. There is nothing wrong with creating the tee iterator in one thread and using it the other thread. Or using the tee iterators with external locking. I afraid that PR 15567 can break a legitimate code.

+1 on this. I think it's better to also apply #15625 to the master branch. I don't think we should prevent creating the iterator in one thread and using it in others.
msg351360 - (view) Author: Serhiy Storchaka (serhiy.storchaka) * (Python committer) Date: 2019-09-09 08:47
New changeset 526a01467b3277f9fcf7f91e66c23321caa1245d by Serhiy Storchaka in branch 'master':
bpo-34410: Fix a crash in the tee iterator when re-enter it. (GH-15625)
https://github.com/python/cpython/commit/526a01467b3277f9fcf7f91e66c23321caa1245d
msg351370 - (view) Author: miss-islington (miss-islington) Date: 2019-09-09 09:07
New changeset 6e3809c7ce9fbee11c3a3f89dd7e89829b7581ac by Miss Islington (bot) in branch '3.8':
bpo-34410: Fix a crash in the tee iterator when re-enter it. (GH-15625)
https://github.com/python/cpython/commit/6e3809c7ce9fbee11c3a3f89dd7e89829b7581ac
msg351371 - (view) Author: miss-islington (miss-islington) Date: 2019-09-09 09:11
New changeset 5190b7193c184268d5c8a9440b3a5a8bcd84a23e by Miss Islington (bot) in branch '3.7':
bpo-34410: Fix a crash in the tee iterator when re-enter it. (GH-15625)
https://github.com/python/cpython/commit/5190b7193c184268d5c8a9440b3a5a8bcd84a23e
msg351381 - (view) Author: Serhiy Storchaka (serhiy.storchaka) * (Python committer) Date: 2019-09-09 09:38
New changeset 2fb6921ab296f933caf361a662e6471e143abefc by Serhiy Storchaka in branch '2.7':
[2.7] bpo-34410: Fix a crash in the tee iterator when re-enter it. (GH-15625) (GH-15740)
https://github.com/python/cpython/commit/2fb6921ab296f933caf361a662e6471e143abefc
msg375640 - (view) Author: Irit Katriel (iritkatriel) * (Python committer) Date: 2020-08-19 09:02
This seems resolved, can it be closed?
msg398264 - (view) Author: Irit Katriel (iritkatriel) * (Python committer) Date: 2021-07-26 22:32
The script (3.py) now gives the RuntimeError (as of 3.9), so I think the discussion about backporting is over and there is nothing more to do on this issue. If nobody objects I will close this.
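
The attached 3.py is not reproduced here. As a hedged, single-threaded stand-in, the sketch below re-enters a tee iterator from inside the source iterator's own `__next__`. It assumes an interpreter that contains the changesets listed above; `CallsSibling` is a hypothetical helper class written for this illustration, and the exact error message is not checked.

```python
import itertools


class CallsSibling:
    """Toy source: on the first call, __next__ asks a sibling tee iterator for an item."""

    def __init__(self):
        self.first = True
        self.sibling = None

    def __iter__(self):
        return self

    def __next__(self):
        if self.first:
            self.first = False
            # Re-enters the tee while it is still fetching from this source.
            return next(self.sibling)
        raise StopIteration


source = CallsSibling()
a, b = itertools.tee(source)
source.sibling = b

try:
    next(a)
    outcome = "no error"
except RuntimeError:
    outcome = "RuntimeError"
except StopIteration:
    # Possible on interpreters that do not detect the re-entry.
    outcome = "StopIteration"

print(outcome)
```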
msg398282 - (view) Author: Serhiy Storchaka (serhiy.storchaka) * (Python committer) Date: 2021-07-27 07:17
It would be nice to add a version of tee() that supports multiple threads, or to add multi-thread support to tee() itself, but this is a different issue.
History
Date User Action Args
2022-04-11 14:59:04 admin set github: 78591
2021-07-27 07:17:31 serhiy.storchaka set status: pending -> closed; messages: + msg398282; stage: patch review -> resolved
2021-07-26 22:32:57 iritkatriel set status: open -> pending; messages: + msg398264
2020-08-19 09:02:47 iritkatriel set nosy: + iritkatriel; messages: + msg375640
2019-09-09 09:38:08 serhiy.storchaka set messages: + msg351381
2019-09-09 09:11:25 miss-islington set messages: + msg351371
2019-09-09 09:07:54 miss-islington set nosy: + miss-islington; messages: + msg351370
2019-09-09 09:06:51 serhiy.storchaka set pull_requests: + pull_request15394
2019-09-09 08:47:44 miss-islington set pull_requests: + pull_request15393
2019-09-09 08:47:36 miss-islington set pull_requests: + pull_request15392
2019-09-09 08:47:17 serhiy.storchaka set messages: + msg351360
2019-09-09 07:37:09 serhiy.storchaka set pull_requests: + pull_request15391
2019-09-02 15:26:10 xiang.zhang set messages: + msg351014
2019-08-31 02:57:19 hongweipeng set messages: + msg350907
2019-08-30 22:56:05 serhiy.storchaka set stage: needs patch -> patch review; pull_requests: + pull_request15293
2019-08-29 08:25:52 rhettinger set messages: + msg350754
2019-08-29 08:11:26 serhiy.storchaka set status: closed -> open; stage: resolved -> needs patch; messages: + msg350750; versions: + Python 2.7, Python 3.7, Python 3.8
2019-08-29 03:43:14 rhettinger set status: open -> closed; versions: + Python 3.9, - Python 2.7, Python 3.6, Python 3.7, Python 3.8; messages: + msg350705; resolution: fixed; stage: patch review -> resolved
2019-08-29 02:32:44 hongweipeng set pull_requests: + pull_request15243
2019-08-22 08:36:56 rhettinger set messages: + msg350174
2019-08-22 06:45:18 serhiy.storchaka set messages: + msg350165
2018-09-13 02:07:42 hongweipeng set pull_requests: + pull_request8685
2018-09-08 13:42:03 serhiy.storchaka set assignee: serhiy.storchaka; messages: + msg324839; nosy: + serhiy.storchaka
2018-09-06 03:34:22 hongweipeng set nosy: + hongweipeng; messages: + msg324658
2018-09-06 03:31:54 hongweipeng set keywords: + patch; stage: needs patch -> patch review; pull_requests: + pull_request8533
2018-08-28 11:12:47 xiang.zhang link issue24482 superseder
2018-08-28 11:00:13 xtreak set messages: + msg324237
2018-08-27 06:37:27 xiang.zhang set stage: needs patch; messages: + msg324146; versions: + Python 3.8
2018-08-23 03:14:39 rhettinger set messages: + msg323917
2018-08-22 14:10:08 josh.r set nosy: + josh.r; versions: + Python 3.6; messages: + msg323881; components: + Library (Lib); title: Segfault/TimeoutError: itertools.tee of multiprocessing.pool.imap_unordered -> itertools.tee not thread-safe; can segfault interpreter when wrapped iterator releases GIL
2018-08-22 01:35:46 carlorosati set messages: + msg323858
2018-08-21 09:18:07 xiang.zhang set messages: + msg323828
2018-08-21 03:48:35 xiang.zhang set messages: + msg323817
2018-08-20 06:08:53 xiang.zhang set nosy: + xiang.zhang
2018-08-17 02:21:27 carlorosati set messages: + msg323633
2018-08-17 02:18:16 carlorosati set messages: + msg323632
2018-08-16 18:27:58 rhettinger set nosy: + rhettinger, davin; messages: + msg323613
2018-08-16 05:40:12 xtreak set nosy: + xtreak; messages: + msg323593
2018-08-16 05:38:30 carlorosati set messages: + msg323592
2018-08-16 01:50:42 carlorosati set messages: + msg323588
2018-08-16 01:36:44 carlorosati set messages: + msg323587
2018-08-15 02:08:28 carlorosati create