This issue tracker has been migrated to GitHub, and is currently read-only.
For more information, see the GitHub FAQs in the Python's Developer Guide.

classification
Title: Implement asyncio.run_in_executor shortcut
Type: Stage: resolved
Components: asyncio, Documentation Versions: Python 3.9
process
Status: closed Resolution: fixed
Dependencies: Superseder:
Assigned To: Nosy List: aeros, asvetlov, chris.jerdonek, miss-islington, primal, vstinner, yselivanov
Priority: normal Keywords: patch

Created on 2017-12-13 19:54 by asvetlov, last changed 2022-04-11 14:58 by admin. This issue is now closed.

Pull Requests
URL Status Linked Edit
PR 4848 open asvetlov, 2017-12-13 19:55
PR 18410 closed aeros, 2020-02-08 03:36
PR 20143 closed aeros, 2020-05-17 01:57
PR 20212 merged miss-islington, 2020-05-19 09:43
PR 20278 merged aeros, 2020-05-21 03:48
PR 20279 merged miss-islington, 2020-05-21 05:20
Messages (43)
msg308238 - (view) Author: Andrew Svetlov (asvetlov) * (Python committer) Date: 2017-12-13 19:54
loop.create_task() and loop.run_in_executor are present very often in user code. But they are require a loop instance, actual call looks like

loop = asyncio.get_running_loop()
loop.create_task(coro())

The proposal adds create_task(coro) and run_in_executor(executor, func, *args) shortcuts for this.
msg308239 - (view) Author: Yury Selivanov (yselivanov) * (Python committer) Date: 2017-12-13 20:03
I don't like the low-level API of run_in_executor.  "executor" being the first argument, the inability to pass **kwargs, etc.

I'd expect to see a more high-level API, perhaps the one that supports 'async with':

    async with asyncio.ThreadPool() as pool:
        f1 = pool.run(func1)
        f2 = pool.run(func2)
        r3 = await pool.run(func3)

    r1 = f1.result()
    r2 = f2.result()
    print(r1, r2, r3)

I mean it's great that we can use 'concurrent.futures' in asyncio, but having native asyncio pools implementation would be way more convenient to the users.

In any case, can we focus this issue on the "run_in_executor" API, and open a new one for "create_task"?
msg308243 - (view) Author: Andrew Svetlov (asvetlov) * (Python committer) Date: 2017-12-13 20:28
Removed create_task() from title
msg308245 - (view) Author: Andrew Svetlov (asvetlov) * (Python committer) Date: 2017-12-13 20:35
https://bugs.python.org/issue32311 opened for create_task()
msg341352 - (view) Author: Andrew Svetlov (asvetlov) * (Python committer) Date: 2019-05-03 19:22
In Python 3.7 loop.run_in_executor() is the only user-faced method that requires a loop.

asyncio.ThreadPool() sounds great. Maybe thread group can provide better api.

But for Python 3.8 adding `run_in_executor` top-level function looks the only easy and obvious way to help people getting rid of explicit loop usage.

Yuri, what do you think?
msg355771 - (view) Author: Kyle Stanley (aeros) * (Python committer) Date: 2019-10-31 21:46
> I don't like the low-level API of run_in_executor.  "executor" being the first argument, the inability to pass **kwargs, etc.

> I mean it's great that we can use 'concurrent.futures' in asyncio, but having native asyncio pools implementation would be way more convenient to the users.

I agree that there's an issue with this, but I think for the high level API it can be an asynchronous context manager that makes use of concurrent.futures.ThreadPoolExecutor, since that one seems to fit the majority of asyncio use cases. If users want to utilize concurrent.futures.ProcessPoolExecutor or a customized one, they can use loop.run_in_executor() instead. 

Unless there's something I'm missing with regards to concurrent.futures.ThreadPoolExecutor that specifically makes it less usable. Can't asyncio.ThreadPool make use of concurrent.futures.ThreadPoolExecutor in the internal details while providing a better API?

> I'd expect to see a more high-level API, perhaps the one that supports 'async with':

> asyncio.ThreadPool() sounds great. Maybe thread group can provide better api.

I'd be willing to work on implementing the asyncio.ThreadPool API as Yury described it.

> But for Python 3.8 adding `run_in_executor` top-level function looks the only easy and obvious way to help people getting rid of explicit loop usage.

I can see your point Andrew, but I think that if we implement asyncio.run_in_executor() and then later add asyncio.ThreadPool(), it's going to be against "one obvious way" and we'll end up adding two high-level functions for the same purpose. This principle can't always be adhered to, but I think we should try to when it's possible. 

Personally, I think the API for asyncio.ThreadPool would be much more robust, so it seems worthwhile to try implementing this one first. If there are a number of unforeseen complications, asyncio.run_in_executor() might still be a decent fallback.
msg355773 - (view) Author: Kyle Stanley (aeros) * (Python committer) Date: 2019-10-31 21:49
> end up adding two high-level functions

Clarification: asyncio.run_in_executor() would be a function, but asyncio.ThreadPool would be a context manager class.
msg355796 - (view) Author: Kyle Stanley (aeros) * (Python committer) Date: 2019-11-01 08:24
So, here's a prototype implementation of asyncio.ThreadPool that would function exactly as Yury described, but I'm not convinced about the design. Usually, it seems preferred to separate the context manager from the rest of the class (as was done with asyncio.Lock), but I think this one might be simple enough to be a single class:

class ThreadPool:
    def __init__(self, timeout=None):
        self.timeout = timeout
        self._loop = None
    
    async def __aenter__(self):
        self._loop = asyncio.get_running_loop()
        # Ensure that ThreadPoolExecutor is being used
        self._loop.default_executor = concurrent.futures.ThreadPoolExecutor()
        return self

    async def __aexit__(self, *args):
        await self._loop.shutdown_default_executor(timeout=self.timeout)

    def run(self, func, *args, **kwargs):
        call = functools.partial(func, *args, **kwargs)
        return self._loop.run_in_executor(None, call)

It utilizes the existing lower level event loop API to provide an asynchronous context manager. I'd say the main advantage is that it's significantly more user friendly, as there's no loop or executor argument to be specified, and users can freely pass kwargs to run() thanks to functools.partial().

Additionally, I also included a timeout parameter for shutting down the ThreadPoolExecutor, which will depend upon GH-16360. This can be used as such:

async def main():
    async with asyncio.ThreadPool(timeout=600) as pool:
        fut1 = pool.run(func)
        fut2 = pool.run(func, arg1, arg2)
        fut2 = pool.run(func, arg1, kwarg1=True)
    print(await asyncio.gather(fut1, fut2, fut3))

asyncio.run(main())

I don't expect that this would be the final design, but I think it's a decent prototype to demonstrate the functionality. Thoughts?
msg355798 - (view) Author: Andrew Svetlov (asvetlov) * (Python committer) Date: 2019-11-01 09:34
`run` should be awaitable method, see #38430
msg355799 - (view) Author: Paul Martin (primal) * Date: 2019-11-01 09:35
I don't think changing the default executor is a good approach. What happens, if two or more thread pools are running at the same time? In that case they will use the same default executor anyway, so creating a new executor each time seems like a waste. 

Shutting down the default executor seems unnecessary and could impact lower level code which is using it. The default executor is shutdown at the end of asyncio.run anyway.

I also think it would be good to have a synchronous entry point, and not require a context manager. Having a ThreadPool per class instance would be a common pattern.


class ThreadPool:
    def __init__(self, timeout=None):
        self.timeout = timeout
        self._loop = asyncio.get_event_loop()
        self._executor = concurrent.futures.ThreadPoolExecutor()

    async def close(self): 
        await self._executor.shutdown(timeout=self.timeout)  
        
    async def __aenter__(self):
        return self

    async def __aexit__(self, *args):
        await self.close()

    def run(self, func, *args, **kwargs):
        call = functools.partial(func, *args, **kwargs)
        return self._loop.run_in_executor(self._executor, call)


I'm not sure if a new ThreadPoolExecutor really needs to be created for each ThreadPool though.
msg355800 - (view) Author: Paul Martin (primal) * Date: 2019-11-01 09:36
Run method should be:

    async def run(self, func, *args, **kwargs):
        call = functools.partial(func, *args, **kwargs)
        return await self._loop.run_in_executor(None, call)
msg355802 - (view) Author: Andrew Svetlov (asvetlov) * (Python committer) Date: 2019-11-01 09:52
Paul's version looks better.

Two notes: 
1. get_running_loop() should be used instead of get_event_loop()
2. There is no `await executer.shutdown()` API, the method is synchronous.
msg355804 - (view) Author: Kyle Stanley (aeros) * (Python committer) Date: 2019-11-01 11:09
> I don't think changing the default executor is a good approach. What happens, if two or more thread pools are running at the same time? In that case they will use the same default executor anyway, so creating a new executor each time seems like a waste. 

I agree that it would be better to have ThreadPool use an internal executor rather than relying on the event loop's default executor. The main reason I hadn't was because we hadn't implemented an asynchronous executor shutdown outside of loop.shutdown_default_executor(), but we could potentially move the functionality to a private function (in Lib/asyncio/base_events.py) so it's reusable for ThreadPool. It could be something like this:

    async def _shutdown_executor(executor, loop):
        future = loop.create_future()
        thread = threading.Thread(target=loop._do_shutdown, args=(executor,future))
        thread.start()
        try:
            await future
        finally:
            thread.join()

    def _do_shutdown(loop, executor, future):
        try:
            executor.shutdown(wait=True)
            loop.call_soon_threadsafe(future.set_result, None)
        except Exception as ex:
            loop.call_soon_threadsafe(future.set_exception, ex)

Note that the above would be for internal use only, for the existing loop.shutdown_default_executor() and the new asyncio.ThreadPool. For it to support both, it would have to accept an explicit loop argument. It also does not need a robust API, since it's private. 

> Shutting down the default executor seems unnecessary and could impact lower level code which is using it. The default executor is shutdown at the end of asyncio.run anyway.

I agree with your point regarding the shutdown of the default executor one. But I think we should shutdown the internal executor for the ThreadPool, as a main point context managers is to start and clean up their own resources. Also, I'm aware that asyncio.run() shuts down the default executor, I implemented that fairly recently in GH-15735. ;)

Another substantial concern is in the case of a coroutine that contains asyncio.ThreadPool being executed without asyncio.run(). There are still use cases for using the lower level loop.run_until_complete() for more complex asyncio programs. I don't think we should make asyncio.ThreadPool dependent on the coroutine being executed with asyncio.run(). Thus, it makes sense that ThreadPool should create a new instance of ThreadPoolExecutor upon entry of the context manager and then shut it down upon exit. 

> I'm not sure if a new ThreadPoolExecutor really needs to be created for each ThreadPool though.

IMO, a context manager should create and then finalize it's own resources, rather than sharing the same executor across contexts. Sharing the same one seems to defeat the purpose of using a context manager in the first place, no?

> Run method should be:

    async def run(self, func, *args, **kwargs):
        call = functools.partial(func, *args, **kwargs)
        return await self._loop.run_in_executor(None, call)

Correction: if we're using an internal executor now, this should instead be self._loop.run_in_executor(self._executor, call). With `None`, it will simply use the event loop's default executor rather the context manager's ThreadPoolExecutor.

> `run` should be awaitable method, see #38430

Agreed, good point. 

> Paul's version looks better.

I think he had some good points, particularly around using an internal executor instead the event loop's default executor; but there's some parts that I disagree with, see above reasons.

> 1. get_running_loop() should be used instead of get_event_loop()

Note: If get_running_loop() is used instead, it has to set self._loop within a coroutine (since get_running_loop() can only be used within coroutines), that's why in my version it was within __aenter__. I think this would make the most sense. 

> 2. There is no `await executer.shutdown()` API, the method is synchronous.

That's why in my version I was using the existing event loop API, since we had already implemented an asynchronous loop.shutdown_default_executor() method that calls executor.shutdown(). However, if we added a private _shutdown_executor() and _do_shutdown() as I mentioned above, that wouldn't be an issue. 

Thanks for the feedback on the prototype Paul and Andrew, both of you brought up some good points. I'll start working on a PR.
msg355805 - (view) Author: Kyle Stanley (aeros) * (Python committer) Date: 2019-11-01 11:17
Also, I agree with Paul's idea of initializing the ThreadPoolExecutor in the __init__ instead of __aenter__, that makes more sense now that I think about it.
msg355807 - (view) Author: Kyle Stanley (aeros) * (Python committer) Date: 2019-11-01 11:23
> thread = threading.Thread(target=loop._do_shutdown, args=(executor,future))

Correction:

> thread = threading.Thread(target=_do_shutdown, args=(loop, executor,future))

Also, it might make more sense to rename _do_shutdown() to _do_executor_shutdown() to give the function's name more context; renaming shouldn't be an issue since it's private. Plus, it was just added recently in 3.9, so there's even less backwards compatibility to be concerned with.
msg355809 - (view) Author: Kyle Stanley (aeros) * (Python committer) Date: 2019-11-01 11:33
Actually, I think it would be better to move the functionality of loop.shutdown_default_executor() to a new private method loop._shutdown_executor() that takes an executor argument rather than shutting down the default one. This could be used in both loop.shutdown_default_executor() and ThreadPool. There's no need to move it to function instead of being a method of BaseEventLoop though, that doesn't make sense now that I think about it more. 

Sorry if my thoughts are a bit disorganized, I think I need to get some sleep. (:
msg355811 - (view) Author: Paul Martin (primal) * Date: 2019-11-01 11:45
Good points. I made a mistake in run

Should be:

    async def run(self, func, *args, **kwargs):
        call = functools.partial(func, *args, **kwargs)
        return await self._loop.run_in_executor(self._executor, call)

Also in this case run awaits and returns the result. Yury suggested earlier just to return the future and not await. I have no strong opinion either way. The above example does seem more higher level but Yury's example is more flexible.

I agree that shutdown_default_executor and _do_shutdown should be changed to accept an executor argument so that any executor can be shutdown asynchronously. So the loop API would have a shutdown_executor method. shutdown_default_executor would just call shutdown_executor with the default executor as argument. That would be a good first step.
msg355842 - (view) Author: Kyle Stanley (aeros) * (Python committer) Date: 2019-11-01 21:09
> Also in this case run awaits and returns the result. Yury suggested earlier just to return the future and not await.

Yeah that's roughly what my initial version was doing. I'm personally leaning a bit more towards returning the future rather than the result, but I'm okay with either option. What are your thoughts on this Yury and Andrew?

> I agree that shutdown_default_executor and _do_shutdown should be changed to accept an executor argument so that any executor can be shutdown asynchronously

We could potentially add an internal method _shutdown_executor, but this would also require modification of _do_shutdown (so that it shuts down the executor passed, rather than the default one). I mentioned this in an earlier example, but this one shows all three together and changes _shutdown_executor to a method of BaseEventLoop:
        
    async def shutdown_default_executor(self):
        """Schedule the shutdown of the default executor."""
        self._executor_shutdown_called = True
        executor = self._default_executor
        await self._shutdown_executor(executor)

    async def _shutdown_executor(self, executor):
        if executor is None:
            return
        future = self.create_future()
        thread = threading.Thread(target=self._do_shutdown, args=(executor,future))
        thread.start()
        try:
            await future
        finally:
            thread.join()

    def _do_shutdown(self, executor, future):
        try:
            executor.shutdown(wait=True)
            self.call_soon_threadsafe(future.set_result, None)
        except Exception as ex:
            self.call_soon_threadsafe(future.set_exception, ex)

Functionally, it works the same for shutdown_default_executor(), but allows _shutdown_executor to be used for asyncio.ThreadPool as well. Since GH-16360 (adding timeout param) also makes changes to shutdown_default_executor(), it will be blocking this issue.
msg355846 - (view) Author: Yury Selivanov (yselivanov) * (Python committer) Date: 2019-11-01 21:38
Few thoughts:

1. I like the idea of having a context manager to create a thread pool. It should be initialized in a top-level coroutine and the passed down to other code, as in:

  async def main():
    async with asyncio.ThreadPool(concurrency=10) as pool:
      await something(pool)
      await something_else(pool)

      await pool.run(callable, *args)

  asyncio.run(main())

2. It should be the "async with".

3. We should not reuse the default executor. The default executor is used for things like DNS resolution.  We don't want network activity to pause just because some task offloaded some blocking computation to its pool.

4. I think it's OK to initialize the thread pool in `ThreadPool.__init__`.  `ThreadPool.__aenter__` would simply return `self` then.

5. `await ThreadPool.aclose()` would close the loop gracefully (awaiting for all submitted and still running callables) in cases when people use the threadpool without 'async with'.

6. I think we should aim for shipping a replacement for `loop.run_in_executor` in 3.9.
msg355852 - (view) Author: Kyle Stanley (aeros) * (Python committer) Date: 2019-11-02 00:57
> async with asyncio.ThreadPool(concurrency=10) as pool:

I'm definitely on board with the usage of an async context manager and the functionality shown in the example, but I'm not sure that I entirely understand what the "concurrency" kwarg in "concurrency=10" is supposed to represent in this case. Could you elaborate on what that would do functionally?

> It should be the "async with".

> We should not reuse the default executor.

> I think it's OK to initialize the thread pool in `ThreadPool.__init__`.  `ThreadPool.__aenter__` would simply return `self` then.

> I think we should aim for shipping a replacement for `loop.run_in_executor` in 3.9.

Strong +1 on all of those points, I agree. 

> `await ThreadPool.aclose()` would close the loop gracefully (awaiting for all submitted and still running callables) in cases when people use the threadpool without 'async with'.

I believe behavior occurs within shutdown_default_executor(), correct? Specifically, within for ThreadPoolExecutor when executor.shutdown(wait=True) is called and all of the threads are joined without a timeout, it simply waits for each thread to terminate gracefully. 

So if we moved that functionality to a private coroutine method _shutdown_executor() as suggested in my above example, this could also be done for ThreadPool. Unless you want ThreadPool to be it's own entirely separate implementation that doesn't depend at all on ThreadPoolExecutor. 

Personally I think we can use ThreadPoolExecutor in the internal details; it seems that the main issue with it isn't the functionality, but rather the low level API offered with loop.run_in_executor().

Also another point to consider is if we should have users explicitly call pool.aclose() or if this should be done automatically when exiting the context manager through the __aexit__. I prefer the latter myself for similar reasons to what I previously mentioned, with a context manager initializing it's own resources on entry and finalizing them upon exit.
msg355854 - (view) Author: Kyle Stanley (aeros) * (Python committer) Date: 2019-11-02 01:16
> I believe behavior occurs within shutdown_default_executor(), correct? Specifically, within for ThreadPoolExecutor when executor.shutdown(wait=True) is called and all of the threads are joined without a timeout, it simply waits for each thread to terminate gracefully.
 
Correction, I phrased this poorly:

I believe this behavior occurs within shutdown_default_executor(), correct? Specifically, when executor.shutdown(wait=True) is called within _do_shutdown() and ...
msg355855 - (view) Author: Yury Selivanov (yselivanov) * (Python committer) Date: 2019-11-02 01:16
>> async with asyncio.ThreadPool(concurrency=10) as pool:

> I'm definitely on board with the usage of an async context manager and the functionality shown in the example, but I'm not sure that I entirely understand what the "concurrency" kwarg in "concurrency=10" is supposed to represent in this case. Could you elaborate on what that would do functionally?

Number of OS threads to spawn.
msg355857 - (view) Author: Kyle Stanley (aeros) * (Python committer) Date: 2019-11-02 02:48
> Number of OS threads to spawn.

Ah I see, so this would correspond with the "max_workers" argument of ThreadPoolExecutor then, correct? If so, we could pass this in the __init__ for ThreadPool:

def __init__(self, concurrency):
    ...
    self._executor = concurrent.futures.ThreadPoolExecutor(max_workers=concurrency)

IMO, I think it would be a bit more clear to just explicitly call it "threads" or "max_threads", as that explains what it's effectively doing. While "concurrency" is still a perfectly correct way of describing the behavior, I think it might be a little too abstract for an argument name.
msg355858 - (view) Author: Kyle Stanley (aeros) * (Python committer) Date: 2019-11-02 02:51
> def __init__(self, concurrency=None):

Minor clarification: the default should probably be None, which would effectively set the default maximum number of threads to min(32, os.cpu_count() + 4), once it's passed to ThreadPoolExecutor.
msg355862 - (view) Author: Yury Selivanov (yselivanov) * (Python committer) Date: 2019-11-02 05:19
> 
> 
> 
> IMO, I think it would be a bit more clear to just explicitly call it
> "threads" or "max_threads", as that explains what it's effectively doing.
> While "concurrency" is still a perfectly correct way of describing the
> behavior, I think it might be a little too abstract for an argument name.
> 
> 
> 

> 
> 
> 
> 
> 
> 
> 

And that's why I like it. If we add ProcessPool it will have the same argument: concurrency.

max_workers isn't correct, as we want to spawn all threads and all processes when we start. Thus btw makes me think that initializing threads/processes in __init__ is a bad idea, as spawning can be asynchronous.

> 
> 
> 
> 
> 
> 
> 
>
msg355863 - (view) Author: Kyle Stanley (aeros) * (Python committer) Date: 2019-11-02 06:30
> And that's why I like it. If we add ProcessPool it will have the same argument: concurrency.

> max_workers isn't correct, as we want to spawn all threads and all processes when we start. Thus btw makes me think that initializing threads/processes in __init__ is a bad idea, as spawning can be asynchronous.

Ah, I see, that would make sense then if we're considering adding a ProcessPool at some point and want to make the argument name the same. Based on your ideas so far, it seems that it will likely not be compatible with the existing ThreadPoolExecutor. 

From my understanding, the executor classes are designed around spawning the threads (or processes in the case of ProcessPoolExecutor) as needed up to max_workers, rather than spawning them upon startup. The asynchronous spawning of threads or processes would also not be compatible with the executor subclasses as far as I can tell. 

I can start working on a draft/prototype for a design. It will likely take more time to implement this, but it will give us the chance to have a native asyncio ThreadPool that doesn't directly rely upon the API in concurrent.futures. 

Also, would you prefer for there to be an abstract asyncio.AbstractPool class that ThreadPool inherits from? I think this would make it more streamlined to implement a similar ProcessPool at some point in the future. This would be similar to the relationship between the Executor class and ThreadPoolExecutor, or AbstractEventLoop and BaseEventLoop.

Let me know if you approve of this idea Yury and Andrew. It's quite a bit more involved than implementing a simple high level version of loop.run_in_executor(), but I think it would prove to be worthwhile in the long term.
msg355881 - (view) Author: Yury Selivanov (yselivanov) * (Python committer) Date: 2019-11-02 20:54
>  From my understanding, the executor classes are designed around spawning the threads (or processes in the case of ProcessPoolExecutor) as needed up to max_workers, rather than spawning them upon startup. The asynchronous spawning of threads or processes would also not be compatible with the executor subclasses as far as I can tell. 

> I can start working on a draft/prototype for a design. It will likely take more time to implement this, but it will give us the chance to have a native asyncio ThreadPool that doesn't directly rely upon the API in concurrent.futures. 

No, that would be too much work. Writing a thread pool or process pool from scratch is an extremely tedious task and it will take us years to stabilize the code. It's not simple.

We should design *our* API correctly though. And that means that we can't initialize our pools in __init__.

Something along these lines would work:

    class ThreadPool:
      async def start(): ...
      async def __aenter__(self): 
        await self.start()
        return self
      
      async def aclose(): ...
      async def __aexit__(self, *exc):
        await self.aclose()


> Let me know if you approve of this idea Yury and Andrew. It's quite a bit more involved than implementing a simple high level version of loop.run_in_executor(), but I think it would prove to be worthwhile in the long term.

It shouldn't be much harder than run_in_executor() as we continue to rely on concurrent.future (at least for the first version).

We need to start the discussion about this API on discourse.  Please give me a few days to organize that.
msg355883 - (view) Author: Kyle Stanley (aeros) * (Python committer) Date: 2019-11-02 23:28
> No, that would be too much work. Writing a thread pool or process pool from scratch is an extremely tedious task and it will take us years to stabilize the code. It's not simple.

> We should design *our* API correctly though. And that means that we can't initialize our pools in __init__.

I can see that it would be a lot of additional work, that's why I was using ThreadPoolExecutor in my earlier prototype. 

The main issue that I'm not seeing though is how exactly we're going to actually spawn the threads or processes asynchronously upon *startup* in ThreadPool using ThreadPoolExecutor's existing public API, with only the submit(), map(), and shutdown() methods. 

Unless I'm misunderstanding something, the executor classes were not designed with that intention in mind. With Executor, a new thread/process (worker) is spawned *as needed* when submit() is called throughout the lifespan of the Executor up to max_workers, rather than upon startup as you're wanting ThreadPool to do. 

Thus, it seemed to make more sense to me to actually build up a new Pool class from scratch that was largely based on Executor, but with significantly differing functionality. Otherwise, it seems like we would have to make some modifications to ThreadPoolExecutor, or inherit from it and then redesign the internals of some of the methods to change the way the threads/processes are spawned. 

> It shouldn't be much harder than run_in_executor() as we continue to rely on concurrent.future (at least for the first version).

I think that run_in_executor() was far more simple compared to this. The functionality of run_in_executor() almost maps directly to executor.submit(), other than a few conditional checks and converting the concurrent.futures.Future returned to an asyncio.Future through wrap_future(). 

> We need to start the discussion about this API on discourse.  Please give me a few days to organize that.

I agree that we should probably continue this discussion on discourse, as it probably goes beyond the scope of a single issue. No problem.
msg355938 - (view) Author: Kyle Stanley (aeros) * (Python committer) Date: 2019-11-04 11:15
> The asynchronous spawning of threads or processes would also not be compatible with the executor subclasses as far as I can tell.

> Thus, it seemed to make more sense to me to actually build up a new Pool class from scratch that was largely based on Executor, but with significantly differing functionality. Otherwise, it seems like we would have to make some modifications to ThreadPoolExecutor, or inherit from it and then redesign the internals of some of the methods to change the way the threads/processes are spawned. 

I'm going to have to rescind the above statements. I was able to implement a new prototype of asyncio.ThreadPool (using ThreadPoolExecutor) that spawns it's threads asynchronously on startup. Since this one a bit more involved than the previous code examples, I created a gist: https://gist.github.com/aeros/8a86de6b13f17b9f717ea539ee1ee78f

It's by no means a complete implementation, but it at least proves the functionality that Yury described is very much possible using the existing ThreadPoolExecutor class.
msg355957 - (view) Author: Yury Selivanov (yselivanov) * (Python committer) Date: 2019-11-04 16:22
> I'm going to have to rescind the above statements. I was able to implement a new prototype of asyncio.ThreadPool (using ThreadPoolExecutor) that spawns it's threads asynchronously on startup. Since this one a bit more involved than the previous code examples, I created a gist: https://gist.github.com/aeros/8a86de6b13f17b9f717ea539ee1ee78f

Nice work! This is a great excercise, but we can really just use concurrent.futures.ThreadPool as is. Spawning threads is fast. As I mentioned before all we need to do is to design *our* API to NOT initialize pools in __init__, that's it. The design outlined in https://bugs.python.org/msg355881 would do that.
msg355975 - (view) Author: Kyle Stanley (aeros) * (Python committer) Date: 2019-11-04 18:58
> Nice work! This is a great excercise, but we can really just use concurrent.futures.ThreadPool as is. Spawning threads is fast. As I mentioned before all we need to do is to design *our* API to NOT initialize pools in __init__, that's it. The design outlined in https://bugs.python.org/msg355881 would do that.

Thanks, it was quite helpful for better understanding the internals of ThreadPoolExecutor. 

I think that I'm still not understanding something important though. Even if we initialize our ThreadPoolExecutor outside of __init__ (in a start() coroutine method, as your proposing), it seems like the threads will be spawned throughout the lifespan of the threadpool, rather than upon startup since the new threads are spawned in ThreadPoolExecutor *after* executor.submit() is called (up to max_workers) rather than upon initialization. So even if an instance of ThreadPoolExecututor is initialized asynchronously within a start() coroutine method, the individual threads within it won't be spawned at the same time.

That's why I wrote an explicit way of spawning threads in the above example, based on the way that ThreadPoolExecutor spawns threads in _adjust_thread_count(), which is called at the end of submit().
msg355978 - (view) Author: Kyle Stanley (aeros) * (Python committer) Date: 2019-11-04 19:15
> since the new threads are spawned in ThreadPoolExecutor *after* executor.submit() is called

It's also worth mentioning that ThreadPoolExecutor only spawns up to one additional thread at a time for each executor.submit() called.
msg356242 - (view) Author: Yury Selivanov (yselivanov) * (Python committer) Date: 2019-11-08 14:29
> I think that I'm still not understanding something important though. Even if we initialize our ThreadPoolExecutor outside of __init__ (in a start() coroutine method, as your proposing), it seems like the threads will be spawned throughout the lifespan of the threadpool, rather than upon startup since the new threads are spawned in ThreadPoolExecutor *after* executor.submit() is called (up to max_workers) rather than upon initialization.

Correct.

> So even if an instance of ThreadPoolExecututor is initialized asynchronously within a start() coroutine method, the individual threads within it won't be spawned at the same time.

Correct.  There are a few points of this approach:

(a) design the API correctly; 
(b) ship something that definitely works with a proven ThreadPoolExecutor; 
(c) write lots of tests;
(d) write docs;
(e) if (a-d) are OK, refine the implementation later by replacing ThreadPoolExecutor with a proper (eager threads creation) implementation.

> That's why I wrote an explicit way of spawning threads in the above example, based on the way that ThreadPoolExecutor spawns threads in _adjust_thread_count(), which is called at the end of submit().

Yeah. In your current approach you're using ThreadPoolExecutor private API, which makes the code a bit fragile.  There are two alternatives:

1. Extent ThreadPoolExecutor API to add an option of eager threads spawn.

2. Not using ThreadPoolExecutor at all. We can write our own threads orchestration code, it's not that complicated. If we do that, our implementation will become quite faster than the current run_in_executor.  We can use curio as inspiration.  Last time I profiled asyncio I saw that the code binding concurrent.Future to asyncio.Future is quite complex, brittle, and slow.

I'm in favor of (2), but let's go through (a-d) steps to get there.
msg356303 - (view) Author: Kyle Stanley (aeros) * (Python committer) Date: 2019-11-09 16:43
> (a) design the API correctly; 
(b) ship something that definitely works with a proven ThreadPoolExecutor; 
(c) write lots of tests;
(d) write docs;
(e) if (a-d) are OK, refine the implementation later by replacing ThreadPoolExecutor with a proper (eager threads creation) implementation.

That sounds like a good strategy. I'll start working on step a, to build a more robust working implementation (that only uses ThreadPoolExecutor's public API), and then work on c and d once the API is approved. 

> 2. Not using ThreadPoolExecutor at all. We can write our own threads orchestration code, it's not that complicated. If we do that, our implementation will become quite faster than the current run_in_executor.  We can use curio as inspiration.  Last time I profiled asyncio I saw that the code binding concurrent.Future to asyncio.Future is quite complex, brittle, and slow.

> I'm in favor of (2), but let's go through (a-d) steps to get there.

Agreed, I'm also in favor of option (2), but doing (a-d) first. I think this approach will provide far more stability (rather than implementing (2) immediately), as we'll be able to write extensive test coverage using a ThreadPoolExecutor implementation as a base, and then ensure the native asyncio threadpool implementation has the same intended behavior. Afterwards, could even keep the ThreadPoolExecutor version as private for testing purposes. 

The native asyncio version will likely require some additional tests to ensure that the threads are being spawned eagerly, but they should have very similar overall behavior, with the asyncio version having better performance. 

I'm thinking that we could create a new Lib/asyncio/pools.py, for the initial ThreadPoolExecutor implementation, to give us a separate area to work with for native asyncio one in the future. A similar asyncio.ProcessPool API could also be eventually created there as well. It might be feasible to fit the initial implementation in Lib/asyncio/base_events.py, but IMO the native asyncio version would not fit. 

(From the user end it would make no difference, as long as we add pools.__all__ to Lib/asyncio/__init__.py)

My only remaining question that I can think of: should we implement an asyncio.ProcessPool API (using ProcessPoolExecutor's public API) before working on the native asyncio version of asyncio.ThreadPool? 

This would likely allow us to release the executor versions well before the 3.9 beta (2020-05-18), and then the native asyncio versions (with eager spawning) either before 3.9rc1 (2020-08-10) or at some point during 3.10 alpha. I suspect that writing the extensive test coverage will take the most time.
msg356776 - (view) Author: Kyle Stanley (aeros) * (Python committer) Date: 2019-11-16 22:47
> (a) design the API correctly; 
> (b) ship something that definitely works with a proven ThreadPoolExecutor; 

Yury and Andrew, here's my latest API design for asyncio.ThreadPool: https://github.com/python/cpython/compare/master...aeros:asyncio-threadpool. This is for the initial ThreadPoolExecutor version, using the design based on Yury's suggestions. I plan on extending upon the docstrings, writing tests, and the documentation for it.

My idea was to use the new Lib/asyncio/pools.py and AbstractPool to eventually implement an asyncio.ProcessPool (and the native version of asyncio.ThreadPool).

I plan on opening a PR after I finish writing some tests and documentation, I'd like to include it all in the same PR if possible. But let me know what you think about the current API design, it would be much easier for me to make modifications at this stage.
msg356792 - (view) Author: Kyle Stanley (aeros) * (Python committer) Date: 2019-11-17 06:33
So, I just had an interesting idea... what if ThreadPool.run() returned a Task instead of a coroutine object?

With the current version of asyncio.ThreadPool, if a user wants to create a Task, they would have to do something like this:

async with asyncio.ThreadPool() as pool:
    task = asyncio.create_task(pool.run(io_blocking_func, 10, kwarg1='test'))
    other_task = asyncio.create_task(pool.run(io_blocking_func, 12))
    if some_conditional:
        task.cancel()
    results = await asyncio.gather(task, other_task, return_exceptions=True)
    ...

To me, this looks like excessive boilerplate, particularly for a higher level API. Even for rather straightforward behavior, it requires nested function calls. If we were to return a task directly, this would be significantly cleaner:

async with asyncio.ThreadPool() as pool:
    task = pool.run(io_blocking_func, 10, kwarg1='test')
    other_task = pool.run(io_blocking_func, 12)
    if some_conditional:
        task.cancel()
    results = await asyncio.gather(task, other_task, return_exceptions=True)
    ...

Since asyncio.ThreadPool is intended to be a high-level API, I don't think it's an issue to return a Task from it's run() method. It would make it significantly easier and more convenient to work with from a user perspective.

Thoughts?
msg360725 - (view) Author: Kyle Stanley (aeros) * (Python committer) Date: 2020-01-26 09:01
> So, I just had an interesting idea... what if ThreadPool.run() returned a Task instead of a coroutine object?

After having some time to think this over, I prefer the current behavior. I don't think there would be significant enough improvement from returning a Task instead, and it would likely result in an overall performance loss.

Also, as a general update on the project, I'm close to being ready to open a PR to implement asyncio.ThreadPool. I finished the basic implementation and added a decent number of new tests to ensure its functionality. Here's my current progress: https://github.com/python/cpython/compare/master...aeros:asyncio-threadpool

I just need to work on adding the new documentation, and more specifically finding a good place for it in the current asyncio docs. Do you have any ideas for that, Yury? I figured that you might already have a preference in mind.
msg369318 - (view) Author: miss-islington (miss-islington) Date: 2020-05-19 03:03
New changeset cc2bbc2227c3f5ed9d8f6b3bd052e6f9e68279d2 by Kyle Stanley in branch 'master':
bpo-32309: Implement asyncio.to_thread() (GH-20143)
https://github.com/python/cpython/commit/cc2bbc2227c3f5ed9d8f6b3bd052e6f9e68279d2
msg369331 - (view) Author: miss-islington (miss-islington) Date: 2020-05-19 10:03
New changeset e2991308c9b49547d9762157ac913dda94b5eb32 by Miss Islington (bot) in branch '3.9':
bpo-32309: Implement asyncio.to_thread() (GH-20143)
https://github.com/python/cpython/commit/e2991308c9b49547d9762157ac913dda94b5eb32
msg369408 - (view) Author: STINNER Victor (vstinner) * (Python committer) Date: 2020-05-20 00:18
Note for myself: Python 3.9 release manager (Lukasz) approved adding the feature to Python 3.9.0 beta2:
https://github.com/python/cpython/pull/20212#pullrequestreview-414278938
msg369493 - (view) Author: miss-islington (miss-islington) Date: 2020-05-21 05:20
New changeset 0f56263e62ba91d0baae40fb98947a3a98034a73 by Kyle Stanley in branch 'master':
bpo-32309: Add support for contextvars in asyncio.to_thread() (GH-20278)
https://github.com/python/cpython/commit/0f56263e62ba91d0baae40fb98947a3a98034a73
msg369495 - (view) Author: miss-islington (miss-islington) Date: 2020-05-21 05:38
New changeset 3e650545bfe949fa435b0d41e54986f615891ec8 by Miss Islington (bot) in branch '3.9':
bpo-32309: Add support for contextvars in asyncio.to_thread() (GH-20278)
https://github.com/python/cpython/commit/3e650545bfe949fa435b0d41e54986f615891ec8
msg369497 - (view) Author: Kyle Stanley (aeros) * (Python committer) Date: 2020-05-21 05:58
Now that the `versionadded` label has been added and the contextvar issue was addressed, this issue can be closed.
History
Date User Action Args
2022-04-11 14:58:55adminsetgithub: 76490
2020-05-21 05:58:41aerossetstatus: open -> closed
resolution: fixed
messages: + msg369497

stage: patch review -> resolved
2020-05-21 05:38:24miss-islingtonsetmessages: + msg369495
2020-05-21 05:20:59miss-islingtonsetpull_requests: + pull_request19556
2020-05-21 05:20:50miss-islingtonsetmessages: + msg369493
2020-05-21 03:48:46aerossetpull_requests: + pull_request19555
2020-05-20 00:18:28vstinnersetnosy: + vstinner

messages: + msg369408
versions: + Python 3.9, - Python 3.7
2020-05-19 10:03:29miss-islingtonsetmessages: + msg369331
2020-05-19 09:43:37miss-islingtonsetpull_requests: + pull_request19508
2020-05-19 03:03:35miss-islingtonsetnosy: + miss-islington
messages: + msg369318
2020-05-17 01:57:33aerossetpull_requests: + pull_request19448
2020-05-16 22:13:07chris.jerdoneksetnosy: + chris.jerdonek
2020-02-08 03:36:31aerossetpull_requests: + pull_request17784
2020-01-26 09:01:20aerossetmessages: + msg360725
2019-11-17 06:33:29aerossetmessages: + msg356792
2019-11-16 22:47:09aerossetmessages: + msg356776
2019-11-09 16:43:27aerossetmessages: + msg356303
2019-11-08 14:29:54yselivanovsetmessages: + msg356242
2019-11-04 19:15:20aerossetmessages: + msg355978
2019-11-04 18:58:35aerossetmessages: + msg355975
2019-11-04 16:22:47yselivanovsetmessages: + msg355957
2019-11-04 11:15:08aerossetmessages: + msg355938
2019-11-02 23:28:52aerossetmessages: + msg355883
2019-11-02 20:54:29yselivanovsetmessages: + msg355881
2019-11-02 06:30:56aerossetmessages: + msg355863
2019-11-02 05:19:56yselivanovsetmessages: + msg355862
2019-11-02 02:51:05aerossetmessages: + msg355858
2019-11-02 02:48:35aerossetmessages: + msg355857
2019-11-02 01:16:45yselivanovsetmessages: + msg355855
2019-11-02 01:16:41aerossetmessages: + msg355854
2019-11-02 00:57:57aerossetmessages: + msg355852
2019-11-01 21:38:43yselivanovsetmessages: + msg355846
2019-11-01 21:10:00aerossetmessages: + msg355842
2019-11-01 11:45:59primalsetmessages: + msg355811
2019-11-01 11:33:00aerossetmessages: + msg355809
2019-11-01 11:23:44aerossetmessages: + msg355807
2019-11-01 11:17:32aerossetmessages: + msg355805
2019-11-01 11:09:01aerossetmessages: + msg355804
2019-11-01 09:52:18asvetlovsetmessages: + msg355802
2019-11-01 09:36:46primalsetmessages: + msg355800
2019-11-01 09:35:53primalsetnosy: + primal
messages: + msg355799
2019-11-01 09:34:19asvetlovsetmessages: + msg355798
2019-11-01 08:24:19aerossetmessages: + msg355796
2019-10-31 21:49:19aerossetmessages: + msg355773
2019-10-31 21:46:48aerossetnosy: + aeros
messages: + msg355771
2019-05-03 19:22:03asvetlovsetmessages: + msg341352
2017-12-13 20:35:37asvetlovsetmessages: + msg308245
2017-12-13 20:28:43asvetlovsetmessages: + msg308243
title: Implement asyncio.create_task() and asyncio.run_in_executor shortcuts -> Implement asyncio.run_in_executor shortcut
2017-12-13 20:27:17asvetlovsetassignee: docs@python ->

nosy: - docs@python
2017-12-13 20:03:46yselivanovsetmessages: + msg308239
2017-12-13 19:55:39asvetlovsetkeywords: + patch
stage: patch review
pull_requests: + pull_request4736
2017-12-13 19:54:21asvetlovcreate