Title: ProcessPoolExecutor/ThreadPoolExecutor should provide introspection APIs
Type: enhancement Stage: patch review
Components: Library (Lib) Versions: Python 3.7
Status: open Resolution:
Dependencies: Superseder:
Assigned To: Nosy List: Claudiu.Popa, Winterflower, cool-RR, dan.oreilly, pitrou
Priority: normal Keywords: patch

Created on 2014-08-27 01:56 by dan.oreilly, last changed 2017-11-02 23:56 by tomMoral.

File name Uploaded Description
introspect_executors.diff dan.oreilly, 2014-08-27 01:56 Add introspection APIs to Executors
(file name not shown) dan.oreilly, 2014-08-27 02:00 Simple test script demonstrating the APIs
Pull Requests
URL Status Linked Edit
PR 4243 open tomMoral, 2017-11-02 23:56
Messages (4)
msg225941 - (view) Author: Dan O'Reilly (dan.oreilly) * Date: 2014-08-27 01:56
As initially discussed on python-ideas, it would be nice if there was a way to query the concurrent.futures Executor objects for information about their internal state - number of total/active/idle workers, number of total/active/waiting tasks, which tasks are active, which are waiting, etc. Some of this information can be determined today by examining the Executor's internal variables, but not all.

I'm attaching a patch that makes a first attempt at adding this support. Currently it adds quite a few methods, though these could be consolidated somewhat if that's preferable. Here's what I've added, along with possible consolidations:

worker_count() : Total number of workers currently in the pool
active_worker_count() : Number of workers currently processing a work item
idle_worker_count(): Number of workers not processing a work item
(Possible consolidation: worker_counts(): returns a dict containing total/active/idle keys mapped to the above.)

task_count(): Total number of tasks currently being handled by the pool
active_task_count(): Number of tasks currently being processed by workers (Possibly redundant - it will always match active_worker_count())
waiting_task_count(): Number of submitted tasks not yet being processed by a worker
(Possible consolidation: task_counts(): returns a dict containing total/active/waiting keys mapped to the above.)

active_tasks(): A set of WorkItem objects currently being processed by a worker.
waiting_tasks(): A list of WorkItem objects currently waiting to be processed by a worker.
(Possible consolidation: get_tasks(): returns a dict containing active/waiting keys mapped to the above.)

A WorkItem is an object containing the function object, args tuple, and kwargs dict submitted to the Executor.
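As a rough illustration of the data shapes described above (not the code in the attached patch), the sketch below models a WorkItem and the proposed task_counts() consolidation. The field names fn, args, and kwargs and the standalone task_counts() helper are assumptions made for this example.

```python
from dataclasses import dataclass, field
from typing import Any, Callable


# Hypothetical stand-in for the WorkItem described in the proposal.
# eq=False keeps identity-based hashing, so instances can live in a set
# even though the kwargs dict itself is unhashable.
@dataclass(eq=False)
class WorkItem:
    fn: Callable[..., Any]
    args: tuple = ()
    kwargs: dict = field(default_factory=dict)


def task_counts(active, waiting):
    """Consolidated counts: active is a set of WorkItems, waiting is a list."""
    return {
        "total": len(active) + len(waiting),
        "active": len(active),
        "waiting": len(waiting),
    }


running = WorkItem(print, ("hello",))
queued = WorkItem(len, ([1, 2, 3],))
counts = task_counts({running}, [queued])
```

Returning all three numbers from one call keeps them mutually consistent, which three separate method calls cannot guarantee while workers are picking up tasks.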

ThreadPoolExecutor notes:

For ThreadPoolExecutor, most of this information is made available by changing the worker threads from functions into class instances, and maintaining a small amount of extra state on the instance. The added overhead for users who don't care about using introspection should be minimal. Note that for waiting_tasks(), we have to introspect the underlying queue.Queue. This is done by locking the queue's internal mutex, and iterating over the queue's internal deque. There was some concern on the mailing list about doing this, so I wanted to call it out. We could alternatively implement waiting_tasks by maintaining some data structure (a deque?) of work items that are enqueued in parallel to the actual queue. However, this adds additional memory overhead and implementation complexity (to keep the new deque in sync with the queue's content).
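The queue introspection described above can be sketched as follows. This is a minimal illustration, not the patch itself; it relies on the mutex and queue attributes of queue.Queue, which exist in CPython but are implementation details rather than a documented interface. The helper name snapshot_queue is made up for this example.

```python
import queue


def snapshot_queue(q):
    """Return a list copy of the items waiting in q, without removing them."""
    # Assumption: q.mutex (a lock) and q.queue (a deque) are CPython
    # implementation details of queue.Queue, not a documented API.
    with q.mutex:
        # Copy while holding the lock so no put()/get() can interleave.
        return list(q.queue)


work_queue = queue.Queue()
for name in ("task-a", "task-b", "task-c"):
    work_queue.put(name)

waiting = snapshot_queue(work_queue)
```

The copy is only a snapshot: items may be dequeued by workers as soon as the lock is released.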

ProcessPoolExecutor notes:

ProcessPoolExecutor uses both a dict and a multiprocessing.Queue internally. Every submitted work item is placed into the dict (which is called _pending_work_items), keyed on a unique work_id. However, only WORKER_COUNT + 1 items are actually placed into the multiprocessing.Queue at a time. This, along with the added IPC complexity and cost, makes the implementation approach a bit different from ThreadPoolExecutor. 

Every method except worker_count() and task_count() requires changes in the worker implementation - it now has to send the work_id of the work item it's about to process back to the parent. It does this via a multiprocessing.SimpleQueue that's already being used to send the result of the work item to the parent. The parent process will then store that work_id in a set called _active_work_items. When the actual result of a work item is sent back to the parent, the work_id (which is already included with the result) is removed from the _active_work_items set.

The active_tasks() method can build its set by iterating over work_ids in the _active_work_items set, and looking up the corresponding WorkItem in the _pending_work_items dict. waiting_tasks() can iterate over the _pending_work_items dict and build a list containing any item that isn't present in the _active_work_items set. That list is then sorted by work_id so that it reflects the actual order in which the tasks will be placed into the queue. The main source of added overhead for non-users of introspection is the cost of sending the work_id back to the parent process prior to actually processing a WorkItem in the child, along with the small amount of memory used to store the _active_work_items set (which will never be greater than MAX_WORKERS in size). In practice I don't think this will have much noticeable performance impact, except perhaps for cases where there are many tasks which execute very quickly.
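The parent-side bookkeeping described above can be modelled without any real processes. The sketch below is a simplified, single-process illustration, not the patch: the class ParentBookkeeping and its on_started()/on_result() hooks are hypothetical names standing in for the messages a worker would send over the result queue.

```python
class ParentBookkeeping:
    """Toy model of the parent's view of pending and active work items."""

    def __init__(self):
        self.pending_work_items = {}   # work_id -> work item (all unfinished work)
        self.active_work_ids = set()   # work_ids a worker has reported starting
        self._next_id = 0

    def submit(self, item):
        work_id = self._next_id
        self._next_id += 1
        self.pending_work_items[work_id] = item
        return work_id

    def on_started(self, work_id):
        # Hypothetical hook: the worker announced it is about to run work_id.
        self.active_work_ids.add(work_id)

    def on_result(self, work_id):
        # Hypothetical hook: the worker sent back the result for work_id.
        self.active_work_ids.discard(work_id)
        self.pending_work_items.pop(work_id, None)

    def active_tasks(self):
        return [self.pending_work_items[i] for i in sorted(self.active_work_ids)]

    def waiting_tasks(self):
        # Sort by work_id so the list reflects submission (queueing) order.
        return [
            self.pending_work_items[i]
            for i in sorted(self.pending_work_items)
            if i not in self.active_work_ids
        ]


book = ParentBookkeeping()
ids = [book.submit(name) for name in ("a", "b", "c")]
book.on_started(ids[0])
```

At this point active_tasks() reports the first item and waiting_tasks() reports the other two in submission order; calling on_result(ids[0]) removes the first item from both views.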

Also note that right now there are no docs included in the patch. I want some consensus on the API to be reached prior to writing them.
msg225952 - (view) Author: Ram Rachum (cool-RR) * Date: 2014-08-27 09:46
I'd definitely consolidate.

First of all, I'd put a few useful numbers in `Executor.__repr__`. Something like <ThreadPoolExecutor(7), 3 workers busy, 0 work items queued>. That already makes it easy to get a general picture of how the executor is doing without digging in too deeply.

Next, I'd make a property `workers` (or if you want to make it a method, that's okay). It'll return a list of all the workers, and in their `__repr__` they'll have the work item that they're working on. Maybe we could also add `idle_workers` and `busy_workers`, then people can call `len` on them. (No point in making a function that tells their length if it doesn't expose them directly.) Also "active" may not be a clear term, I think that "busy" communicates it better, and is shorter to boot.

I'd also add `queued_work_items` and `active_work_items`, and I think that takes care of everything.
msg301169 - (view) Author: Antoine Pitrou (pitrou) * (Python committer) Date: 2017-09-02 18:21
Dan, not sure you're still interested in working on this?
Looking at the API, IMHO it should be consolidated as a simple stats() method, which allows for a consistent snapshot.
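One way such a stats() method could give a consistent snapshot is to update all counters under a single lock and read them under the same lock. The sketch below is an illustration built only on the public ThreadPoolExecutor API, not the implementation in the linked PR; the class name StatsExecutor and the "busy"/"queued" keys are assumptions.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class StatsExecutor(ThreadPoolExecutor):
    """Hypothetical executor that tracks busy/queued counts under one lock."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._stats_lock = threading.Lock()
        self._queued = 0
        self._busy = 0

    def submit(self, fn, /, *args, **kwargs):
        def tracked():
            # Runs in a worker thread: the item moves from queued to busy.
            with self._stats_lock:
                self._queued -= 1
                self._busy += 1
            try:
                return fn(*args, **kwargs)
            finally:
                with self._stats_lock:
                    self._busy -= 1

        with self._stats_lock:
            self._queued += 1
        try:
            future = super().submit(tracked)
        except BaseException:
            # Submission failed (for example after shutdown): undo the count.
            with self._stats_lock:
                self._queued -= 1
            raise
        future.add_done_callback(self._on_done)
        return future

    def _on_done(self, future):
        # A cancelled item never ran tracked(), so it is still counted as queued.
        if future.cancelled():
            with self._stats_lock:
                self._queued -= 1

    def stats(self):
        # Both numbers are read under the same lock, so they agree with each other.
        with self._stats_lock:
            return {"busy": self._busy, "queued": self._queued}
```

Because every counter update and the read in stats() share one lock, a caller never sees a work item counted as both queued and busy, or as neither.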
msg301172 - (view) Author: Dan O'Reilly (dan.oreilly) * Date: 2017-09-02 19:51
Unfortunately, I don't really have time to continue working on this anymore (or any of the other proposed patches I had submitted around 2014).
Date User Action Args
2017-11-02 23:56:22 tomMoral set stage: needs patch -> patch review; pull_requests: + pull_request4207
2017-09-02 19:51:23 dan.oreilly set messages: + msg301172
2017-09-02 18:21:36 pitrou set versions: + Python 3.7, - Python 3.5; nosy: + pitrou; messages: + msg301169; stage: needs patch
2017-02-12 20:37:34 Winterflower set nosy: + Winterflower
2014-08-27 09:46:38 cool-RR set nosy: + cool-RR; messages: + msg225952
2014-08-27 04:59:52 Claudiu.Popa set nosy: + Claudiu.Popa
2014-08-27 02:00:24 dan.oreilly set files: +
2014-08-27 01:56:42 dan.oreilly create