asyncio.create_task weakrefset race condition #89125

Closed
graingert mannequin opened this issue Aug 20, 2021 · 9 comments
Labels
3.9, 3.10, 3.11, topic-asyncio

Comments

graingert mannequin commented Aug 20, 2021

BPO 44962
Nosy @pitrou, @asvetlov, @ambv, @1st1, @graingert, @miss-islington, @bensimner
PRs
  • bpo-44962: protect asyncio.tasks._all_tasks WeakSet with a lock #27909
  • bpo-44962 fix a race in WeakKeyDict, WeakValueDict and WeakSet when two threads attempt to commit the last pending removal #27921
  • [3.10] bpo-44962: Fix a race in WeakKeyDict, WeakValueDict and WeakSet when two threads attempt to commit the last pending removal (GH-27921) #28013
  • [3.9] bpo-44962: Fix a race in WeakKeyDict, WeakValueDict and WeakSet when two threads attempt to commit the last pending removal (GH-27921) #28014
  Note: these values reflect the state of the issue at the time it was migrated and might not reflect the current state.


    GitHub fields:

    assignee = None
    closed_at = <Date 2021-08-28.18:55:38.459>
    created_at = <Date 2021-08-20.14:07:52.012>
    labels = ['3.11', '3.9', '3.10', 'expert-asyncio']
    title = 'asyncio.create_task weakrefset race condition'
    updated_at = <Date 2021-08-28.18:55:38.458>
    user = 'https://github.com/graingert'

    bugs.python.org fields:

    activity = <Date 2021-08-28.18:55:38.458>
    actor = 'lukasz.langa'
    assignee = 'none'
    closed = True
    closed_date = <Date 2021-08-28.18:55:38.459>
    closer = 'lukasz.langa'
    components = ['asyncio']
    creation = <Date 2021-08-20.14:07:52.012>
    creator = 'graingert'
    dependencies = []
    files = []
    hgrepos = []
    issue_num = 44962
    keywords = ['patch']
    message_count = 9.0
    messages = ['399969', '399971', '399972', '399973', '400175', '400483', '400488', '400494', '400495']
    nosy_count = 7.0
    nosy_names = ['pitrou', 'asvetlov', 'lukasz.langa', 'yselivanov', 'graingert', 'miss-islington', 'bjs']
    pr_nums = ['27909', '27921', '28013', '28014']
    priority = 'normal'
    resolution = 'fixed'
    stage = 'resolved'
    status = 'closed'
    superseder = None
    type = None
    url = 'https://bugs.python.org/issue44962'
    versions = ['Python 3.9', 'Python 3.10', 'Python 3.11']

    graingert mannequin commented Aug 20, 2021

    With the following demo script, I can trigger "IndexError: pop from empty list":
    
    import itertools
    import asyncio
    import concurrent.futures
    import sys
    import threading
    
    threads = 200
    
    def test_all_tasks_threading() -> None:
        async def foo() -> None:
            await asyncio.sleep(0)
    
        async def create_tasks() -> None:
            for i in range(1000):
                asyncio.create_task(foo())
    
            await asyncio.sleep(0)
    
        results = []
        with concurrent.futures.ThreadPoolExecutor(threads) as tpe:
            for f in concurrent.futures.as_completed(
                tpe.submit(asyncio.run, create_tasks()) for i in range(threads)
            ):
                results.append(f.result())
        assert results == [None] * threads
    
    
    def main():
        for i in itertools.count():
            test_all_tasks_threading()
            print(f"worked {i}")
        return 0
    
    
    if __name__ == "__main__":
        sys.exit(main())
    
    worked 0
    worked 1
    worked 2
    worked 3
    worked 4
    worked 5
    worked 6
    worked 7
    worked 8
    worked 9
    worked 10
    worked 11
    worked 12
    worked 13
    worked 14
    worked 15
    worked 16
    worked 17
    worked 18
    Traceback (most recent call last):
      File "/home/graingert/projects/asyncio-demo/demo.py", line 36, in <module>
        sys.exit(main())
      File "/home/graingert/projects/asyncio-demo/demo.py", line 30, in main
        test_all_tasks_threading()
      File "/home/graingert/projects/asyncio-demo/demo.py", line 24, in test_all_tasks_threading
        results.append(f.result())
      File "/usr/lib/python3.9/concurrent/futures/_base.py", line 438, in result
        return self.__get_result()
      File "/usr/lib/python3.9/concurrent/futures/_base.py", line 390, in __get_result
        raise self._exception
      File "/usr/lib/python3.9/concurrent/futures/thread.py", line 52, in run
        result = self.fn(*self.args, **self.kwargs)
      File "/usr/lib/python3.9/asyncio/runners.py", line 48, in run
        loop.run_until_complete(loop.shutdown_asyncgens())
      File "/usr/lib/python3.9/asyncio/base_events.py", line 621, in run_until_complete
        future = tasks.ensure_future(future, loop=self)
      File "/usr/lib/python3.9/asyncio/tasks.py", line 667, in ensure_future
        task = loop.create_task(coro_or_future)
      File "/usr/lib/python3.9/asyncio/base_events.py", line 433, in create_task
        task = tasks.Task(coro, loop=self, name=name)
      File "/usr/lib/python3.9/_weakrefset.py", line 84, in add
        self._commit_removals()
      File "/usr/lib/python3.9/_weakrefset.py", line 57, in _commit_removals
        discard(l.pop())
    IndexError: pop from empty list
    sys:1: RuntimeWarning: coroutine 'BaseEventLoop.shutdown_asyncgens' was never awaited
    Task was destroyed but it is pending!
    task: <Task pending name='Task-74178' coro=<BaseEventLoop.shutdown_asyncgens() running at /usr/lib/python3.9/asyncio/base_events.py:530>>
    

    Here's a live demo on GitHub Actions: https://github.com/graingert/asyncio-backport/runs/3380502247#step:5:90

    graingert mannequin added labels 3.8, 3.9, topic-asyncio, 3.7 (EOL) and 3.10 Aug 20, 2021
    graingert mannequin commented Aug 20, 2021

    It still happens on 3.10, even though WeakSet.__iter__ runs under "with _IterationGuard(self)":

    worked 0
    Traceback (most recent call last):
      File "/home/graingert/projects/asyncio-demo/demo.py", line 36, in <module>
        sys.exit(main())
      File "/home/graingert/projects/asyncio-demo/demo.py", line 30, in main
        test_all_tasks_threading()
      File "/home/graingert/projects/asyncio-demo/demo.py", line 24, in test_all_tasks_threading
        results.append(f.result())
      File "/usr/lib/python3.10/concurrent/futures/_base.py", line 438, in result
        return self.__get_result()
      File "/usr/lib/python3.10/concurrent/futures/_base.py", line 390, in __get_result
        raise self._exception
      File "/usr/lib/python3.10/concurrent/futures/thread.py", line 52, in run
        result = self.fn(*self.args, **self.kwargs)
      File "/usr/lib/python3.10/asyncio/runners.py", line 47, in run
        _cancel_all_tasks(loop)
      File "/usr/lib/python3.10/asyncio/runners.py", line 56, in _cancel_all_tasks
        to_cancel = tasks.all_tasks(loop)
      File "/usr/lib/python3.10/asyncio/tasks.py", line 53, in all_tasks
        tasks = list(_all_tasks)
      File "/usr/lib/python3.10/_weakrefset.py", line 60, in __iter__
        with _IterationGuard(self):
      File "/usr/lib/python3.10/_weakrefset.py", line 33, in __exit__
        w._commit_removals()
      File "/usr/lib/python3.10/_weakrefset.py", line 57, in _commit_removals
        discard(l.pop())
    IndexError: pop from empty list
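Both tracebacks end in the same loop in `_commit_removals`, `while l: discard(l.pop())`. A minimal sketch, assuming the failure is a check-then-act race: two threads both see a one-element pending list, both pass the `while l:` test, and whichever pops second finds the list empty. No real threads are used here; the steps of two hypothetical threads are replayed by hand in the bad order. `replay_race`, `pending` and `discarded` are illustrative names standing in for `_pending_removals` and `data`, not weakref internals.

```python
def replay_race():
    """Replay, in one thread, the interleaving suspected in _commit_removals."""
    pending = ["dead-ref"]  # exactly one removal left to commit
    discarded = set()

    # "Thread A" evaluates `while l:` and sees a non-empty list.
    a_sees_work = bool(pending)
    # Before A pops, "thread B" evaluates `while l:` and also sees work.
    b_sees_work = bool(pending)

    # A pops the last pending removal and discards it.
    if a_sees_work:
        discarded.add(pending.pop())

    # B acts on its now-stale check and pops from an empty list.
    try:
        if b_sees_work:
            discarded.add(pending.pop())
    except IndexError as exc:
        return str(exc)
    return None


print(replay_race())  # pop from empty list
```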
    

    graingert mannequin commented Aug 20, 2021

    Interestingly, 3.10 didn't show:

    sys:1: RuntimeWarning: coroutine 'BaseEventLoop.shutdown_asyncgens' was never awaited

    bensimner mannequin commented Aug 20, 2021

    I can reproduce this on 3.9.6.

    After a little digging: asyncio imports Task from the C module _asyncio, and _asyncio's Task implementation (in asynciomodule.c) has an __init__ that adds the task to the all_tasks weakref.WeakSet, which appears to be implemented in Python (in Lib/_weakrefset.py).

    weakref.WeakSet is not thread-safe, which means concurrent create_task calls in different threads (even on separate event loops) are not safe.
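PR #27909 proposed protecting `asyncio.tasks._all_tasks` with a lock. A minimal sketch of that general idea, not the PR's code: a hypothetical `LockedWeakSet` wrapper that takes one `threading.Lock` around every operation, so at most one thread at a time can reach `_commit_removals` through it. `Token` and `worker` are also hypothetical names used only for the demonstration.

```python
import threading
import weakref


class LockedWeakSet:
    """Hypothetical wrapper: one lock around every WeakSet operation."""

    def __init__(self):
        self._lock = threading.Lock()
        self._set = weakref.WeakSet()

    def add(self, item):
        with self._lock:
            self._set.add(item)  # add() may commit pending removals

    def discard(self, item):
        with self._lock:
            self._set.discard(item)

    def snapshot(self):
        # Iterating a WeakSet can also commit pending removals when the
        # iteration guard exits, so copy it while holding the lock.
        with self._lock:
            return list(self._set)

    def __len__(self):
        with self._lock:
            return len(self._set)


class Token:
    """Hypothetical weak-referenceable object to register."""


def worker(registry, n, errors):
    try:
        keep = [Token() for _ in range(n)]
        for tok in keep:
            registry.add(tok)
        for tok in keep[::2]:
            registry.discard(tok)
        registry.snapshot()
    except Exception as exc:  # record anything a thread raises
        errors.append(exc)
    # On return, `keep` is released; in CPython the tokens die here and
    # their weakref callbacks drop them from the underlying WeakSet.


registry = LockedWeakSet()
errors = []
threads = [
    threading.Thread(target=worker, args=(registry, 1000, errors))
    for _ in range(8)
]
for th in threads:
    th.start()
for th in threads:
    th.join()
print(errors, len(registry))
```

The lock only covers callers that go through the wrapper. asyncio's C Task adds itself to the module-level set directly (as described above), so this pattern alone would not have protected asyncio.create_task; the fix that was eventually merged instead made `_commit_removals` itself tolerate the race.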

    graingert mannequin commented Aug 23, 2021

    > weakref.WeakSet is not thread-safe, which means concurrent create_task's in different threads (even on separate event loops) is not safe.

    Actually, it looks like WeakSet is *supposed* to be thread-safe:

    import patchy
    
    patchy.patch(
        "weakref:WeakSet._commit_removals",
        """\
        @@ -1,5 +1,10 @@
         def _commit_removals(self):
        -    l = self._pending_removals
        +    pop = self._pending_removals.pop
             discard = self.data.discard
        -    while l:
        -        discard(l.pop())
        +    while True:
        +        try:
        +            item = pop()
        +        except IndexError:
        +            return
        +        else:
        +            discard(item)
        """
    )
    
    import itertools
    import asyncio
    import concurrent.futures
    import sys
    import threading
    
    threads = 200
    
    def test_all_tasks_threading() -> None:
        async def foo() -> None:
            await asyncio.sleep(0)
    
        async def create_tasks() -> None:
            for i in range(1000):
                asyncio.create_task(foo())
    
            await asyncio.sleep(0)
    
        results = []
        with concurrent.futures.ThreadPoolExecutor(threads) as tpe:
            for f in concurrent.futures.as_completed(
                tpe.submit(asyncio.run, create_tasks()) for i in range(threads)
            ):
                results.append(f.result())
        assert results == [None] * threads
    
    
    def main():
        for i in itertools.count():
            test_all_tasks_threading()
            print(f"worked {i}")
        return 0
    
    
    if __name__ == "__main__":
        sys.exit(main())
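The patch swaps the look-before-you-leap loop for an EAFP one: it just calls `pop()` and stops on `IndexError`, so the emptiness check and the removal happen inside a single `list.pop()` call that, in CPython, another thread cannot split in two. A minimal sketch of that loop outside `weakref`, assuming CPython's per-call atomicity of `list.pop()` and `set.discard()`. `PendingRemovals` and `stress` are hypothetical names, and the merged fix (GH-27921, which also covers WeakKeyDictionary and WeakValueDictionary) may differ in detail.

```python
import threading


class PendingRemovals:
    """Hypothetical stand-in for WeakSet's _pending_removals/data pair."""

    def __init__(self, items):
        self.data = set(items)
        self.pending = list(items)

    def commit(self):
        # Same shape as the patched _commit_removals above.
        pop = self.pending.pop
        discard = self.data.discard
        while True:
            try:
                item = pop()
            except IndexError:
                # Another thread took the last item (or there was none).
                return
            discard(item)


def stress(n_items=10_000, n_threads=16):
    state = PendingRemovals(range(n_items))
    errors = []
    barrier = threading.Barrier(n_threads)

    def run():
        barrier.wait()  # release all committers together to provoke overlap
        try:
            state.commit()
        except Exception as exc:
            errors.append(exc)

    threads = [threading.Thread(target=run) for _ in range(n_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return state, errors


state, errors = stress()
print(errors, len(state.data), len(state.pending))  # [] 0 0
```

With the old `while l: discard(l.pop())` body, the same kind of run can intermittently record an IndexError; with the EAFP body each item is popped exactly once, whichever thread wins.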
    

    ambv commented Aug 28, 2021

    New changeset 206b21e by Thomas Grainger in branch 'main':
    bpo-44962: Fix a race in WeakKeyDict, WeakValueDict and WeakSet when two threads attempt to commit the last pending removal (GH-27921)

    @miss-islington

    New changeset 8aa64cc by Miss Islington (bot) in branch '3.10':
    bpo-44962: Fix a race in WeakKeyDict, WeakValueDict and WeakSet when two threads attempt to commit the last pending removal (GH-27921)

    ambv commented Aug 28, 2021

    New changeset 166ad70 by Miss Islington (bot) in branch '3.9':
    bpo-44962: Fix a race in WeakKeyDict, WeakValueDict and WeakSet when two threads attempt to commit the last pending removal (GH-27921) (GH-28014)

    ambv commented Aug 28, 2021

    Thanks, Thomas! ✨ 🍰 ✨

    ambv added label 3.11 and removed labels 3.7 (EOL) and 3.8 Aug 28, 2021
    ambv closed this as completed Aug 28, 2021
    ezio-melotti transferred this issue from another repository Apr 10, 2022