
classification
Title: Add a possibility for asyncio.Condition to determine the count of currently waiting consumers
Type: enhancement Stage: resolved
Components: asyncio Versions: Python 3.11, Python 3.10, Python 3.9, Python 3.8, Python 3.7, Python 3.6
process
Status: closed Resolution: rejected
Dependencies: Superseder:
Assigned To: Nosy List: asvetlov, mykola-mokhnach, yselivanov
Priority: normal Keywords:

Created on 2021-09-09 04:46 by mykola-mokhnach, last changed 2022-04-11 14:59 by admin. This issue is now closed.

Messages (3)
msg401434 - Author: Mykola Mokhnach (mykola-mokhnach) Date: 2021-09-09 04:46
Currently the asyncio.Condition class does not provide any property for determining how many consumers (if any) are waiting on the condition. My scenario is the following:

```python
...
FILE_STATS_CONDITIONS: Dict[str, asyncio.Condition] = {}
FILE_STATS_CACHE: Dict[str, Union[FileStat, Exception]] = {}

...

    async def _fetch_file_stat(self: T, file_id: str) -> FileStat:
        """
        The idea behind this caching is to avoid sending multiple info requests
        for the same file id to the server and thus to avoid throttling. This is safe, because
        stored files are immutable.
        """
        try:
            file_stat: FileStat
            if file_id in FILE_STATS_CONDITIONS:
                self.log.debug(f'Reusing the previously cached stat request for the file {file_id}')
                async with FILE_STATS_CONDITIONS[file_id]:
                    await FILE_STATS_CONDITIONS[file_id].wait()
                    cached_result = FILE_STATS_CACHE[file_id]
                    if isinstance(cached_result, FileStat):
                        file_stat = cached_result
                    else:
                        raise cached_result
            else:
                FILE_STATS_CONDITIONS[file_id] = asyncio.Condition()
                cancellation_exception: Optional[asyncio.CancelledError] = None
                async with FILE_STATS_CONDITIONS[file_id]:
                    file_stat_task = asyncio.create_task(self.storage_client.get_file_stat(file_id))
                    try:
                        try:
                            file_stat = await asyncio.shield(file_stat_task)
                        except asyncio.CancelledError as e:
                            if len(getattr(FILE_STATS_CONDITIONS[file_id], '_waiters', (None,))) == 0:
                                # it is safe to cancel now if there are no consumers in the waiting queue
                                file_stat_task.cancel()
                                raise
                            # we don't want currently waiting consumers to fail because of the task cancellation
                            file_stat = await file_stat_task
                            cancellation_exception = e
                        FILE_STATS_CACHE[file_id] = file_stat
                    except Exception as e:
                        FILE_STATS_CACHE[file_id] = e
                        raise
                    finally:
                        FILE_STATS_CONDITIONS[file_id].notify_all()
                if cancellation_exception is not None:
                    raise cancellation_exception
            # noinspection PyUnboundLocalVariable
            self.log.info(f'File stat: {file_stat}')
            return file_stat
        except ObjectNotFoundError:
            self.log.info(f'The file identified by "{file_id}" either does not exist or has expired')
            raise file_not_found_error(file_id)
        finally:
            if file_id in FILE_STATS_CONDITIONS and not FILE_STATS_CONDITIONS[file_id].locked():
                del FILE_STATS_CONDITIONS[file_id]
                if file_id in FILE_STATS_CACHE:
                    del FILE_STATS_CACHE[file_id]
```

Basically, I have to use `getattr(FILE_STATS_CONDITIONS[file_id], '_waiters', (None,))` to work around this limitation and decide whether to cancel my producer now or later.

It would be nice to have a public property on the Condition class, which would basically return the value of `len(condition._waiters)`.
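
As an illustration of what such a property could look like without reading the private `_waiters` attribute, here is a minimal sketch built only on the public `asyncio.Condition` API. The names `CountingCondition` and `waiting` are hypothetical and not part of asyncio; the counter is kept by overriding `wait()`, which `wait_for()` also goes through.

```python
import asyncio


class CountingCondition(asyncio.Condition):
    """Hypothetical subclass: tracks how many coroutines are inside wait()."""

    def __init__(self, lock=None):
        super().__init__(lock)
        self._waiting_count = 0  # illustrative name, not an asyncio attribute

    @property
    def waiting(self) -> int:
        """Number of coroutines currently inside wait()."""
        return self._waiting_count

    async def wait(self):
        self._waiting_count += 1
        try:
            return await super().wait()
        finally:
            # Runs once the waiter has been notified (and has reacquired
            # the lock) or has been cancelled.
            self._waiting_count -= 1
```

Note that a notified waiter is still counted until it reacquires the lock, so the value is most meaningful when read by the producer while it holds the lock, before calling `notify_all()`.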
msg401444 - Author: Mykola Mokhnach (mykola-mokhnach) Date: 2021-09-09 08:13
This improvement request is also applicable to other asyncio synchronization primitives that contain a waiting queue.
msg413748 - Author: Andrew Svetlov (asvetlov) * (Python committer) Date: 2022-02-22 21:34
Sorry, your use case looks overcomplicated rather than convincing.
History
Date                 User             Action  Args
2022-04-11 14:59:49  admin            set     github: 89309
2022-02-22 21:34:56  asvetlov         set     status: open -> closed; resolution: rejected; messages: + msg413748; stage: resolved
2021-09-09 08:13:52  mykola-mokhnach  set     messages: + msg401444
2021-09-09 04:46:16  mykola-mokhnach  create