Title: [Enhancement] Asyncio task decorator to provide interface to define async DAGs (similar to dask's delayed interface)
Type: enhancement Stage:
Components: asyncio Versions: Python 3.11
Status: open Resolution:
Dependencies: Superseder:
Assigned To: Nosy List: aa1371, asvetlov, yselivanov
Priority: normal Keywords:

Created on 2021-07-12 03:15 by aa1371, last changed 2021-07-15 08:25 by aa1371.

Messages (7)
msg397274 - (view) Author: Art (aa1371) * Date: 2021-07-12 03:15
For those not familiar, the dask delayed interface allows a user to define a DAG through a functional invocation interface. Dask docs here:
Another example of this kind of interface is airflow's new TaskFlow api:

The proposed solution would look something like this. Essentially all we're doing is defining a decorator that will allow you to pass in coroutines to another coroutine, and will resolve the dependent coroutines before passing the results to your dependent coroutine.

    # Note0: can be removed, see Note2 below
    async def task_wrapper(val):
        return val

    def task(afunc):  # open to other names for the decorator since it might be a bit ambiguous
        async def inner(*args):  # Note1: real solution would be expanded to args/kwargs
            # Note2: `task_wrapper` kind of unneccesary, we can just conditionally not gather in those cases
            args = [arg if inspect.isawaitable(arg) else task_wrapper(arg) for arg in args]
            args = await asyncio.gather(*args)
            return await afunc(*args)
        return inner

The advantage this gives us in asyncio is that we can easily build processing pipelines where each piece is completely independent and does not know anything about any other piece of the pipeline. Obviously this is already possible currently, but this simple wrapper will provide a very clean way to connect it all together.

Take the following example, where we want to fetch data for various ids and post process/upload them.

    async def fetch(x):
        # Note3: timings here defined to demo obvious expected async behavior in completion order of print statements
        sleep_time = {'a1': 1, 'a2': 2, 'b1': 4, 'b2': 0.5, 'c1': 6, 'c2': 3.5}[x]
        await asyncio.sleep(sleep_time)
        ret_val = f'f({x})'
        print(f'Done {ret_val}')
        return ret_val

    async def process(x1, x2):
        await asyncio.sleep(1)
        ret_val = f'p({x1}, {x2})'
        print(f'Done {ret_val}')
        return ret_val

Notice we didn't decorate `process`, this is to allow us to demonstrate how you can still use the interface on functions that you can't or don't want to decorate. Now to define/execute our pipeline we can simply do this. :

    async def main():
        fa1 = fetch('a1')
        fa2 = fetch('a2')
        fb1 = fetch('b1')
        fb2 = fetch('b2')
        fc1 = fetch('c1')
        fc2 = fetch('c2')
        pa = task(process)(fa1, fa2)
        pb = task(process)(fb1, fb2)
        pc = task(process)(fc1, fc2)
        return await asyncio.gather(pa, pb, pc)
    loop = asyncio.new_event_loop()

This will be a very simple non-breaking inclusion to the library, that will allow users to build clean/straightforward asynchronous processing pipelines/DAGs.
msg397428 - (view) Author: Andrew Svetlov (asvetlov) * (Python committer) Date: 2021-07-13 16:33
I'm not convinced why @task decorator should be a part of asyncio.

You can provide the implementation as a part of some third-party library on Looks like it doesn't require any change of asyncio itself.
msg397437 - (view) Author: Art (aa1371) * Date: 2021-07-13 18:46
I understand the hesitation to add this to the library, however I'd like to make a case for why I believe that it fits. To start this is my understanding of the scope of asyncio (or at least part of the scope, taken from the asyncio docs):

asyncio provides a set of high-level APIs to:
- run Python coroutines concurrently and have full control over their execution;
- perform network IO and IPC;
- control subprocesses;
- distribute tasks via queues;
- synchronize concurrent code;

I believe this decorator would be another high-level api that would be considered a valid, useful, simple, and if it was more well-known, popular, method of exerting "full control over their execution".

Here's an example of a few places from around the internet where people are asking how to do something that would directly benefit from this API, and where I believe frankly that this API would provide a much cleaner/better solution than those proposed.
    (inspiration for demo solution provided above)
    (user specifies their tasks are IO bound and that they tried to use asyncio, but to no avail, or it was too complicated)

In both these cases above, it is explicitly implied that the user tried to use asyncio to solve their problem, but could not find a satisfactory way to do so.

Here's a few more examples of asyncio code, that aren't as directly related to the issue at hand, but would greatly benefit from this isolated "task" structure.
    (user trying to build/execute a DAG of async tasks)
    (user could/should consider their task pipeline as a DAG which would conceptually simplify what they are trying to achieve)

To directly address the concern of "why add this to asyncio, and not a third-party library".
1) I believe this falls into the scope of the library as demonstrated in the first paragraph of this comment
2) Users look to and trust asyncio to provide good high level apis to build better code/systems (As you can see from some of the examples I posted above)
3) This is a very simple high-level api, that will not interfere with anything else in the library. Will introduce no compatibility issues
4) This point is conjecture, but I believe that this api will keep users from unnecessarily moving their workflows to other libraries like dask (which I love and contribute too) or airflow as you can see from some of the examples where users claim to try other libraries for something that asyncio could easily handle.
5) Inclusion in asyncio (over a third-party library) would provide faster, more reputable exposure to users of a great method of thinking about/solving their problems.

I hope you'll re-consider your position, but completely understand if not, in which case I'll close this issue.
msg397438 - (view) Author: Art (aa1371) * Date: 2021-07-13 18:48
Note: regarding my inclusion of "airflow" in point #4 above. I in no way believe that airflow is a viable alternative to this proposal. I included it because one of those posts I linked explicitly mentioned that they looked into it as an alternative.
msg397449 - (view) Author: Yury Selivanov (yselivanov) * (Python committer) Date: 2021-07-13 22:18
Yeah, I'm also not convinced this has to be part of asyncio, especially since we'll likely have task groups in 3.11.
msg397455 - (view) Author: Art (aa1371) * Date: 2021-07-14 02:12
Hi Yury, could you by any chance point me to some material on the inclusion of task groups in 3.11? I've found these "task group" docs from a library called AnyIO:

If the asyncio task group feature is similar to this, then while I think it is a nice inclusion, I believe it does not cover the use case that this proposal would cover.

* If your opinion on the matter is still not final, then would you be open to a further demonstration of this proposal? I could gather several use cases from SO and other sites and demo how they could be elegantly solved with this interface.
msg397530 - (view) Author: Art (aa1371) * Date: 2021-07-15 08:25
FYI - I've added created a new pypi project for this called `aiodag`. 

    pip install aiodag

Github here:
Pypi here:

I would appreciate it if you could find some time take a look just at the readme on the github page and provide any thoughts. I still think that this would be a good fit for asyncio directly. But if still not convinced is there any conditions under which you could see this project eventually being folded into the library?

Thanks for your time in reviewing this issue
Date User Action Args
2021-07-15 08:25:05aa1371setmessages: + msg397530
2021-07-14 02:12:11aa1371setmessages: + msg397455
2021-07-13 22:18:29yselivanovsetmessages: + msg397449
2021-07-13 18:48:59aa1371setmessages: + msg397438
2021-07-13 18:46:32aa1371setmessages: + msg397437
2021-07-13 16:33:21asvetlovsetmessages: + msg397428
2021-07-12 09:19:20aa1371settitle: [Enhancement] Asyncio task decorator to provide functionality similar to dask's delayed interface -> [Enhancement] Asyncio task decorator to provide interface to define async DAGs (similar to dask's delayed interface)
2021-07-12 03:15:54aa1371create