
Author pablogsal
Recipients asvetlov, bquinlan, lukasz.langa, maggyero, methane, pablogsal, pitrou, serhiy.storchaka, willingc
Date 2019-06-25.22:31:23
Message-id <1561501883.41.0.287306764813.issue37276@roundup.psfhosted.org>
Content
All pending work items are added to a call queue that the worker processes consume (see _add_call_item_to_queue), and each future's status is set to running before its item is put on that queue (by calling set_running_or_notify_cancel()). Note that future.running() == True therefore does not mean that a worker has picked up the work item. The worker function (_process_worker for the ProcessPoolExecutor) gets items from this queue and sends the results back through another queue (the result queue). If you add extra debugging, you will see that three items are added to the call_queue but only two are dequeued from it:


--- a/Lib/concurrent/futures/process.py
+++ b/Lib/concurrent/futures/process.py
@@ -231,6 +231,7 @@ def _process_worker(call_queue, result_queue, initializer, initargs):
             return
     while True:
         call_item = call_queue.get(block=True)
+        print("Worker: Getting a new item")
         if call_item is None:
             # Wake up queue management thread
             result_queue.put(os.getpid())
@@ -277,6 +278,7 @@ def _add_call_item_to_queue(pending_work_items,
             work_item = pending_work_items[work_id]
 
             if work_item.future.set_running_or_notify_cancel():
+                print("Adding a new item to the call_queue")
                 call_queue.put(_CallItem(work_id,
                                          work_item.fn,
                                          work_item.args,


Executing your script:

import time, concurrent.futures

def call():
    while True:
        time.sleep(1)


if __name__ == "__main__":
    with concurrent.futures.ProcessPoolExecutor(max_workers=2) as executor:
        futures = [executor.submit(call) for _ in range(8)]
        time.sleep(2)

        for future in futures:
            print(future.running())

Prints:

Adding a new item to the call_queue
Adding a new item to the call_queue
Adding a new item to the call_queue
Worker: Getting a new item
Worker: Getting a new item
True
True
True
False
False
False
False
False
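
The same effect can be reproduced without any processes. What follows is only a rough model of the behaviour described above, not the actual executor code: fill_call_queue is a hypothetical stand-in for _add_call_item_to_queue, and the queue bound of max_workers plus one extra slot is an assumption chosen to match the three "Adding" lines in the output.

```python
import queue
from concurrent.futures import Future

MAX_WORKERS = 2
EXTRA_QUEUED_CALLS = 1  # assumption: one extra slot beyond the number of workers


def fill_call_queue(pending, call_queue):
    """Hypothetical stand-in for _add_call_item_to_queue.

    Marks each future as running *before* putting it on the bounded queue,
    and stops as soon as the queue is full.
    """
    while pending and not call_queue.full():
        future = pending.pop(0)
        if future.set_running_or_notify_cancel():
            call_queue.put(future)


# Eight submitted work items, modelled as bare futures.
futures = [Future() for _ in range(8)]
pending = list(futures)
call_queue = queue.Queue(maxsize=MAX_WORKERS + EXTRA_QUEUED_CALLS)

fill_call_queue(pending, call_queue)

# Each of the two "workers" takes one item and never finishes it,
# so no result comes back and the queue is not refilled.
picked_up = [call_queue.get() for _ in range(MAX_WORKERS)]

states = [future.running() for future in futures]
print(states)
print("picked up by workers:", len(picked_up))
```

Three futures report running() == True even though only two were taken off the queue; the third is still sitting in the call queue, which is consistent with the output of the script above.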