This issue tracker has been migrated to GitHub, and is currently read-only.
For more information, see the GitHub FAQs in the Python's Developer Guide.

Author maggyero
Recipients asvetlov, bquinlan, lukasz.langa, maggyero, methane, pablogsal, pitrou, serhiy.storchaka, willingc
Date 2019-06-26.08:03:42
SpamBayes Score -1.0
Marked as misclassified Yes
Message-id <1561536223.59.0.366445484109.issue37276@roundup.psfhosted.org>
In-reply-to
Content
@Pablo Galindo Salgado

Thank you for the debugging information. I would have expected 8 "Adding a new item to the call_queue" instead of 3, since I submitted 8 calls to the process pool.

The concurrent.futures._base module defines 5 future states:

> _FUTURE_STATES = [
>     PENDING,
>     RUNNING,
>     CANCELLED,
>     CANCELLED_AND_NOTIFIED,
>     FINISHED
> ]

The concurrent.futures.process module explains the job flow:

> Local worker thread:
> - reads work ids from the "Work Ids" queue and looks up the corresponding
>   WorkItem from the "Work Items" dict: if the work item has been cancelled then
>   it is simply removed from the dict, otherwise it is repackaged as a
>   _CallItem and put in the "Call Q". New _CallItems are put in the "Call Q"
>   until "Call Q" is full. NOTE: the size of the "Call Q" is kept small because
>   calls placed in the "Call Q" can no longer be cancelled with Future.cancel().
> - reads _ResultItems from "Result Q", updates the future stored in the
>   "Work Items" dict and deletes the dict entry

So for implementation reasons (allowing `Future.cancel()`), submitted calls are put in an intermediary unbounded pending dict instead of directly in the call queue (which should be the pending queue), and this queue is bounded instead of being unbounded. Note that is not the case for the concurrent.futures.thread module for which calls are directly submitted to an unbounded call queue (pending queue).

And for optimization reasons, the chosen size for this bounded call queue is `max_workers + 1` instead of `max_workers`, as defined here:

> # Create communication channels for the executor
> # Make the call queue slightly larger than the number of processes to
> # prevent the worker processes from idling. But don't make it too big
> # because futures in the call queue cannot be cancelled.
> queue_size = self._max_workers + EXTRA_QUEUED_CALLS
> self._call_queue = _SafeQueue(
>     max_size=queue_size, ctx=self._mp_context,
>     pending_work_items=self._pending_work_items)

and here:

> # Controls how many more calls than processes will be queued in the call queue.
> # A smaller number will mean that processes spend more time idle waiting for
> # work while a larger number will make Future.cancel() succeed less frequently
> # (Futures in the call queue cannot be cancelled).
> EXTRA_QUEUED_CALLS = 1

PENDING calls are put in the call queue until the queue is full:

>     while True:
>         if call_queue.full():
>             return
>         try:
>             work_id = work_ids.get(block=False)
>         except queue.Empty:
>             return
>         else:
>             work_item = pending_work_items[work_id]
> 
>             if work_item.future.set_running_or_notify_cancel():
>                 call_queue.put(_CallItem(work_id,
>                                          work_item.fn,
>                                          work_item.args,
>                                          work_item.kwargs),
>                                block=True)
>             else:
>                 del pending_work_items[work_id]
>                 continue

The state of the call is updated to RUNNING right *before* the call is put in the call queue by the method `Future.set_running_or_notify_cancel()` instead of when the call is consumed from the call queue by a worker process here:

>     while True:
>         call_item = call_queue.get(block=True)
>         if call_item is None:
>             # Wake up queue management thread
>             result_queue.put(os.getpid())
>             return
>         try:
>             r = call_item.fn(*call_item.args, **call_item.kwargs)
>         except BaseException as e:
>             exc = _ExceptionWithTraceback(e, e.__traceback__)
>             _sendback_result(result_queue, call_item.work_id, exception=exc)
>         else:
>             _sendback_result(result_queue, call_item.work_id, result=r)
> 
>         # Liberate the resource as soon as possible, to avoid holding onto
>         # open files or shared memory that is not needed anymore
>         del call_item

So when creating a process pool of 2 workers, a call queue of size 3 is created. When submitting 8 calls to the pool, all of them are put in a pending dict, then 3 of 8 of them are updated from the PENDING state to the RUNNING state and put in the call queue, then 2 of 3 are consumed by the 2 workers leaving the call queue with 2 empty places and finally 2 of 5 remaining calls in the pending dict are updated from the PENDING state to the RUNNING state and put in the call queue. So to my understanding, in the end the current implementation should show 5 RUNNING states (2 calls currently being executed by the 2 workers, 3 calls pending in the call queue).
On MacOS I get these 5 RUNNING states, but sometimes 4. On Windows always 3.
But this is wrong in all cases: the user expects 2, since RUNNING = "being executed by a worker process", not "being executed by a worker process or pending in the call queue".

When the number of submitted calls is less than or equal to the number of workers, everything is fine: the number of RUNNING calls is the number of calls currently being executed by a worker process (since with the current implementation all calls are put in the call queue and directly consumed by a worker process, since both the size of the queue and the number of worker processes are greater or equal to the number of submitted calls).

However when the number of submitted calls is strictly greater than the number of workers, the number of RUNNING calls makes no sense.
History
Date User Action Args
2019-06-26 08:03:43maggyerosetrecipients: + maggyero, bquinlan, pitrou, asvetlov, methane, lukasz.langa, serhiy.storchaka, willingc, pablogsal
2019-06-26 08:03:43maggyerosetmessageid: <1561536223.59.0.366445484109.issue37276@roundup.psfhosted.org>
2019-06-26 08:03:43maggyerolinkissue37276 messages
2019-06-26 08:03:42maggyerocreate