classification
Title: PersistentTaskGroup API
Type: enhancement Stage: resolved
Components: asyncio Versions: Python 3.11
process
Status: closed Resolution:
Dependencies: Superseder:
Assigned To: Nosy List: achimnol, asvetlov, gvanrossum, tinchester, yselivanov
Priority: normal Keywords:

Created on 2022-02-24 05:34 by achimnol, last changed 2022-04-11 14:59 by admin. This issue is now closed.

Messages (23)
msg413880 - (view) Author: Joongi Kim (achimnol) * Date: 2022-02-24 05:34
I'm now tracking the recent addition and discussion of TaskGroup and cancellation scopes. It's interesting! :)

I would like to suggest a different mode of operation for asyncio.TaskGroup, which I have named "PersistentTaskGroup".

AFAIK, TaskGroup aims to replace asyncio.gather, ensuring completion or cancellation of all tasks within the context manager scope.

I believe that a "safe" asyncio application should consist of a nested tree of task groups, which allows us to explicitly state when tasks of different purposes and contexts terminate.  For example, a task group for database transactions should be shut down before a task group for HTTP handlers is shut down.

To this end, in server applications with many sporadically spawned tasks throughout the whole process lifetime, there are different requirements for a task group that manages such task sets.  The tasks should *not* be cancelled upon the unhandled exceptions of sibling tasks in the task group, while we need an explicit "fallback" exception handler for those (just like "return_exceptions=True" in asyncio.gather).  The tasks belong to the task group but their references should not be kept forever to prevent memory leaks (I'd suggest using weakref.WeakSet).  When terminating the task group itself, the ongoing tasks should be cancelled.  The cancellation process upon termination may happen in two phases: a cancel request with an initial timeout, followed by an additional limited wait for the cancellations to complete.  (This is what Guido has mentioned in the discussion in bpo-46771.)
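
To make these requirements concrete, here is a minimal sketch. The class name `SketchPersistentGroup` and its `errors` attribute are hypothetical and exist only for illustration; this is not the aiotools implementation nor a proposed stdlib API. It tracks tasks in a weakref.WeakSet, routes unhandled exceptions to a fallback handler without touching siblings, and cancels whatever is still running on shutdown (single-phase only).

```python
import asyncio
import weakref


class SketchPersistentGroup:
    """Hypothetical illustration; not the aiotools or stdlib API."""

    def __init__(self, exception_handler=None):
        # Weak references let finished tasks be garbage-collected.
        self._tasks = weakref.WeakSet()
        self.errors = []  # used by the default fallback handler
        self._handler = exception_handler or self.errors.append

    def create_task(self, coro):
        task = asyncio.get_running_loop().create_task(self._guard(coro))
        self._tasks.add(task)
        return task

    async def _guard(self, coro):
        # Report failures to the fallback handler instead of cancelling
        # sibling tasks.  CancelledError is a BaseException, so it is
        # not swallowed here and cancellation still works.
        try:
            return await coro
        except Exception as exc:
            self._handler(exc)

    async def shutdown(self):
        # Cancel whatever is still running and wait for it to unwind.
        pending = [t for t in self._tasks if not t.done()]
        for t in pending:
            t.cancel()
        await asyncio.gather(*pending, return_exceptions=True)


async def main():
    group = SketchPersistentGroup()

    async def fail():
        raise ValueError("boom")

    async def forever():
        await asyncio.sleep(3600)

    group.create_task(fail())
    survivor = group.create_task(forever())
    await asyncio.sleep(0.01)
    alive_after_failure = not survivor.done()
    await group.shutdown()
    return alive_after_failure, survivor.cancelled(), group.errors
```

With `asyncio.run(main())`, the long-running sibling is still alive after `fail()` raises, and it is cancelled only by the explicit `shutdown()` call.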

An initial sketch of PersistentTaskGroup is on aiotools:
https://github.com/achimnol/aiotools/blob/main/src/aiotools/ptaskgroup.py
It currently has no two-phase cancellation because that would require Python 3.11 with asyncio.Task.uncancel().

As Andrew has left a comment (https://github.com/achimnol/aiotools/issues/29#issuecomment-997437030), I think it is time to revisit the concrete API design and whether to include PersistentTaskGroup in the stdlib or not.
msg413905 - (view) Author: Joongi Kim (achimnol) * Date: 2022-02-24 13:17
So I have more things in mind.

Basically PersistentTaskGroup resembles TaskGroup in that:
 - It has the same "create_task()" method.
 - It has an explicit "cancel()" or "shutdown()" method.
 - Exiting the context manager means that all of its tasks have either completed or been cancelled.

TaskGroup is intended to be used for a short-lived set of tasks, while PersistentTaskGroup is intended for a long-running set of tasks, though individual tasks may be short-lived.  Thus, adding a globally accessible monitoring facility for plain TaskGroup would not be that useful.  In contrast, it is super-useful to have a monitoring feature in PersistentTaskGroup!

In aiomonitor, we can enumerate the currently running asyncio tasks by reading asyncio.all_tasks().  This has saved my life several times when debugging real-world server applications.  I think we can go further by having asyncio.PersistentTaskGroup.all_task_groups() which works in the same way.  If we make different modules and libraries use different persistent task groups, then we could keep track of their task statistics separately.
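
A rough sketch of how such a registry could work, assuming a class-level WeakSet of group instances. The `MonitoredGroup` class, its `name` argument, and `pending_count()` are hypothetical names used only for this illustration.

```python
import asyncio
import weakref


class MonitoredGroup:
    """Hypothetical group that registers itself for monitoring."""

    # Class-level registry; weak so that discarded groups disappear.
    _instances = weakref.WeakSet()

    def __init__(self, name):
        self.name = name
        self._tasks = weakref.WeakSet()
        MonitoredGroup._instances.add(self)

    @classmethod
    def all_task_groups(cls):
        # Return a snapshot, in the same spirit as asyncio.all_tasks().
        return set(cls._instances)

    def create_task(self, coro):
        task = asyncio.get_running_loop().create_task(coro)
        self._tasks.add(task)
        return task

    def pending_count(self):
        return sum(1 for t in self._tasks if not t.done())


async def main():
    http = MonitoredGroup("http")
    events = MonitoredGroup("events")
    tasks = [http.create_task(asyncio.sleep(3600)) for _ in range(3)]
    tasks.append(events.create_task(asyncio.sleep(3600)))
    # Per-group statistics, as a monitoring tool might collect them.
    stats = {g.name: g.pending_count()
             for g in MonitoredGroup.all_task_groups()}
    for t in tasks:
        t.cancel()
    await asyncio.gather(*tasks, return_exceptions=True)
    return stats
```

Here `asyncio.run(main())` yields a per-group count of pending tasks, which is the kind of separation that a flat list of all tasks cannot give.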
msg413908 - (view) Author: Joongi Kim (achimnol) * Date: 2022-02-24 13:33
I think people may ask "why in stdlib?".

My reasons are:
 - We are adding new asyncio APIs in 3.11 such as TaskGroup, so I think it is a good time to add another one, as long as it does not break existing code.
 - I believe that long-running task sets are an equally representative use case for real-world asyncio applications, particularly for servers.  Why not have intrinsic support for them?
 - PersistentTaskGroup is going to be universally adopted throughout my 70+K LoC asyncio codebase, for instance, in every aiohttp.Application context, plugin contexts and modules, etc.

Of course, the name "PersistentTaskGroup" may look quite long, and I'm completely open to alternative suggestions.  I also welcome suggestions on changes to its functional semantics based on your experience and knowledge.
msg413913 - (view) Author: Joongi Kim (achimnol) * Date: 2022-02-24 14:18
Example use cases:

* Implement an event iteration loop to fetch events and dispatch the handlers depending on the event type (e.g., WebSocket connections, message queues, etc.)
  - https://github.com/aio-libs/aiohttp/pull/2885
  - https://github.com/lablup/backend.ai-manager/pull/533
  - https://github.com/lablup/backend.ai-agent/pull/341
  - https://github.com/lablup/backend.ai-agent/pull/331
* Separate monitoring of event handler tasks by the event sources.
  - aiomonitor extension to count currently ongoing tasks and extract the most frequent task stack frames
* Separate the fallback exception handlers by each persistent task group, instead of using the single "global" event loop exception handler.
msg413914 - (view) Author: Joongi Kim (achimnol) * Date: 2022-02-24 14:29
Some search results from cs.github.com with the input "asyncio task weakset", which may be replaced/simplified with PersistentTaskGroup:

- https://github.com/Textualize/textual/blob/38efc821737e3158a8c4c7ef8ecfa953dc7c0ba8/src/textual/message_pump.py#L43
- https://github.com/aiokitchen/aiomisc/blob/59abd4434e6d134537490db699f89a51df1e6bbc/aiomisc/entrypoint.py#L132
- https://github.com/anki/cozmo-python-sdk/blob/dd29edef18748fcd816550469195323842a7872e/src/cozmo/event.py#L102
- https://github.com/aio-libs/aiohttp-sse/blob/db7d49bfc8a4907d9a8e7696a85b9772e1c550eb/examples/graceful_shutdown.py#L50
- https://github.com/mosquito/aiormq/blob/9c6c0dfc771ea8f6e79b7532177640c2692c640f/aiormq/base.py#L18
- https://github.com/mars-project/mars/blob/d1a14cc4a1cb96e40e1d81eef38113b0c9221a84/mars/lib/aio/_runners.py#L57
msg413922 - (view) Author: Joongi Kim (achimnol) * Date: 2022-02-24 15:50
@yselivanov @asvetlov
I think this API suggestion would require more refinement and in-depth discussion, and it may be better to go through the PEP writing and review process.  Or I might need to have a separate discussion thread somewhere else (maybe discuss.python.org?).

Since I'm just a newbie in terms of Python core/stdlib development, could one of you guide me toward what you think is the right way?
msg413926 - (view) Author: Guido van Rossum (gvanrossum) * (Python committer) Date: 2022-02-24 16:45
Could you just have a global task group that owns these long-running tasks? It could be embedded in a "toplevel" task that is created using asyncio.create_task() (which won't be deprecated). To shut down all long-running tasks at the end, just cancel that toplevel task.
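
One possible reading of this suggestion, as a sketch only: it assumes Python 3.11+ for asyncio.TaskGroup, and the helper names `toplevel`, `worker`, and `group_ready` are made up for illustration.

```python
import asyncio


async def worker(name, log):
    try:
        await asyncio.sleep(3600)
    except asyncio.CancelledError:
        log.append(name)  # record that shutdown reached this worker
        raise


async def toplevel(group_ready):
    # asyncio.TaskGroup requires Python 3.11+.
    async with asyncio.TaskGroup() as tg:
        group_ready.set_result(tg)  # hand the group to the application
        # Park here until this toplevel task is cancelled.
        await asyncio.get_running_loop().create_future()


async def main():
    log = []
    group_ready = asyncio.get_running_loop().create_future()
    top = asyncio.create_task(toplevel(group_ready))
    tg = await group_ready
    tg.create_task(worker("a", log))
    tg.create_task(worker("b", log))
    await asyncio.sleep(0.01)  # let the workers start
    top.cancel()  # shuts down every long-running task in the group
    try:
        await top
    except asyncio.CancelledError:
        pass
    return sorted(log)
```

Cancelling the toplevel task makes the task group cancel its children and wait for them before the cancellation propagates. Note that an unhandled exception in any child would still abort the whole group.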
msg413959 - (view) Author: Joongi Kim (achimnol) * Date: 2022-02-25 04:05
@gvanrossum As you mentioned, the event loop currently plays the role of the top-level task group already, even without introducing yet another top-level task.  For instance, asyncio.run() includes the necessary shutdown procedures to cancel all unfinished tasks and async generators that belong to the loop.

However, I think we should provide an abstraction to organize the shutdown procedures in a *hierarchical* manner.  For example, we could cancel all event handler tasks before cancelling all HTTP handler tasks upon a web server shutdown.  This prevents any potential races between these two different task sets.  I think you could agree with the necessity of orderly release of underlying resources during shutdown in general.  Currently asyncio.all_tasks() just returns a snapshot of a WeakSet, and we cannot guarantee which tasks will be cancelled first.

Yes, this can be done by manually writing code that declares multiple WeakSets and a for-loop that cancels the contained tasks by enumerating over them, just like asyncio.run() does.  With the new addition of TaskGroup and ExceptionGroup, this code does not require core changes to Python.
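
As a rough illustration of that manual approach (the names `event_tasks`, `http_tasks`, `handler`, and `cancel_all` are hypothetical), two WeakSets can be drained in a fixed order:

```python
import asyncio
import weakref


async def cancel_all(tasks):
    # Cancel every unfinished task in the set and wait until all of
    # them have actually finished unwinding.
    pending = [t for t in tasks if not t.done()]
    for t in pending:
        t.cancel()
    await asyncio.gather(*pending, return_exceptions=True)


async def handler(kind, order):
    try:
        await asyncio.sleep(3600)
    except asyncio.CancelledError:
        order.append(kind)  # record the order of cancellation
        raise


async def main():
    order = []
    event_tasks = weakref.WeakSet()
    http_tasks = weakref.WeakSet()
    for _ in range(2):
        http_tasks.add(asyncio.create_task(handler("http", order)))
        event_tasks.add(asyncio.create_task(handler("event", order)))
    await asyncio.sleep(0.01)  # let the handlers start
    # Orderly shutdown: event handlers first, then HTTP handlers.
    await cancel_all(event_tasks)
    await cancel_all(http_tasks)
    return order
```

Because each `cancel_all()` call waits for its set to finish before the next one starts, all event handlers observe cancellation before any HTTP handler does.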

But I believe that this hierarchical persistent task group abstraction should be an essential part of the API and asyncio tutorials when writing server applications.  asyncio.run() could be written by users, but I think the core devs have agreed that it is an essential abstraction to be included in the stdlib.  I'd like to argue that hierarchical persistent task groups are the same case.

I named it "PersistentTaskGroup" because it looks similar to TaskGroup, but this name may be misleading.  In PersistentTaskGroup, even when all tasks finish successfully, it does NOT terminate but keeps waiting for new tasks to be spawned.  It terminates only when the outer task is cancelled or its shutdown() method is called.  Note that the tasks belonging to it may be either short-running or long-running, and this does not matter.  The point is to shut down any remaining tasks in an orderly manner.  If you don't like the naming, please suggest alternatives.
msg413960 - (view) Author: Joongi Kim (achimnol) * Date: 2022-02-25 04:45
This particular experience, https://github.com/lablup/backend.ai-agent/pull/331, has actually motivated me to suggest PersistentTaskGroup.

The program subscribes to the event stream of the Docker daemon using aiohttp as an asyncio task, and this should be kept running throughout the whole application lifetime.  I first applied aiotools.TaskGroup to ensure shutdown of spawned event handler tasks, but I missed that it cancels all sibling tasks if one of the spawned tasks bubbles up an unhandled exception.  This has caused silent termination of the subscriber task and led to a bug.  We could debug this issue by inspecting aiomonitor and checking the existence of this task.  After this issue, I began to think we need a proper abstraction of a long-running task group (NOTE: the task group is long-running.  The lifetime of internal tasks does not matter).

Another case is https://github.com/lablup/backend.ai/issues/330.

One of our customer sites suffered from excessive CPU usage by our program.  We could identify the issue with aiomonitor, and the root cause was the indefinite accumulation of periodically created asyncio tasks that measure the disk usage of user directories, when there are too many files in them.  Since the number of tasks had exceeded 10K, it was very difficult to group and distinguish individual asyncio tasks in aiomonitor.  I thought that it would be nice if we could group such tasks into long-running groups and view task statistics separately.
msg413961 - (view) Author: Joongi Kim (achimnol) * Date: 2022-02-25 04:58
I ended up with the following conclusion:
- The new abstraction should not cancel sibling tasks and itself upon an unhandled exception but loudly report such errors (and the fallback error handler should be customizable).
- Nesting task groups will give additional benefits such as orderly shutdown of different task groups, for example draining message queues before shutting down network connections.

You may take my suggestion as "let's have hierarchical, nested virtual event loops to group tasks".  PersistentTaskGroup actually shares many characteristics with the event loop, while it is not an event loop itself.

So I came up with a WeakSet plus task decorators to handle exceptions on my own, and this is the current rudimentary implementation of PersistentTaskGroup in aiotools.

And I discovered from the additional search results that the same pattern (managing sporadic tasks using a WeakSet and writing a proper cancellation loop for them) appears quite commonly in many different asyncio applications and libraries.

So that's why I think this should be an intrinsic/essential abstraction.
msg413962 - (view) Author: Joongi Kim (achimnol) * Date: 2022-02-25 05:04
Here is another story.

When handling message queues in distributed applications, I use the following pattern frequently for graceful shutdown:
* Use a sentinel object to signal the end of queue.
* Enqueue the sentinel object when:
  - The server is shutting down. (i.e., cancelled explicitly)
  - The connection peer has sent an explicit termination message. (e.g., EOF)
* Wait until all messages enqueued before the sentinel object have been processed.
  - I'd like to impose a shutdown timeout here using a persistent task group, by spawning all handler tasks of this queue into it.
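
The pattern above might be sketched as follows. A plain set stands in for the persistent task group, and the names `consume`, `handle`, and `drain_timeout` are assumptions made for this example.

```python
import asyncio

_SENTINEL = object()  # marks the end of the queue


async def consume(queue, handle, drain_timeout):
    """Dispatch messages until the sentinel, then drain with a timeout.

    Returns the number of handler tasks that had to be cancelled.
    """
    handlers = set()  # stand-in for a persistent task group
    while True:
        msg = await queue.get()
        if msg is _SENTINEL:
            break
        task = asyncio.create_task(handle(msg))
        handlers.add(task)
        task.add_done_callback(handlers.discard)
    if not handlers:
        return 0
    # Give in-flight handlers a bounded amount of time to finish.
    _, pending = await asyncio.wait(set(handlers), timeout=drain_timeout)
    for t in pending:
        t.cancel()
    await asyncio.gather(*pending, return_exceptions=True)
    return len(pending)


async def main():
    processed = []

    async def handle(msg):
        await asyncio.sleep(3600 if msg == "slow" else 0.01)
        processed.append(msg)

    queue = asyncio.Queue()
    for msg in ("a", "b", "slow"):
        queue.put_nowait(msg)
    queue.put_nowait(_SENTINEL)  # e.g. on server shutdown or peer EOF
    cancelled = await consume(queue, handle, drain_timeout=0.2)
    return sorted(processed), cancelled
```

In this run the two quick handlers complete within the timeout, while the slow one is cancelled once the timeout expires.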
msg413967 - (view) Author: Joongi Kim (achimnol) * Date: 2022-02-25 07:03
Another case:

https://github.com/lablup/backend.ai-manager/pull/533
https://github.com/lablup/backend.ai-agent/pull/341

When shutting down the application, I'd like to explicitly cancel the shielded tasks, while keeping them shielded before shutdown.

So I inserted `ptaskgroup.create_task()` inside `asyncio.shield()`, so that the tasks are not cancelled upon the cancellation of their callers but they get cancelled when the server shuts down.

This pattern is conveniently implemented with PersistentTaskGroup.
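
A minimal sketch of the shield pattern, with a plain set named `background` standing in for the persistent task group (the real `ptaskgroup.create_task()` is not used here, and all helper names are hypothetical):

```python
import asyncio


async def main():
    background = set()  # stand-in for the persistent task group
    state = {}

    async def job():
        try:
            await asyncio.sleep(3600)
        except asyncio.CancelledError:
            state["job_cancelled"] = True
            raise

    async def caller():
        # Spawn the job into the group, then shield it so that
        # cancelling this caller does not cancel the job itself.
        inner = asyncio.create_task(job())
        background.add(inner)
        inner.add_done_callback(background.discard)
        await asyncio.shield(inner)

    request = asyncio.create_task(caller())
    await asyncio.sleep(0.01)
    request.cancel()  # e.g. the client disconnected
    await asyncio.gather(request, return_exceptions=True)
    survived = len(background) == 1 and not next(iter(background)).done()

    # Server shutdown: now cancel the shielded jobs explicitly.
    pending = list(background)
    for t in pending:
        t.cancel()
    await asyncio.gather(*pending, return_exceptions=True)
    return survived, state.get("job_cancelled", False)
```

The job survives the cancellation of its caller and is cancelled only when the group-level shutdown loop runs.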
msg414014 - (view) Author: Guido van Rossum (gvanrossum) * (Python committer) Date: 2022-02-25 16:34
The implementation of asyncio.TaskGroup isn't all that complicated (and the new "cancel count" API helps). I recommend that you build one that satisfies your requirements yourself, or convince the authors of some other package like Quattro or aiotools to provide variations.
msg414015 - (view) Author: Guido van Rossum (gvanrossum) * (Python committer) Date: 2022-02-25 16:34
(FWIW I would close this issue but I'll wait to see if @asvetlov has something to add.)
msg414026 - (view) Author: Tin Tvrtković (tinchester) * Date: 2022-02-25 17:46
The asyncio TaskGroup already uses a WeakSet for its children, so it's suitable for long-lived use.

As for errors in siblings aborting the TaskGroup, could you apply a wrapper to the scheduled coroutines to swallow and log any errors yourself?

Apart from the timeouts, that should get you a long way towards what you're describing.
msg414029 - (view) Author: Joongi Kim (achimnol) * Date: 2022-02-25 18:14
Good to hear that TaskGroup already uses WeakSet.

When all tasks finish, PersistentTaskGroup should not finish but wait for future tasks, unless explicitly cancelled or shut down.  Could this also be configured with asyncio.TaskGroup?

I'm also ok with adding a simple option for such behavior to asyncio.TaskGroup instead of adding a whole new API/class.
msg414030 - (view) Author: Joongi Kim (achimnol) * Date: 2022-02-25 18:28
> As for errors in siblings aborting the TaskGroup, could you apply a wrapper to the scheduled coroutines to swallow and log any errors yourself?

Yes, this could be the simplest way to implement PersistentTaskGroup if TaskGroup supports a "persistent" option to keep it running.

And just a question: I'm just curious about what happens if belonging tasks see the cancellation raised from their inner tasks.  Sibling tasks should not be cancelled, and the outer task group should not be cancelled, unless the task group itself has requested cancellation.  Could the new cancellation counter help this?
msg414034 - (view) Author: Joongi Kim (achimnol) * Date: 2022-02-25 19:08
> And just a question: I'm just curious about what happens if belonging tasks see the cancellation raised from their inner tasks.  Sibling tasks should not be cancelled, and the outer task group should not be cancelled, unless the task group itself has requested cancellation.  Could the new cancellation counter help this?

To achieve this by distinguishing cancellation from inner/outer tasks, TaskGroup._on_task_done() should be modified to skip setting _on_completed_fut because it should keep running.  Swallowing exceptions in child tasks can be done without modifying TaskGroup, but this part requires changes to TaskGroup.

Another difference is the usage.  Instead of relying on the async context manager interface, we would call "TaskGroup.shutdown()" separately, either directly in signal handlers or from cleanup methods of long-lived objects that have task groups as attributes.

And I also want to perform two-phase cancellation: instead of cancelling all tasks immediately as in the current _abort(), have a configurable grace period during which they have a chance to complete, and then cancel them with an additional timeout on the cancellation itself to prevent hangs.
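
The two-phase idea can be sketched as a standalone helper, without touching TaskGroup internals. The function name `two_phase_shutdown` and its parameters `grace_period` and `cancel_timeout` are hypothetical.

```python
import asyncio


async def two_phase_shutdown(tasks, grace_period, cancel_timeout):
    """Return the set of tasks that did not stop even after cancel()."""
    pending = {t for t in tasks if not t.done()}
    if not pending:
        return set()
    # Phase 1: give tasks a grace period to complete on their own.
    _, pending = await asyncio.wait(pending, timeout=grace_period)
    if not pending:
        return set()
    # Phase 2: cancel the rest, but bound the wait so that a task
    # which ignores cancellation cannot hang the shutdown.
    for t in pending:
        t.cancel()
    _, stuck = await asyncio.wait(pending, timeout=cancel_timeout)
    return stuck


async def main():
    async def quick():
        await asyncio.sleep(0.01)
        return "finished"

    async def slow():
        await asyncio.sleep(3600)

    tasks = [asyncio.create_task(quick()), asyncio.create_task(slow())]
    stuck = await two_phase_shutdown(
        tasks, grace_period=0.1, cancel_timeout=0.1)
    return tasks[0].result(), tasks[1].cancelled(), len(stuck)
```

The quick task completes during the grace period and keeps its result, while the slow task is cancelled in the second phase.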
msg414037 - (view) Author: Joongi Kim (achimnol) * Date: 2022-02-25 19:38
Short summary:

PersistentTaskGroup shares the following with TaskGroup:
- It uses WeakSet to keep track of child tasks.
- After exiting the async context manager scope (or the shutdown procedure), it ensures that all tasks are complete or cancelled.

PersistentTaskGroup differs in that:
- It keeps running after all tasks successfully finish unless it is explicitly shut down or the parent task is cancelled.
- One of the main use cases is calling the shutdown() method separately.  The shutdown procedure may be triggered from different task contexts.
- It provides two-phase cancellation with a configurable grace period.
- It does not propagate unhandled exceptions and cancellations from child tasks to the outside of the task group and sibling tasks but calls a customizable fallback exception handler. -> This could be done without modifying TaskGroup.

The API looks similar to TaskGroup with minor modifications.
The semantics of a PersistentTaskGroup more closely resemble a nested event loop, in that it has its own set of tasks, it keeps running until closed, and it has its own fallback exception handler.

Note that the current aiotools implementation lacks many details, such as two-phase cancellation.  I'm going to implement more soon.
msg414161 - (view) Author: Joongi Kim (achimnol) * Date: 2022-02-27 16:40
I have updated the PersistentTaskGroup implementation with reference to asyncio.TaskGroup and added more detailed test cases, which work with the latest Python 3.11 GitHub checkout.

https://github.com/achimnol/aiotools/pull/36/files

Please have a look at the class docstring.
There are two different usages: as an async context manager vs. as an attribute of a long-lived object.

One of the points is to "revive" asyncio.gather() with return_exceptions=True but let it handle/report exceptions immediately with a customizable exception handler.

Two-phase shutdown is not implemented yet, as I'm still thinking about how to adapt the current implementation.
msg414637 - (view) Author: Joongi Kim (achimnol) * Date: 2022-03-07 05:40
I have released a new version of aiotools with a rewritten TaskGroup and PersistentTaskGroup.

https://aiotools.readthedocs.io/en/latest/aiotools.taskgroup.html

aiotools.TaskGroup has small additions to asyncio.TaskGroup: a naming API and `current_taskgroup` context variable.

aiotools.PersistentTaskGroup is what I've described here, highlighting both async-with usage and long-lived object usage, with an `all_ptaskgroups()` classmethod for monitoring purposes.  The two-phase graceful shutdown is not included yet (future TODO).
msg415132 - (view) Author: Andrew Svetlov (asvetlov) * (Python committer) Date: 2022-03-14 12:07
I think we should close the PR now.
I'm open to resurrecting the discussion in Python 3.12 or 3.13, when the aiotools implementation has been battle-tested.
msg415149 - (view) Author: Guido van Rossum (gvanrossum) * (Python committer) Date: 2022-03-14 15:42
Okay.
History
Date                 User        Action  Args
2022-04-11 14:59:56  admin       set     github: 90999
2022-03-14 15:42:46  gvanrossum  set     status: open -> closed; messages: + msg415149; stage: resolved
2022-03-14 12:07:12  asvetlov    set     messages: + msg415132
2022-03-07 05:40:00  achimnol    set     messages: + msg414637
2022-02-27 16:40:41  achimnol    set     messages: + msg414161
2022-02-25 19:38:24  achimnol    set     messages: + msg414037
2022-02-25 19:08:02  achimnol    set     messages: + msg414034
2022-02-25 18:28:25  achimnol    set     messages: + msg414030
2022-02-25 18:14:14  achimnol    set     messages: + msg414029
2022-02-25 17:46:29  tinchester  set     messages: + msg414026
2022-02-25 16:34:53  gvanrossum  set     messages: + msg414015
2022-02-25 16:34:17  gvanrossum  set     nosy: + tinchester; messages: + msg414014
2022-02-25 07:03:06  achimnol    set     messages: + msg413967
2022-02-25 05:04:59  achimnol    set     messages: + msg413962
2022-02-25 04:58:36  achimnol    set     messages: + msg413961
2022-02-25 04:45:00  achimnol    set     messages: + msg413960
2022-02-25 04:05:18  achimnol    set     messages: + msg413959
2022-02-24 16:45:33  gvanrossum  set     messages: + msg413926
2022-02-24 15:50:11  achimnol    set     messages: + msg413922
2022-02-24 14:29:02  achimnol    set     messages: + msg413914
2022-02-24 14:18:45  achimnol    set     messages: + msg413913
2022-02-24 13:33:16  achimnol    set     messages: + msg413908
2022-02-24 13:17:54  achimnol    set     messages: + msg413905
2022-02-24 05:34:25  achimnol    create