classification
Title: Incorrect number of running calls in ProcessPoolExecutor
Type: behavior
Stage:
Components: Library (Lib)
Versions: Python 3.7

process
Status: open
Resolution:
Dependencies:
Superseder:
Assigned To:
Nosy List: asvetlov, bquinlan, inada.naoki, lukasz.langa, maggyero, pablogsal, pitrou, serhiy.storchaka, willingc
Priority: normal
Keywords:

Created on 2019-06-14 06:08 by maggyero, last changed 2019-06-26 12:26 by asvetlov.

Messages (11)
msg345553 - (view) Author: Géry (maggyero) * Date: 2019-06-14 06:08
In the `concurrent.futures` standard module, the number of running calls in a `ProcessPoolExecutor` is `max_workers + 1` (unexpected) instead of `max_workers` (expected) like in a `ThreadPoolExecutor`.

The following code snippet which submits 8 calls to 2 workers in a `ProcessPoolExecutor`:

    import concurrent.futures
    import time
    
    
    def call():
        while True:
            time.sleep(1)
    
    
    if __name__ == "__main__":
        with concurrent.futures.ProcessPoolExecutor(max_workers=2) as executor:
            futures = [executor.submit(call) for _ in range(8)]
    
            for future in futures:
                print(future.running())

prints this (3 running calls; unexpected since there are 2 workers):

> True
> True
> True
> False
> False
> False
> False
> False

while using a `ThreadPoolExecutor` prints this (2 running calls; expected):

> True
> True
> False
> False
> False
> False
> False
> False

Tested on both Windows 10 and MacOS 10.14.
msg346529 - (view) Author: Géry (maggyero) * Date: 2019-06-25 14:32
Initial post: https://stackoverflow.com/q/56587166/2326961
msg346538 - (view) Author: Ned Deily (ned.deily) * (Python committer) Date: 2019-06-25 15:47
@maggyero, please do not spam a list of people by adding their names to issues; it will not speed up a resolution. Let the people doing bug triage evaluate the issue and, if necessary, nosy the appropriate developers.
msg346544 - (view) Author: Carol Willing (willingc) * (Python committer) Date: 2019-06-25 18:09
I've run the code snippet several times on Mac 10.14.5 with Python 3.7.3. I'm not able to replicate your result for the `ProcessPoolExecutor` but can replicate results for `ThreadPoolExecutor`. Do you have another example where you are seeing this behavior?
msg346548 - (view) Author: Géry (maggyero) * Date: 2019-06-25 18:57
@Ned Deily

Okay, I did not know if I had to list the potentially interested people (according to their Github contribution on the module for instance) or let them do it themselves. Thank you for clarifying.

@Carol Willing

The number of RUNNING futures is always `max_workers + 1` for `ProcessPoolExecutor` but only on Windows (CPython 3.7, Windows 10). On MacOS, this number varies, depending on the time you wait before calling `print(future.running())`.
So to reproduce you could add the expression `time.sleep(n)` right after  the statement `futures = [executor.submit(call) for _ in range(8)]` and see if the number of RUNNING futures varies with n.
msg346565 - (view) Author: Pablo Galindo Salgado (pablogsal) * (Python committer) Date: 2019-06-25 22:31
All the pending work items are added to a call queue that the processes consume (_add_call_item_to_queue), and the status is set before an item is added to that queue (by calling set_running_or_notify_cancel()). Notice that future.running() == True does not mean that a worker has picked up the work item. The worker function (_process_worker for the ProcessPoolExecutor) gets items from this queue and then sends results back in another queue (the result queue). If you add extra debugging you will see that only two items are dequeued from the call_queue:


--- a/Lib/concurrent/futures/process.py
+++ b/Lib/concurrent/futures/process.py
@@ -231,6 +231,7 @@ def _process_worker(call_queue, result_queue, initializer, initargs):
             return
     while True:
         call_item = call_queue.get(block=True)
+        print("Worker: Getting a new item")
         if call_item is None:
             # Wake up queue management thread
             result_queue.put(os.getpid())
@@ -277,6 +278,7 @@ def _add_call_item_to_queue(pending_work_items,
             work_item = pending_work_items[work_id]
 
             if work_item.future.set_running_or_notify_cancel():
+                print("Adding a new item to the call_queue")
                 call_queue.put(_CallItem(work_id,
                                          work_item.fn,
                                          work_item.args,


Executing your script:

import time, concurrent.futures

def call():
    while True:
        time.sleep(1)


if __name__ == "__main__":
    with concurrent.futures.ProcessPoolExecutor(max_workers=2) as executor:
        futures = [executor.submit(call) for _ in range(8)]
        time.sleep(2)

        for future in futures:
            print(future.running())

Prints:

Adding a new item to the call_queue
Adding a new item to the call_queue
Adding a new item to the call_queue
Worker: Getting a new item
Worker: Getting a new item
True
True
True
False
False
False
False
False
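
The point that `future.running() == True` does not imply that a worker has picked up the item can be checked in isolation. The following is a minimal sketch, not part of the original report: it creates a bare `Future` by hand (something normally done only by executors or in tests) and flips its state the same way `_add_call_item_to_queue` does, with no worker involved at all.

```python
from concurrent.futures import Future

# A hand-made future, for illustration only; executors normally create these.
future = Future()

was_running_before = future.running()   # state is PENDING at this point

# The same call the executor makes just before putting the item in the call queue.
accepted = future.set_running_or_notify_cancel()

is_running_after = future.running()     # state is now RUNNING, no worker exists

# Once the state is RUNNING, the future can no longer be cancelled.
could_cancel = future.cancel()

print(was_running_before, accepted, is_running_after, could_cancel)
```

This also shows why items already marked RUNNING in the call queue cannot be cancelled with `Future.cancel()`.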
msg346591 - (view) Author: Géry (maggyero) * Date: 2019-06-26 08:03
@Pablo Galindo Salgado

Thank you for the debugging information. I would have expected 8 "Adding a new item to the call_queue" instead of 3, since I submitted 8 calls to the process pool.

The concurrent.futures._base module defines 5 future states:

> _FUTURE_STATES = [
>     PENDING,
>     RUNNING,
>     CANCELLED,
>     CANCELLED_AND_NOTIFIED,
>     FINISHED
> ]

The concurrent.futures.process module explains the job flow:

> Local worker thread:
> - reads work ids from the "Work Ids" queue and looks up the corresponding
>   WorkItem from the "Work Items" dict: if the work item has been cancelled then
>   it is simply removed from the dict, otherwise it is repackaged as a
>   _CallItem and put in the "Call Q". New _CallItems are put in the "Call Q"
>   until "Call Q" is full. NOTE: the size of the "Call Q" is kept small because
>   calls placed in the "Call Q" can no longer be cancelled with Future.cancel().
> - reads _ResultItems from "Result Q", updates the future stored in the
>   "Work Items" dict and deletes the dict entry

So for implementation reasons (allowing `Future.cancel()`), submitted calls are put in an intermediary unbounded pending dict instead of directly in the call queue (which should be the pending queue), and this queue is bounded instead of being unbounded. Note that this is not the case for the concurrent.futures.thread module, in which calls are submitted directly to an unbounded call queue (pending queue).
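
For comparison, the thread-based behavior can be observed deterministically. This is a sketch under assumptions, not code from the report: the helper name `count_running_thread_pool` is hypothetical, and the calls block on a `threading.Event` instead of sleeping forever so that the pool can shut down cleanly. It relies on the thread pool marking a future RUNNING only when a worker thread starts executing it.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def count_running_thread_pool(max_workers=2, submitted=8):
    """Count RUNNING futures once every worker thread is busy (hypothetical helper)."""
    started = threading.Semaphore(0)   # released once per call that has begun
    release = threading.Event()        # set to let all blocked calls finish

    def call():
        started.release()
        release.wait()

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(call) for _ in range(submitted)]
        # Wait until as many calls as there are workers have actually started.
        for _ in range(min(max_workers, submitted)):
            started.acquire()
        running = sum(future.running() for future in futures)
        release.set()                  # unblock the workers so the pool can drain
    return running


print(count_running_thread_pool())
```

With 2 workers and 8 submitted calls, the count equals the number of workers, matching the `ThreadPoolExecutor` output reported above.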

And for optimization reasons, the chosen size for this bounded call queue is `max_workers + 1` instead of `max_workers`, as defined here:

> # Create communication channels for the executor
> # Make the call queue slightly larger than the number of processes to
> # prevent the worker processes from idling. But don't make it too big
> # because futures in the call queue cannot be cancelled.
> queue_size = self._max_workers + EXTRA_QUEUED_CALLS
> self._call_queue = _SafeQueue(
>     max_size=queue_size, ctx=self._mp_context,
>     pending_work_items=self._pending_work_items)

and here:

> # Controls how many more calls than processes will be queued in the call queue.
> # A smaller number will mean that processes spend more time idle waiting for
> # work while a larger number will make Future.cancel() succeed less frequently
> # (Futures in the call queue cannot be cancelled).
> EXTRA_QUEUED_CALLS = 1

PENDING calls are put in the call queue until the queue is full:

>     while True:
>         if call_queue.full():
>             return
>         try:
>             work_id = work_ids.get(block=False)
>         except queue.Empty:
>             return
>         else:
>             work_item = pending_work_items[work_id]
> 
>             if work_item.future.set_running_or_notify_cancel():
>                 call_queue.put(_CallItem(work_id,
>                                          work_item.fn,
>                                          work_item.args,
>                                          work_item.kwargs),
>                                block=True)
>             else:
>                 del pending_work_items[work_id]
>                 continue

The state of the call is updated to RUNNING right *before* the call is put in the call queue by the method `Future.set_running_or_notify_cancel()` instead of when the call is consumed from the call queue by a worker process here:

>     while True:
>         call_item = call_queue.get(block=True)
>         if call_item is None:
>             # Wake up queue management thread
>             result_queue.put(os.getpid())
>             return
>         try:
>             r = call_item.fn(*call_item.args, **call_item.kwargs)
>         except BaseException as e:
>             exc = _ExceptionWithTraceback(e, e.__traceback__)
>             _sendback_result(result_queue, call_item.work_id, exception=exc)
>         else:
>             _sendback_result(result_queue, call_item.work_id, result=r)
> 
>         # Liberate the resource as soon as possible, to avoid holding onto
>         # open files or shared memory that is not needed anymore
>         del call_item

So when creating a process pool of 2 workers, a call queue of size 3 is created. When submitting 8 calls to the pool, all of them are put in a pending dict, then 3 of 8 of them are updated from the PENDING state to the RUNNING state and put in the call queue, then 2 of 3 are consumed by the 2 workers leaving the call queue with 2 empty places and finally 2 of 5 remaining calls in the pending dict are updated from the PENDING state to the RUNNING state and put in the call queue. So to my understanding, in the end the current implementation should show 5 RUNNING states (2 calls currently being executed by the 2 workers, 3 calls pending in the call queue).
On MacOS I get these 5 RUNNING states, but sometimes 4. On Windows always 3.
But this is wrong in all cases: the user expects 2, since RUNNING = "being executed by a worker process", not "being executed by a worker process or pending in the call queue".

When the number of submitted calls is less than or equal to the number of workers, everything is fine: the number of RUNNING calls is the number of calls currently being executed by a worker process (since with the current implementation all calls are put in the call queue and directly consumed by a worker process, since both the size of the queue and the number of worker processes are greater than or equal to the number of submitted calls).

However when the number of submitted calls is strictly greater than the number of workers, the number of RUNNING calls makes no sense.
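
The counting argument above can be reproduced with a toy model. This is only a sketch of the reasoning in this message, not the real `concurrent.futures.process` code: the function `simulate` and its `refill_after_pickup` flag are hypothetical, and the flag stands in for the timing question of whether the call queue gets topped up again after the workers have taken their first items.

```python
from collections import deque

EXTRA_QUEUED_CALLS = 1  # same value as the constant quoted above


def simulate(max_workers, submitted, refill_after_pickup):
    """Toy model: return (calls marked RUNNING, calls actually executing)."""
    queue_size = max_workers + EXTRA_QUEUED_CALLS
    pending = deque(range(submitted))   # stands in for the pending dict
    call_queue = deque()                # bounded call queue
    marked_running = set()
    executing = set()

    def fill_call_queue():
        # Calls are marked RUNNING right before they enter the call queue.
        while pending and len(call_queue) < queue_size:
            work_id = pending.popleft()
            marked_running.add(work_id)
            call_queue.append(work_id)

    fill_call_queue()
    # Each worker takes one call and never finishes it (as in the report).
    while call_queue and len(executing) < max_workers:
        executing.add(call_queue.popleft())
    if refill_after_pickup:
        fill_call_queue()
    return len(marked_running), len(executing)


print(simulate(2, 8, refill_after_pickup=False))
print(simulate(2, 8, refill_after_pickup=True))
```

In this model, 2 workers and 8 calls give 3 RUNNING futures without a refill and 5 with one, while only 2 calls are ever executing; these are the two extremes reported for Windows and MacOS.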
msg346594 - (view) Author: Andrew Svetlov (asvetlov) * (Python committer) Date: 2019-06-26 09:12
Adding a new state for "not running and not pending but something in between" is useless, it can make .running() result a little more accurate but doesn't improve the real functionality.

The easy "fix" is to update the documentation to point out that the value returned by running() is an approximation. That's true anyway, because the future may change its state between reading self._state and returning a result.
msg346596 - (view) Author: Pablo Galindo Salgado (pablogsal) * (Python committer) Date: 2019-06-26 09:53
I concur with Andrew
msg346610 - (view) Author: Géry (maggyero) * Date: 2019-06-26 12:21
@Andrew Svetlov

> Adding a new state for "not running and not pending but something in between" is useless

I have not suggested that. I have just reported that when the number of submitted calls is strictly greater than the number of pool worker processes, the number of RUNNING calls returned by the method `Future.running()` makes no sense. Probably because the current `ProcessPoolExecutor` implementation uses 2 PENDING stores (the `_pending_work_items` `dict` and the `call_queue` `multiprocessing.Queue`) but treats the second store as a RUNNING store, contrary to the `ThreadPoolExecutor` implementation which has only 1 PENDING store (the `_work_queue` `queue.SimpleQueue`). The proper thing to do would be to correct the current implementation, not to create a new future state of course.
msg346611 - (view) Author: Andrew Svetlov (asvetlov) * (Python committer) Date: 2019-06-26 12:26
I don't mind, sorry.

Feel free to make a pull request with the fix though.
History
Date                 User       Action  Args
2019-06-26 12:26:02  asvetlov   set     messages: + msg346611
2019-06-26 12:21:49  maggyero   set     messages: + msg346610
2019-06-26 09:53:14  pablogsal  set     messages: + msg346596
2019-06-26 09:12:24  asvetlov   set     messages: + msg346594
2019-06-26 08:03:43  maggyero   set     messages: + msg346591
2019-06-25 22:31:23  pablogsal  set     nosy: + pablogsal; messages: + msg346565
2019-06-25 18:57:21  maggyero   set     messages: + msg346548
2019-06-25 18:09:56  willingc   set     nosy: + willingc; messages: + msg346544
2019-06-25 15:47:54  ned.deily  set     nosy: - ned.deily
2019-06-25 15:47:00  ned.deily  set     messages: + msg346538
2019-06-25 14:32:07  maggyero   set     messages: + msg346529
2019-06-14 09:51:13  vstinner   set     nosy: - vstinner
2019-06-14 06:08:05  maggyero   create