Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement asyncio.run_in_executor shortcut #76490

Closed
asvetlov opened this issue Dec 13, 2017 · 43 comments
Closed

Implement asyncio.run_in_executor shortcut #76490

asvetlov opened this issue Dec 13, 2017 · 43 comments
Labels
3.9 only security fixes docs Documentation in the Doc dir topic-asyncio

Comments

@asvetlov
Copy link
Contributor

BPO 32309
Nosy @vstinner, @asvetlov, @cjerdonek, @1st1, @miss-islington, @primal100, @aeros
PRs
  • bpo-32311: Implement asyncio.create_task() shortcut #4848
  • bpo-32309: Implement asyncio.ThreadPool #18410
  • bpo-32309: Implement asyncio.to_thread() #20143
  • [3.9] bpo-32309: Implement asyncio.to_thread() (GH-20143) #20212
  • bpo-32309: Add support for contextvars in asyncio.to_thread() #20278
  • [3.9] bpo-32309: Add support for contextvars in asyncio.to_thread() (GH-20278) #20279
  • Note: these values reflect the state of the issue at the time it was migrated and might not reflect the current state.

    Show more details

    GitHub fields:

    assignee = None
    closed_at = <Date 2020-05-21.05:58:41.914>
    created_at = <Date 2017-12-13.19:54:21.835>
    labels = ['docs', '3.9', 'expert-asyncio']
    title = 'Implement asyncio.run_in_executor shortcut'
    updated_at = <Date 2020-05-21.05:58:41.913>
    user = 'https://github.com/asvetlov'

    bugs.python.org fields:

    activity = <Date 2020-05-21.05:58:41.913>
    actor = 'aeros'
    assignee = 'none'
    closed = True
    closed_date = <Date 2020-05-21.05:58:41.914>
    closer = 'aeros'
    components = ['Documentation', 'asyncio']
    creation = <Date 2017-12-13.19:54:21.835>
    creator = 'asvetlov'
    dependencies = []
    files = []
    hgrepos = []
    issue_num = 32309
    keywords = ['patch']
    message_count = 43.0
    messages = ['308238', '308239', '308243', '308245', '341352', '355771', '355773', '355796', '355798', '355799', '355800', '355802', '355804', '355805', '355807', '355809', '355811', '355842', '355846', '355852', '355854', '355855', '355857', '355858', '355862', '355863', '355881', '355883', '355938', '355957', '355975', '355978', '356242', '356303', '356776', '356792', '360725', '369318', '369331', '369408', '369493', '369495', '369497']
    nosy_count = 7.0
    nosy_names = ['vstinner', 'asvetlov', 'chris.jerdonek', 'yselivanov', 'miss-islington', 'primal', 'aeros']
    pr_nums = ['4848', '18410', '20143', '20212', '20278', '20279']
    priority = 'normal'
    resolution = 'fixed'
    stage = 'resolved'
    status = 'closed'
    superseder = None
    type = None
    url = 'https://bugs.python.org/issue32309'
    versions = ['Python 3.9']

    @asvetlov
    Copy link
    Contributor Author

    loop.create_task() and loop.run_in_executor are present very often in user code. But they are require a loop instance, actual call looks like

    loop = asyncio.get_running_loop()
    loop.create_task(coro())

    The proposal adds create_task(coro) and run_in_executor(executor, func, *args) shortcuts for this.

    @asvetlov asvetlov added 3.7 (EOL) end of life docs Documentation in the Doc dir topic-asyncio labels Dec 13, 2017
    @1st1
    Copy link
    Member

    1st1 commented Dec 13, 2017

    I don't like the low-level API of run_in_executor. "executor" being the first argument, the inability to pass **kwargs, etc.

    I'd expect to see a more high-level API, perhaps the one that supports 'async with':

        async with asyncio.ThreadPool() as pool:
            f1 = pool.run(func1)
            f2 = pool.run(func2)
            r3 = await pool.run(func3)
    
        r1 = f1.result()
        r2 = f2.result()
        print(r1, r2, r3)

    I mean it's great that we can use 'concurrent.futures' in asyncio, but having native asyncio pools implementation would be way more convenient to the users.

    In any case, can we focus this issue on the "run_in_executor" API, and open a new one for "create_task"?

    @asvetlov
    Copy link
    Contributor Author

    Removed create_task() from title

    @asvetlov asvetlov changed the title Implement asyncio.create_task() and asyncio.run_in_executor shortcuts Implement asyncio.run_in_executor shortcut Dec 13, 2017
    @asvetlov
    Copy link
    Contributor Author

    https://bugs.python.org/issue32311 opened for create_task()

    @asvetlov
    Copy link
    Contributor Author

    asvetlov commented May 3, 2019

    In Python 3.7 loop.run_in_executor() is the only user-faced method that requires a loop.

    asyncio.ThreadPool() sounds great. Maybe thread group can provide better api.

    But for Python 3.8 adding run_in_executor top-level function looks the only easy and obvious way to help people getting rid of explicit loop usage.

    Yuri, what do you think?

    @aeros
    Copy link
    Contributor

    aeros commented Oct 31, 2019

    I don't like the low-level API of run_in_executor. "executor" being the first argument, the inability to pass **kwargs, etc.

    I mean it's great that we can use 'concurrent.futures' in asyncio, but having native asyncio pools implementation would be way more convenient to the users.

    I agree that there's an issue with this, but I think for the high level API it can be an asynchronous context manager that makes use of concurrent.futures.ThreadPoolExecutor, since that one seems to fit the majority of asyncio use cases. If users want to utilize concurrent.futures.ProcessPoolExecutor or a customized one, they can use loop.run_in_executor() instead.

    Unless there's something I'm missing with regards to concurrent.futures.ThreadPoolExecutor that specifically makes it less usable. Can't asyncio.ThreadPool make use of concurrent.futures.ThreadPoolExecutor in the internal details while providing a better API?

    I'd expect to see a more high-level API, perhaps the one that supports 'async with':

    asyncio.ThreadPool() sounds great. Maybe thread group can provide better api.

    I'd be willing to work on implementing the asyncio.ThreadPool API as Yury described it.

    But for Python 3.8 adding run_in_executor top-level function looks the only easy and obvious way to help people getting rid of explicit loop usage.

    I can see your point Andrew, but I think that if we implement asyncio.run_in_executor() and then later add asyncio.ThreadPool(), it's going to be against "one obvious way" and we'll end up adding two high-level functions for the same purpose. This principle can't always be adhered to, but I think we should try to when it's possible.

    Personally, I think the API for asyncio.ThreadPool would be much more robust, so it seems worthwhile to try implementing this one first. If there are a number of unforeseen complications, asyncio.run_in_executor() might still be a decent fallback.

    @aeros
    Copy link
    Contributor

    aeros commented Oct 31, 2019

    end up adding two high-level functions

    Clarification: asyncio.run_in_executor() would be a function, but asyncio.ThreadPool would be a context manager class.

    @aeros
    Copy link
    Contributor

    aeros commented Nov 1, 2019

    So, here's a prototype implementation of asyncio.ThreadPool that would function exactly as Yury described, but I'm not convinced about the design. Usually, it seems preferred to separate the context manager from the rest of the class (as was done with asyncio.Lock), but I think this one might be simple enough to be a single class:

    class ThreadPool:
        def __init__(self, timeout=None):
            self.timeout = timeout
            self._loop = None
        
        async def __aenter__(self):
            self._loop = asyncio.get_running_loop()
            # Ensure that ThreadPoolExecutor is being used
            self._loop.default_executor = concurrent.futures.ThreadPoolExecutor()
            return self
    
        async def __aexit__(self, *args):
            await self._loop.shutdown_default_executor(timeout=self.timeout)
    
        def run(self, func, *args, **kwargs):
            call = functools.partial(func, *args, **kwargs)
            return self._loop.run_in_executor(None, call)

    It utilizes the existing lower level event loop API to provide an asynchronous context manager. I'd say the main advantage is that it's significantly more user friendly, as there's no loop or executor argument to be specified, and users can freely pass kwargs to run() thanks to functools.partial().

    Additionally, I also included a timeout parameter for shutting down the ThreadPoolExecutor, which will depend upon #60564. This can be used as such:

    async def main():
        async with asyncio.ThreadPool(timeout=600) as pool:
            fut1 = pool.run(func)
            fut2 = pool.run(func, arg1, arg2)
            fut2 = pool.run(func, arg1, kwarg1=True)
        print(await asyncio.gather(fut1, fut2, fut3))
    
    asyncio.run(main())

    I don't expect that this would be the final design, but I think it's a decent prototype to demonstrate the functionality. Thoughts?

    @asvetlov
    Copy link
    Contributor Author

    asvetlov commented Nov 1, 2019

    run should be awaitable method, see bpo-38430

    @primal100
    Copy link
    Mannequin

    primal100 mannequin commented Nov 1, 2019

    I don't think changing the default executor is a good approach. What happens, if two or more thread pools are running at the same time? In that case they will use the same default executor anyway, so creating a new executor each time seems like a waste.

    Shutting down the default executor seems unnecessary and could impact lower level code which is using it. The default executor is shutdown at the end of asyncio.run anyway.

    I also think it would be good to have a synchronous entry point, and not require a context manager. Having a ThreadPool per class instance would be a common pattern.

    class ThreadPool:
        def __init__(self, timeout=None):
            self.timeout = timeout
            self._loop = asyncio.get_event_loop()
            self._executor = concurrent.futures.ThreadPoolExecutor()
    
        async def close(self): 
            await self._executor.shutdown(timeout=self.timeout)  
            
        async def __aenter__(self):
            return self
    
        async def __aexit__(self, *args):
            await self.close()
    
        def run(self, func, *args, **kwargs):
            call = functools.partial(func, *args, **kwargs)
            return self._loop.run_in_executor(self._executor, call)

    I'm not sure if a new ThreadPoolExecutor really needs to be created for each ThreadPool though.

    @primal100
    Copy link
    Mannequin

    primal100 mannequin commented Nov 1, 2019

    Run method should be:

        async def run(self, func, *args, **kwargs):
            call = functools.partial(func, *args, **kwargs)
            return await self._loop.run_in_executor(None, call)

    @asvetlov
    Copy link
    Contributor Author

    asvetlov commented Nov 1, 2019

    Paul's version looks better.

    Two notes:

    1. get_running_loop() should be used instead of get_event_loop()
    2. There is no await executer.shutdown() API, the method is synchronous.

    @aeros
    Copy link
    Contributor

    aeros commented Nov 1, 2019

    I don't think changing the default executor is a good approach. What happens, if two or more thread pools are running at the same time? In that case they will use the same default executor anyway, so creating a new executor each time seems like a waste.

    I agree that it would be better to have ThreadPool use an internal executor rather than relying on the event loop's default executor. The main reason I hadn't was because we hadn't implemented an asynchronous executor shutdown outside of loop.shutdown_default_executor(), but we could potentially move the functionality to a private function (in Lib/asyncio/base_events.py) so it's reusable for ThreadPool. It could be something like this:

        async def _shutdown_executor(executor, loop):
            future = loop.create_future()
            thread = threading.Thread(target=loop._do_shutdown, args=(executor,future))
            thread.start()
            try:
                await future
            finally:
                thread.join()
    
        def _do_shutdown(loop, executor, future):
            try:
                executor.shutdown(wait=True)
                loop.call_soon_threadsafe(future.set_result, None)
            except Exception as ex:
                loop.call_soon_threadsafe(future.set_exception, ex)

    Note that the above would be for internal use only, for the existing loop.shutdown_default_executor() and the new asyncio.ThreadPool. For it to support both, it would have to accept an explicit loop argument. It also does not need a robust API, since it's private.

    Shutting down the default executor seems unnecessary and could impact lower level code which is using it. The default executor is shutdown at the end of asyncio.run anyway.

    I agree with your point regarding the shutdown of the default executor one. But I think we should shutdown the internal executor for the ThreadPool, as a main point context managers is to start and clean up their own resources. Also, I'm aware that asyncio.run() shuts down the default executor, I implemented that fairly recently in #59940. ;)

    Another substantial concern is in the case of a coroutine that contains asyncio.ThreadPool being executed without asyncio.run(). There are still use cases for using the lower level loop.run_until_complete() for more complex asyncio programs. I don't think we should make asyncio.ThreadPool dependent on the coroutine being executed with asyncio.run(). Thus, it makes sense that ThreadPool should create a new instance of ThreadPoolExecutor upon entry of the context manager and then shut it down upon exit.

    I'm not sure if a new ThreadPoolExecutor really needs to be created for each ThreadPool though.

    IMO, a context manager should create and then finalize it's own resources, rather than sharing the same executor across contexts. Sharing the same one seems to defeat the purpose of using a context manager in the first place, no?

    Run method should be:

        async def run(self, func, *args, **kwargs):
            call = functools.partial(func, *args, **kwargs)
            return await self._loop.run_in_executor(None, call)

    Correction: if we're using an internal executor now, this should instead be self._loop.run_in_executor(self._executor, call). With None, it will simply use the event loop's default executor rather the context manager's ThreadPoolExecutor.

    run should be awaitable method, see bpo-38430

    Agreed, good point.

    Paul's version looks better.

    I think he had some good points, particularly around using an internal executor instead the event loop's default executor; but there's some parts that I disagree with, see above reasons.

    1. get_running_loop() should be used instead of get_event_loop()

    Note: If get_running_loop() is used instead, it has to set self._loop within a coroutine (since get_running_loop() can only be used within coroutines), that's why in my version it was within __aenter__. I think this would make the most sense.

    1. There is no await executer.shutdown() API, the method is synchronous.

    That's why in my version I was using the existing event loop API, since we had already implemented an asynchronous loop.shutdown_default_executor() method that calls executor.shutdown(). However, if we added a private _shutdown_executor() and _do_shutdown() as I mentioned above, that wouldn't be an issue.

    Thanks for the feedback on the prototype Paul and Andrew, both of you brought up some good points. I'll start working on a PR.

    @aeros
    Copy link
    Contributor

    aeros commented Nov 1, 2019

    Also, I agree with Paul's idea of initializing the ThreadPoolExecutor in the __init__ instead of __aenter__, that makes more sense now that I think about it.

    @aeros
    Copy link
    Contributor

    aeros commented Nov 1, 2019

    thread = threading.Thread(target=loop._do_shutdown, args=(executor,future))

    Correction:

    thread = threading.Thread(target=_do_shutdown, args=(loop, executor,future))

    Also, it might make more sense to rename _do_shutdown() to _do_executor_shutdown() to give the function's name more context; renaming shouldn't be an issue since it's private. Plus, it was just added recently in 3.9, so there's even less backwards compatibility to be concerned with.

    @aeros
    Copy link
    Contributor

    aeros commented Nov 1, 2019

    Actually, I think it would be better to move the functionality of loop.shutdown_default_executor() to a new private method loop._shutdown_executor() that takes an executor argument rather than shutting down the default one. This could be used in both loop.shutdown_default_executor() and ThreadPool. There's no need to move it to function instead of being a method of BaseEventLoop though, that doesn't make sense now that I think about it more.

    Sorry if my thoughts are a bit disorganized, I think I need to get some sleep. (:

    @primal100
    Copy link
    Mannequin

    primal100 mannequin commented Nov 1, 2019

    Good points. I made a mistake in run

    Should be:

        async def run(self, func, *args, **kwargs):
            call = functools.partial(func, *args, **kwargs)
            return await self._loop.run_in_executor(self._executor, call)

    Also in this case run awaits and returns the result. Yury suggested earlier just to return the future and not await. I have no strong opinion either way. The above example does seem more higher level but Yury's example is more flexible.

    I agree that shutdown_default_executor and _do_shutdown should be changed to accept an executor argument so that any executor can be shutdown asynchronously. So the loop API would have a shutdown_executor method. shutdown_default_executor would just call shutdown_executor with the default executor as argument. That would be a good first step.

    @aeros
    Copy link
    Contributor

    aeros commented Nov 1, 2019

    Also in this case run awaits and returns the result. Yury suggested earlier just to return the future and not await.

    Yeah that's roughly what my initial version was doing. I'm personally leaning a bit more towards returning the future rather than the result, but I'm okay with either option. What are your thoughts on this Yury and Andrew?

    I agree that shutdown_default_executor and _do_shutdown should be changed to accept an executor argument so that any executor can be shutdown asynchronously

    We could potentially add an internal method _shutdown_executor, but this would also require modification of _do_shutdown (so that it shuts down the executor passed, rather than the default one). I mentioned this in an earlier example, but this one shows all three together and changes _shutdown_executor to a method of BaseEventLoop:

    async def shutdown_default_executor(self):
        """Schedule the shutdown of the default executor."""
        self.\_executor_shutdown_called = True
        executor = self.\_default_executor
        await self.\_shutdown_executor(executor)
    
        async def _shutdown_executor(self, executor):
            if executor is None:
                return
            future = self.create_future()
            thread = threading.Thread(target=self._do_shutdown, args=(executor,future))
            thread.start()
            try:
                await future
            finally:
                thread.join()
    
        def _do_shutdown(self, executor, future):
            try:
                executor.shutdown(wait=True)
                self.call_soon_threadsafe(future.set_result, None)
            except Exception as ex:
                self.call_soon_threadsafe(future.set_exception, ex)

    Functionally, it works the same for shutdown_default_executor(), but allows _shutdown_executor to be used for asyncio.ThreadPool as well. Since #60564 (adding timeout param) also makes changes to shutdown_default_executor(), it will be blocking this issue.

    @1st1
    Copy link
    Member

    1st1 commented Nov 1, 2019

    Few thoughts:

    1. I like the idea of having a context manager to create a thread pool. It should be initialized in a top-level coroutine and the passed down to other code, as in:
      async def main():
        async with asyncio.ThreadPool(concurrency=10) as pool:
          await something(pool)
          await something_else(pool)
      await pool.run(callable, *args)
    
      asyncio.run(main())
    1. It should be the "async with".

    2. We should not reuse the default executor. The default executor is used for things like DNS resolution. We don't want network activity to pause just because some task offloaded some blocking computation to its pool.

    3. I think it's OK to initialize the thread pool in ThreadPool.__init__. ThreadPool.__aenter__ would simply return self then.

    4. await ThreadPool.aclose() would close the loop gracefully (awaiting for all submitted and still running callables) in cases when people use the threadpool without 'async with'.

    5. I think we should aim for shipping a replacement for loop.run_in_executor in 3.9.

    @aeros
    Copy link
    Contributor

    aeros commented Nov 2, 2019

    async with asyncio.ThreadPool(concurrency=10) as pool:

    I'm definitely on board with the usage of an async context manager and the functionality shown in the example, but I'm not sure that I entirely understand what the "concurrency" kwarg in "concurrency=10" is supposed to represent in this case. Could you elaborate on what that would do functionally?

    It should be the "async with".

    We should not reuse the default executor.

    I think it's OK to initialize the thread pool in ThreadPool.__init__. ThreadPool.__aenter__ would simply return self then.

    I think we should aim for shipping a replacement for loop.run_in_executor in 3.9.

    Strong +1 on all of those points, I agree.

    await ThreadPool.aclose() would close the loop gracefully (awaiting for all submitted and still running callables) in cases when people use the threadpool without 'async with'.

    I believe behavior occurs within shutdown_default_executor(), correct? Specifically, within for ThreadPoolExecutor when executor.shutdown(wait=True) is called and all of the threads are joined without a timeout, it simply waits for each thread to terminate gracefully.

    So if we moved that functionality to a private coroutine method _shutdown_executor() as suggested in my above example, this could also be done for ThreadPool. Unless you want ThreadPool to be it's own entirely separate implementation that doesn't depend at all on ThreadPoolExecutor.

    Personally I think we can use ThreadPoolExecutor in the internal details; it seems that the main issue with it isn't the functionality, but rather the low level API offered with loop.run_in_executor().

    Also another point to consider is if we should have users explicitly call pool.aclose() or if this should be done automatically when exiting the context manager through the __aexit__. I prefer the latter myself for similar reasons to what I previously mentioned, with a context manager initializing it's own resources on entry and finalizing them upon exit.

    @aeros
    Copy link
    Contributor

    aeros commented Nov 2, 2019

    I believe behavior occurs within shutdown_default_executor(), correct? Specifically, within for ThreadPoolExecutor when executor.shutdown(wait=True) is called and all of the threads are joined without a timeout, it simply waits for each thread to terminate gracefully.

    Correction, I phrased this poorly:

    I believe this behavior occurs within shutdown_default_executor(), correct? Specifically, when executor.shutdown(wait=True) is called within _do_shutdown() and ...

    @1st1
    Copy link
    Member

    1st1 commented Nov 2, 2019

    > async with asyncio.ThreadPool(concurrency=10) as pool:

    I'm definitely on board with the usage of an async context manager and the functionality shown in the example, but I'm not sure that I entirely understand what the "concurrency" kwarg in "concurrency=10" is supposed to represent in this case. Could you elaborate on what that would do functionally?

    Number of OS threads to spawn.

    @aeros
    Copy link
    Contributor

    aeros commented Nov 2, 2019

    Number of OS threads to spawn.

    Ah I see, so this would correspond with the "max_workers" argument of ThreadPoolExecutor then, correct? If so, we could pass this in the __init__ for ThreadPool:

    def __init__(self, concurrency):
        ...
        self._executor = concurrent.futures.ThreadPoolExecutor(max_workers=concurrency)

    IMO, I think it would be a bit more clear to just explicitly call it "threads" or "max_threads", as that explains what it's effectively doing. While "concurrency" is still a perfectly correct way of describing the behavior, I think it might be a little too abstract for an argument name.

    @aeros
    Copy link
    Contributor

    aeros commented Nov 2, 2019

    def __init__(self, concurrency=None):

    Minor clarification: the default should probably be None, which would effectively set the default maximum number of threads to min(32, os.cpu_count() + 4), once it's passed to ThreadPoolExecutor.

    @1st1
    Copy link
    Member

    1st1 commented Nov 2, 2019

    IMO, I think it would be a bit more clear to just explicitly call it
    "threads" or "max_threads", as that explains what it's effectively doing.
    While "concurrency" is still a perfectly correct way of describing the
    behavior, I think it might be a little too abstract for an argument name.

    And that's why I like it. If we add ProcessPool it will have the same argument: concurrency.

    max_workers isn't correct, as we want to spawn all threads and all processes when we start. Thus btw makes me think that initializing threads/processes in __init__ is a bad idea, as spawning can be asynchronous.

    @aeros
    Copy link
    Contributor

    aeros commented Nov 2, 2019

    And that's why I like it. If we add ProcessPool it will have the same argument: concurrency.

    max_workers isn't correct, as we want to spawn all threads and all processes when we start. Thus btw makes me think that initializing threads/processes in __init__ is a bad idea, as spawning can be asynchronous.

    Ah, I see, that would make sense then if we're considering adding a ProcessPool at some point and want to make the argument name the same. Based on your ideas so far, it seems that it will likely not be compatible with the existing ThreadPoolExecutor.

    From my understanding, the executor classes are designed around spawning the threads (or processes in the case of ProcessPoolExecutor) as needed up to max_workers, rather than spawning them upon startup. The asynchronous spawning of threads or processes would also not be compatible with the executor subclasses as far as I can tell.

    I can start working on a draft/prototype for a design. It will likely take more time to implement this, but it will give us the chance to have a native asyncio ThreadPool that doesn't directly rely upon the API in concurrent.futures.

    Also, would you prefer for there to be an abstract asyncio.AbstractPool class that ThreadPool inherits from? I think this would make it more streamlined to implement a similar ProcessPool at some point in the future. This would be similar to the relationship between the Executor class and ThreadPoolExecutor, or AbstractEventLoop and BaseEventLoop.

    Let me know if you approve of this idea Yury and Andrew. It's quite a bit more involved than implementing a simple high level version of loop.run_in_executor(), but I think it would prove to be worthwhile in the long term.

    @1st1
    Copy link
    Member

    1st1 commented Nov 2, 2019

    From my understanding, the executor classes are designed around spawning the threads (or processes in the case of ProcessPoolExecutor) as needed up to max_workers, rather than spawning them upon startup. The asynchronous spawning of threads or processes would also not be compatible with the executor subclasses as far as I can tell.

    I can start working on a draft/prototype for a design. It will likely take more time to implement this, but it will give us the chance to have a native asyncio ThreadPool that doesn't directly rely upon the API in concurrent.futures.

    No, that would be too much work. Writing a thread pool or process pool from scratch is an extremely tedious task and it will take us years to stabilize the code. It's not simple.

    We should design *our* API correctly though. And that means that we can't initialize our pools in __init__.

    Something along these lines would work:

        class ThreadPool:
          async def start(): ...
          async def __aenter__(self): 
            await self.start()
            return self
          
          async def aclose(): ...
          async def __aexit__(self, *exc):
            await self.aclose()

    Let me know if you approve of this idea Yury and Andrew. It's quite a bit more involved than implementing a simple high level version of loop.run_in_executor(), but I think it would prove to be worthwhile in the long term.

    It shouldn't be much harder than run_in_executor() as we continue to rely on concurrent.future (at least for the first version).

    We need to start the discussion about this API on discourse. Please give me a few days to organize that.

    @aeros
    Copy link
    Contributor

    aeros commented Nov 2, 2019

    No, that would be too much work. Writing a thread pool or process pool from scratch is an extremely tedious task and it will take us years to stabilize the code. It's not simple.

    We should design *our* API correctly though. And that means that we can't initialize our pools in __init__.

    I can see that it would be a lot of additional work, that's why I was using ThreadPoolExecutor in my earlier prototype.

    The main issue that I'm not seeing though is how exactly we're going to actually spawn the threads or processes asynchronously upon *startup* in ThreadPool using ThreadPoolExecutor's existing public API, with only the submit(), map(), and shutdown() methods.

    Unless I'm misunderstanding something, the executor classes were not designed with that intention in mind. With Executor, a new thread/process (worker) is spawned *as needed* when submit() is called throughout the lifespan of the Executor up to max_workers, rather than upon startup as you're wanting ThreadPool to do.

    Thus, it seemed to make more sense to me to actually build up a new Pool class from scratch that was largely based on Executor, but with significantly differing functionality. Otherwise, it seems like we would have to make some modifications to ThreadPoolExecutor, or inherit from it and then redesign the internals of some of the methods to change the way the threads/processes are spawned.

    It shouldn't be much harder than run_in_executor() as we continue to rely on concurrent.future (at least for the first version).

    I think that run_in_executor() was far more simple compared to this. The functionality of run_in_executor() almost maps directly to executor.submit(), other than a few conditional checks and converting the concurrent.futures.Future returned to an asyncio.Future through wrap_future().

    We need to start the discussion about this API on discourse. Please give me a few days to organize that.

    I agree that we should probably continue this discussion on discourse, as it probably goes beyond the scope of a single issue. No problem.

    @aeros
    Copy link
    Contributor

    aeros commented Nov 4, 2019

    The asynchronous spawning of threads or processes would also not be compatible with the executor subclasses as far as I can tell.

    Thus, it seemed to make more sense to me to actually build up a new Pool class from scratch that was largely based on Executor, but with significantly differing functionality. Otherwise, it seems like we would have to make some modifications to ThreadPoolExecutor, or inherit from it and then redesign the internals of some of the methods to change the way the threads/processes are spawned.

    I'm going to have to rescind the above statements. I was able to implement a new prototype of asyncio.ThreadPool (using ThreadPoolExecutor) that spawns it's threads asynchronously on startup. Since this one a bit more involved than the previous code examples, I created a gist: https://gist.github.com/aeros/8a86de6b13f17b9f717ea539ee1ee78f

    It's by no means a complete implementation, but it at least proves the functionality that Yury described is very much possible using the existing ThreadPoolExecutor class.

    @1st1
    Copy link
    Member

    1st1 commented Nov 4, 2019

    I'm going to have to rescind the above statements. I was able to implement a new prototype of asyncio.ThreadPool (using ThreadPoolExecutor) that spawns it's threads asynchronously on startup. Since this one a bit more involved than the previous code examples, I created a gist: https://gist.github.com/aeros/8a86de6b13f17b9f717ea539ee1ee78f

    Nice work! This is a great excercise, but we can really just use concurrent.futures.ThreadPool as is. Spawning threads is fast. As I mentioned before all we need to do is to design *our* API to NOT initialize pools in __init__, that's it. The design outlined in https://bugs.python.org/msg355881 would do that.

    @aeros
    Copy link
    Contributor

    aeros commented Nov 4, 2019

    Nice work! This is a great excercise, but we can really just use concurrent.futures.ThreadPool as is. Spawning threads is fast. As I mentioned before all we need to do is to design *our* API to NOT initialize pools in __init__, that's it. The design outlined in https://bugs.python.org/msg355881 would do that.

    Thanks, it was quite helpful for better understanding the internals of ThreadPoolExecutor.

    I think that I'm still not understanding something important though. Even if we initialize our ThreadPoolExecutor outside of __init__ (in a start() coroutine method, as your proposing), it seems like the threads will be spawned throughout the lifespan of the threadpool, rather than upon startup since the new threads are spawned in ThreadPoolExecutor *after* executor.submit() is called (up to max_workers) rather than upon initialization. So even if an instance of ThreadPoolExecututor is initialized asynchronously within a start() coroutine method, the individual threads within it won't be spawned at the same time.

    That's why I wrote an explicit way of spawning threads in the above example, based on the way that ThreadPoolExecutor spawns threads in _adjust_thread_count(), which is called at the end of submit().

    @aeros
    Copy link
    Contributor

    aeros commented Nov 4, 2019

    since the new threads are spawned in ThreadPoolExecutor *after* executor.submit() is called

    It's also worth mentioning that ThreadPoolExecutor only spawns up to one additional thread at a time for each executor.submit() called.

    @1st1
    Copy link
    Member

    1st1 commented Nov 8, 2019

    I think that I'm still not understanding something important though. Even if we initialize our ThreadPoolExecutor outside of __init__ (in a start() coroutine method, as your proposing), it seems like the threads will be spawned throughout the lifespan of the threadpool, rather than upon startup since the new threads are spawned in ThreadPoolExecutor *after* executor.submit() is called (up to max_workers) rather than upon initialization.

    Correct.

    So even if an instance of ThreadPoolExecututor is initialized asynchronously within a start() coroutine method, the individual threads within it won't be spawned at the same time.

    Correct. There are a few points of this approach:

    (a) design the API correctly;
    (b) ship something that definitely works with a proven ThreadPoolExecutor;
    (c) write lots of tests;
    (d) write docs;
    (e) if (a-d) are OK, refine the implementation later by replacing ThreadPoolExecutor with a proper (eager threads creation) implementation.

    That's why I wrote an explicit way of spawning threads in the above example, based on the way that ThreadPoolExecutor spawns threads in _adjust_thread_count(), which is called at the end of submit().

    Yeah. In your current approach you're using ThreadPoolExecutor private API, which makes the code a bit fragile. There are two alternatives:

    1. Extent ThreadPoolExecutor API to add an option of eager threads spawn.

    2. Not using ThreadPoolExecutor at all. We can write our own threads orchestration code, it's not that complicated. If we do that, our implementation will become quite faster than the current run_in_executor. We can use curio as inspiration. Last time I profiled asyncio I saw that the code binding concurrent.Future to asyncio.Future is quite complex, brittle, and slow.

    I'm in favor of (2), but let's go through (a-d) steps to get there.

    @aeros
    Copy link
    Contributor

    aeros commented Nov 9, 2019

    (a) design the API correctly;
    (b) ship something that definitely works with a proven ThreadPoolExecutor;
    (c) write lots of tests;
    (d) write docs;
    (e) if (a-d) are OK, refine the implementation later by replacing ThreadPoolExecutor with a proper (eager threads creation) implementation.

    That sounds like a good strategy. I'll start working on step a, to build a more robust working implementation (that only uses ThreadPoolExecutor's public API), and then work on c and d once the API is approved.

    1. Not using ThreadPoolExecutor at all. We can write our own threads orchestration code, it's not that complicated. If we do that, our implementation will become quite faster than the current run_in_executor. We can use curio as inspiration. Last time I profiled asyncio I saw that the code binding concurrent.Future to asyncio.Future is quite complex, brittle, and slow.

    I'm in favor of (2), but let's go through (a-d) steps to get there.

    Agreed, I'm also in favor of option (2), but doing (a-d) first. I think this approach will provide far more stability (rather than implementing (2) immediately), as we'll be able to write extensive test coverage using a ThreadPoolExecutor implementation as a base, and then ensure the native asyncio threadpool implementation has the same intended behavior. Afterwards, could even keep the ThreadPoolExecutor version as private for testing purposes.

    The native asyncio version will likely require some additional tests to ensure that the threads are being spawned eagerly, but they should have very similar overall behavior, with the asyncio version having better performance.

    I'm thinking that we could create a new Lib/asyncio/pools.py, for the initial ThreadPoolExecutor implementation, to give us a separate area to work with for native asyncio one in the future. A similar asyncio.ProcessPool API could also be eventually created there as well. It might be feasible to fit the initial implementation in Lib/asyncio/base_events.py, but IMO the native asyncio version would not fit.

    (From the user end it would make no difference, as long as we add pools.__all__ to Lib/asyncio/init.py)

    My only remaining question that I can think of: should we implement an asyncio.ProcessPool API (using ProcessPoolExecutor's public API) before working on the native asyncio version of asyncio.ThreadPool?

    This would likely allow us to release the executor versions well before the 3.9 beta (2020-05-18), and then the native asyncio versions (with eager spawning) either before 3.9rc1 (2020-08-10) or at some point during 3.10 alpha. I suspect that writing the extensive test coverage will take the most time.

    @aeros
    Copy link
    Contributor

    aeros commented Nov 16, 2019

    (a) design the API correctly;
    (b) ship something that definitely works with a proven ThreadPoolExecutor;

    Yury and Andrew, here's my latest API design for asyncio.ThreadPool: master...aeros:asyncio-threadpool. This is for the initial ThreadPoolExecutor version, using the design based on Yury's suggestions. I plan on extending upon the docstrings, writing tests, and the documentation for it.

    My idea was to use the new Lib/asyncio/pools.py and AbstractPool to eventually implement an asyncio.ProcessPool (and the native version of asyncio.ThreadPool).

    I plan on opening a PR after I finish writing some tests and documentation, I'd like to include it all in the same PR if possible. But let me know what you think about the current API design, it would be much easier for me to make modifications at this stage.

    @aeros
    Copy link
    Contributor

    aeros commented Nov 17, 2019

    So, I just had an interesting idea... what if ThreadPool.run() returned a Task instead of a coroutine object?

    With the current version of asyncio.ThreadPool, if a user wants to create a Task, they would have to do something like this:

    async with asyncio.ThreadPool() as pool:
        task = asyncio.create_task(pool.run(io_blocking_func, 10, kwarg1='test'))
        other_task = asyncio.create_task(pool.run(io_blocking_func, 12))
        if some_conditional:
            task.cancel()
        results = await asyncio.gather(task, other_task, return_exceptions=True)
        ...

    To me, this looks like excessive boilerplate, particularly for a higher level API. Even for rather straightforward behavior, it requires nested function calls. If we were to return a task directly, this would be significantly cleaner:

    async with asyncio.ThreadPool() as pool:
        task = pool.run(io_blocking_func, 10, kwarg1='test')
        other_task = pool.run(io_blocking_func, 12)
        if some_conditional:
            task.cancel()
        results = await asyncio.gather(task, other_task, return_exceptions=True)
        ...

    Since asyncio.ThreadPool is intended to be a high-level API, I don't think it's an issue to return a Task from it's run() method. It would make it significantly easier and more convenient to work with from a user perspective.

    Thoughts?

    @aeros
    Copy link
    Contributor

    aeros commented Jan 26, 2020

    So, I just had an interesting idea... what if ThreadPool.run() returned a Task instead of a coroutine object?

    After having some time to think this over, I prefer the current behavior. I don't think there would be significant enough improvement from returning a Task instead, and it would likely result in an overall performance loss.

    Also, as a general update on the project, I'm close to being ready to open a PR to implement asyncio.ThreadPool. I finished the basic implementation and added a decent number of new tests to ensure its functionality. Here's my current progress: master...aeros:asyncio-threadpool

    I just need to work on adding the new documentation, and more specifically finding a good place for it in the current asyncio docs. Do you have any ideas for that, Yury? I figured that you might already have a preference in mind.

    @miss-islington
    Copy link
    Contributor

    New changeset cc2bbc2 by Kyle Stanley in branch 'master':
    bpo-32309: Implement asyncio.to_thread() (GH-20143)
    cc2bbc2

    @miss-islington
    Copy link
    Contributor

    New changeset e299130 by Miss Islington (bot) in branch '3.9':
    bpo-32309: Implement asyncio.to_thread() (GH-20143)
    e299130

    @vstinner
    Copy link
    Member

    Note for myself: Python 3.9 release manager (Lukasz) approved adding the feature to Python 3.9.0 beta2:
    #20212 (review)

    @vstinner vstinner added 3.9 only security fixes and removed 3.7 (EOL) end of life labels May 20, 2020
    @miss-islington
    Copy link
    Contributor

    New changeset 0f56263 by Kyle Stanley in branch 'master':
    bpo-32309: Add support for contextvars in asyncio.to_thread() (GH-20278)
    0f56263

    @miss-islington
    Copy link
    Contributor

    New changeset 3e65054 by Miss Islington (bot) in branch '3.9':
    bpo-32309: Add support for contextvars in asyncio.to_thread() (GH-20278)
    3e65054

    @aeros
    Copy link
    Contributor

    aeros commented May 21, 2020

    Now that the versionadded label has been added and the contextvar issue was addressed, this issue can be closed.

    @aeros aeros closed this as completed May 21, 2020
    @ezio-melotti ezio-melotti transferred this issue from another repository Apr 10, 2022
    Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
    Labels
    3.9 only security fixes docs Documentation in the Doc dir topic-asyncio
    Projects
    None yet
    Development

    No branches or pull requests

    5 participants