classification
Title: Ability to adjust queue size in Executors
Type: enhancement Stage: patch review
Components: Library (Lib) Versions: Python 3.3
process
Status: open Resolution:
Dependencies: Superseder:
Assigned To: bquinlan Nosy List: Nam.Nguyen, Patrik Dufresne, Victor.Varvariuc, Vinay Anantharaman, Winterflower, bquinlan, mhrivnak, r.david.murray, thehesiod
Priority: normal Keywords: patch

Created on 2012-02-25 00:19 by Nam.Nguyen, last changed 2017-11-08 15:47 by mhrivnak.

Files
File name                 Uploaded                      Description
executor-queue-size.diff  Nam.Nguyen, 2012-02-27 23:01  patch to add queue size to ThreadPoolExecutor
Messages (18)
msg154175 - Author: Nam Nguyen (Nam.Nguyen) * Date: 2012-02-25 00:19
I am running into a memory consumption issue with the concurrent.futures module. Its Executors do not have a public API to adjust their queue size, and the queues are created unbounded.

It would be helpful to have some public method or a parameter at construction time to limit this queue size.
msg154177 - Author: Nam Nguyen (Nam.Nguyen) * Date: 2012-02-25 00:44
By the way, ProcessPoolExecutor actually sets its queue size to a reasonable number, but ThreadPoolExecutor does not.
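A small sketch of the ThreadPoolExecutor side of this: with a single worker that is kept busy, every `submit()` call still returns immediately, so pending work simply piles up in memory. The function name `demo_unbounded_backlog` is made up for illustration; only public `concurrent.futures` and `threading` APIs are used.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def demo_unbounded_backlog(n_tasks=100):
    """Submit n_tasks to a one-worker pool whose only worker is blocked.

    submit() never waits for a free worker, so the backlog of pending
    work items grows with every call.
    """
    gate = threading.Event()
    with ThreadPoolExecutor(max_workers=1) as executor:
        # The first task occupies the only worker until the gate opens.
        futures = [executor.submit(gate.wait)]
        # None of these calls block, even though nothing else can run yet.
        futures += [executor.submit(pow, 2, i) for i in range(n_tasks)]
        pending = sum(not f.done() for f in futures)
        gate.set()  # let the worker drain the backlog before shutdown
    return pending, [f.result() for f in futures[1:]]


pending, results = demo_unbounded_backlog()
print(pending)  # 101: nothing had finished when the futures were counted
```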
msg162599 - Author: R. David Murray (r.david.murray) * (Python committer) Date: 2012-06-11 00:50
Brian: ping. Since this is an enhancement, if you are going to accept it, it would be nice to get it into 3.3, which means committing it before June 23rd.
msg162601 - Author: Brian Quinlan (bquinlan) (Python committer) Date: 2012-06-11 02:55
Hey Nam,

I'm not sure that I understand. You want ThreadPoolExecutor.submit to block if there are too many work items in the queue? Are you sure that this happens currently with ProcessPoolExecutor? I can't see why it would.
msg162604 - Author: Nam Nguyen (Nam.Nguyen) * Date: 2012-06-11 03:56
Currently, ProcessPoolExecutor has this line in its constructor:

self._call_queue = multiprocessing.Queue(self._max_workers +
                                         EXTRA_QUEUED_CALLS)

where EXTRA_QUEUED_CALLS is 1.

And yes, it would be best to expose a method so that application developers can set the queue size themselves. In my case, I would want to limit the queue so that I don't run out of memory. Others might not want the queue to block, and hence would prefer an unlimited queue.
msg162605 - Author: Brian Quinlan (bquinlan) (Python committer) Date: 2012-06-11 04:34
The queue that you identified, i.e.

self._call_queue = multiprocessing.Queue(self._max_workers +
                                         EXTRA_QUEUED_CALLS)

is not consulted during submit(). Are you sure that it somehow causes submit() to block?

Could you explain what your use case is such that you can run out of memory?
msg162606 - Author: Nam Nguyen (Nam.Nguyen) * Date: 2012-06-11 04:46
I used the ThreadPoolExecutor to process a large (but bounded) number of input/output jobs. Because there were so many of them and the worker threads could not process them fast enough to drain the queue, the queue kept growing.

It was okay for the script, though, to block, waiting for the queue to drain, before submitting new jobs. So I needed to limit the queue size.
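A common workaround for this use case, sketched here with public APIs only: wrap the executor so that `submit()` acquires a `threading.BoundedSemaphore` slot and a done-callback releases it. `BoundedSubmitter` and `slow_square` are hypothetical names, not part of `concurrent.futures`. The limit counts queued plus running tasks, which is usually what bounds memory.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class BoundedSubmitter:
    """Hypothetical wrapper: submit() blocks once `limit` tasks are outstanding."""

    def __init__(self, executor, limit):
        self._executor = executor
        self._slots = threading.BoundedSemaphore(limit)

    def submit(self, fn, *args, **kwargs):
        self._slots.acquire()  # blocks the producer while `limit` tasks are in flight
        try:
            future = self._executor.submit(fn, *args, **kwargs)
        except BaseException:
            self._slots.release()  # submission failed, so give the slot back
            raise
        # Runs in a worker thread (or immediately, if already done) and frees a slot.
        future.add_done_callback(lambda _f: self._slots.release())
        return future


def slow_square(x):
    threading.Event().wait(0.001)  # simulate a little I/O
    return x * x


with ThreadPoolExecutor(max_workers=2) as executor:
    submitter = BoundedSubmitter(executor, limit=4)
    futures, peak = [], 0
    for i in range(50):
        futures.append(submitter.submit(slow_square, i))
        # A future that is not done still holds its slot, so this stays <= 4.
        peak = max(peak, sum(not f.done() for f in futures))
print(peak)
```

The semaphore is released from worker threads, which is allowed for semaphores (unlike `threading.Lock`, which has no owner semantics to rely on here anyway).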
msg162664 - Author: Brian Quinlan (bquinlan) (Python committer) Date: 2012-06-12 09:00
I've had people request that they be able to control the order in which submitted work is processed. So a more general way to solve your problem might be to make the two executors take an optional Queue argument in their constructors.

You'd have to explain in detail in the documentation how the queues are used.

What do you think?
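To make the proposal concrete, here is a minimal sketch of an executor-like class that takes its work queue as a constructor argument. `QueueExecutor` is hypothetical and is not an API of `concurrent.futures`; it builds on `concurrent.futures.Future`, whose `set_running_or_notify_cancel()`, `set_result()` and `set_exception()` methods are documented for use by Executor implementations. Passing `queue.Queue(maxsize=N)` makes `submit()` block once N items are waiting.

```python
import queue
import threading
from concurrent.futures import Future


class QueueExecutor:
    """Hypothetical executor that consumes work from a caller-supplied FIFO queue."""

    _STOP = object()  # sentinel telling one worker thread to exit

    def __init__(self, work_queue, max_workers):
        self._queue = work_queue
        self._threads = [threading.Thread(target=self._worker, daemon=True)
                         for _ in range(max_workers)]
        for t in self._threads:
            t.start()

    def _worker(self):
        while True:
            item = self._queue.get()
            if item is self._STOP:
                return
            future, fn, args, kwargs = item
            # Skip work items whose futures were cancelled while queued.
            if future.set_running_or_notify_cancel():
                try:
                    future.set_result(fn(*args, **kwargs))
                except BaseException as exc:
                    future.set_exception(exc)

    def submit(self, fn, *args, **kwargs):
        future = Future()
        self._queue.put((future, fn, args, kwargs))  # blocks if a bounded queue is full
        return future

    def shutdown(self):
        # With a FIFO queue, every sentinel lands behind the pending work items.
        for _ in self._threads:
            self._queue.put(self._STOP)
        for t in self._threads:
            t.join()


executor = QueueExecutor(queue.Queue(maxsize=8), max_workers=2)
futures = [executor.submit(pow, 3, i) for i in range(20)]
executor.shutdown()
print([f.result() for f in futures][:4])  # [1, 3, 9, 27]
```

Ordered queues such as `queue.PriorityQueue` or `queue.LifoQueue` would additionally need orderable entries and a shutdown sentinel that is retrieved last; the sketch leaves that out.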
msg162700 - Author: Nam Nguyen (Nam.Nguyen) * Date: 2012-06-13 02:50
+1

That was actually what I did. I replaced the internal queue with another one whose limit was properly set.

If you are too busy to write one, I can find some time to create another patch.
msg207512 - Author: Victor Varvariuc (Victor.Varvariuc) Date: 2014-01-07 08:42
Maybe I should have created another issue for this, but without this issue being solved, the new issue will not help much.
Here it is:
http://hg.python.org/cpython/file/37caaf21f827/Lib/concurrent/futures/thread.py#l63
After running a work item, `work_queue.task_done()` is not called.
So it's difficult to know whether the worker threads have any more work to do.
http://stackoverflow.com/questions/20965754/determine-if-worker-threads-are-doing-any-work?noredirect=1#comment31495804_20965754
msg207602 - Author: Brian Quinlan (bquinlan) (Python committer) Date: 2014-01-07 21:06
Hi Victor,

I don't understand your problem. Could you be very specific in your description?
msg207672 - Author: Victor Varvariuc (Victor.Varvariuc) Date: 2014-01-08 05:53
Hi Brian,

In one of my projects I had to monkey-patch `_worker` in the module `concurrent.futures.thread` (line 60: http://hg.python.org/cpython/file/37caaf21f827/Lib/concurrent/futures/thread.py#l60) with:

# Targets the Python 3.3-era concurrent/futures/thread.py, where the work
# queue is a queue.Queue and _worker takes exactly these two arguments.
import concurrent.futures.thread as futures_thread


def _worker(executor_reference, work_queue):
    try:
        while True:
            work_item = work_queue.get(block=True)
            if work_item is not None:
                work_item.run()
                work_queue.task_done()  # <-- added this line
                continue
            executor = executor_reference()
            # Exit if:
            #   - The interpreter is shutting down OR
            #   - The executor that owns the worker has been collected OR
            #   - The executor that owns the worker has been shutdown.
            if futures_thread._shutdown or executor is None or executor._shutdown:
                # Notify other workers
                work_queue.put(None)
                return
            del executor
    except BaseException:
        futures_thread._base.LOGGER.critical('Exception in worker', exc_info=True)


# Install the patched worker before any executor starts its threads.
futures_thread._worker = _worker

This helps me to control the state of the work queue -- I can see if there are any work items still being processed:

if executor._work_queue.unfinished_tasks:
    # executor is still producing something
    ...
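For reference, this standalone sketch (not the executor's code) shows the `queue.Queue` contract the patch relies on: each `get()` is matched by one `task_done()`, and `join()` returns only once every item put so far has been marked done. Note that `unfinished_tasks`, used above, is an undocumented attribute; `join()` is the documented way to wait on it. `run_with_task_done` is a hypothetical name.

```python
import queue
import threading


def run_with_task_done(items):
    """Consume `items` from a queue.Queue, calling task_done() after each one."""
    work_queue = queue.Queue()
    results = []

    def worker():
        while True:
            item = work_queue.get()
            try:
                if item is None:  # sentinel: stop the worker
                    return
                results.append(item * 10)
            finally:
                # Every get() is matched by exactly one task_done(), including
                # the sentinel's, so join() knows when all work is finished.
                work_queue.task_done()

    thread = threading.Thread(target=worker)
    thread.start()
    for item in items:
        work_queue.put(item)
    work_queue.join()  # blocks until task_done() was called for every item
    snapshot = list(results)  # every item has been processed at this point
    work_queue.put(None)
    thread.join()
    return snapshot


print(run_with_task_done([1, 2, 3]))  # [10, 20, 30]
```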
msg207893 - Author: Brian Quinlan (bquinlan) (Python committer) Date: 2014-01-11 00:43
Can't you accomplish what you want using add_done_callback?

e.g.

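# Sketch based on the original pseudocode: count pending futures
import threading
from concurrent.futures import ThreadPoolExecutor


class MyExecutor(ThreadPoolExecutor):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._count = 0
        self._count_lock = threading.Lock()

    def _decrement(self, future):
        # add_done_callback() calls this with the finished future
        with self._count_lock:
            self._count -= 1

    def submit(self, fn, *args, **kwargs):
        f = super().submit(fn, *args, **kwargs)
        with self._count_lock:
            self._count += 1
        # Registered only after counting, so the decrement never runs before
        # the increment (if f is already done, the callback runs immediately).
        f.add_done_callback(self._decrement)
        return f

    @property
    def num_pending_futures(self):
        with self._count_lock:
            return self._count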
msg207896 - Author: Victor Varvariuc (Victor.Varvariuc) Date: 2014-01-11 06:16
Hi!

Looks like your pseudocode will work as a workaround instead of monkey-patching!

Still, my suggestion to add that line to the code stands.
Should self._count always be equal to the length of self._work_queue? If yes, why the duplication? If no, which one should be used, and why the duplication? There is also an additional lock.

http://docs.python.org/3.3/library/queue.html#queue.Queue.task_done - there is a dedicated method for this, so why not use it?

It looks like you think that `work_queue.task_done()` should not be added. I don't understand why, but you decide what's best for Python.

Thank you for your time!
Victor
msg253895 - Author: Alexander Mohr (thehesiod) * Date: 2015-11-02 06:58
Adding support for limiting the internal queue size is critical to avoid chewing through all your memory when you have a LOT of tasks. I just hit this issue myself. If we could have a simple parameter to set the maximum queue size, it would help tremendously!
msg274588 - Author: Patrik Dufresne (Patrik Dufresne) Date: 2016-09-06 18:19
Any update on this subject?

I also had to monkey-patch the implementation to avoid consuming all the system memory.
msg280727 - Author: Vinay Anantharaman (Vinay Anantharaman) Date: 2016-11-14 03:10
I read through the code myself and also noticed that task_done() is not called. Is there a reason?
msg305857 - Author: Michael Hrivnak (mhrivnak) Date: 2017-11-08 15:47
My project also has a use case for this, very similar to the others described. Here's what we want:

with ThreadPoolExecutor(queue_size=500) as executor:
    for item in parse_a_long_list_of_work('somefile.xml'):
        executor.submit(Job(item))

I do not want to parse the entire list of work items and load them into memory at once. It is preferable for the main thread running the above code to block on submit() when the queue size is above some threshold.
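Until something like `queue_size` exists (it is not a ThreadPoolExecutor parameter), a similar effect can be had with public APIs by capping the number of outstanding futures and calling `concurrent.futures.wait(..., return_when=FIRST_COMPLETED)` when the cap is reached. The generator name `map_with_backpressure` is hypothetical; the input iterable is pulled lazily, so a streaming parser would never be fully loaded into memory.

```python
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait


def map_with_backpressure(fn, items, max_workers=4, max_pending=16):
    """Apply fn to each item of a possibly huge iterable, keeping at most
    max_pending futures outstanding. Yields results in completion order."""
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        pending = set()
        for item in items:
            if len(pending) >= max_pending:
                # Block the producer until at least one task finishes.
                done, pending = wait(pending, return_when=FIRST_COMPLETED)
                for future in done:
                    yield future.result()
            pending.add(executor.submit(fn, item))
        # Input exhausted: drain whatever is still queued or running.
        for future in wait(pending).done:
            yield future.result()


squares = sorted(map_with_backpressure(lambda x: x * x, range(100)))
print(squares[:5])  # [0, 1, 4, 9, 16]
```

Unlike a blocking `submit()`, this yields results as they complete, so the caller also consumes them incrementally instead of holding every future.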

It's a classic case of the producer and consumer operating at different speeds. In the past, a Queue object has been the way to connect such a producer and consumer. The various Executor classes do not provide an easy way to consume from a provided Queue object, so giving them that capability would be a reasonable alternative to having the submit() method block.
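The classic Queue-based pattern referred to here looks roughly like the sketch below: a `queue.Queue(maxsize=...)` whose `put()` blocks the producer while the queue is full, with one `None` sentinel per consumer thread. `produce_consume` is a hypothetical name, and the string upper-casing merely stands in for real work.

```python
import queue
import threading


def produce_consume(items, maxsize=4, n_consumers=2):
    """Bounded producer/consumer: put() blocks while maxsize items are waiting."""
    work_queue = queue.Queue(maxsize=maxsize)
    results = []
    results_lock = threading.Lock()

    def consumer():
        while True:
            item = work_queue.get()
            if item is None:  # one sentinel per consumer ends its loop
                return
            with results_lock:
                results.append(item.upper())

    consumers = [threading.Thread(target=consumer) for _ in range(n_consumers)]
    for t in consumers:
        t.start()
    for item in items:  # `items` may be a lazy generator, e.g. a streaming parser
        work_queue.put(item)  # blocks the producer while the queue is full
    for _ in consumers:
        work_queue.put(None)
    for t in consumers:
        t.join()
    return sorted(results)


print(produce_consume(iter(["b", "a", "c"])))  # ['A', 'B', 'C']
```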
History
Date                 User                Action  Args
2017-11-08 15:47:47  mhrivnak            set     nosy: + mhrivnak; messages: + msg305857
2017-02-12 20:30:05  Winterflower        set     nosy: + Winterflower
2016-11-14 03:10:51  Vinay Anantharaman  set     nosy: + Vinay Anantharaman; messages: + msg280727
2016-09-06 18:19:30  Patrik Dufresne     set     nosy: + Patrik Dufresne; messages: + msg274588
2015-11-02 06:58:55  thehesiod           set     nosy: + thehesiod; messages: + msg253895
2014-01-11 06:16:57  Victor.Varvariuc    set     messages: + msg207896
2014-01-11 00:43:38  bquinlan            set     messages: + msg207893
2014-01-08 05:53:45  Victor.Varvariuc    set     messages: + msg207672
2014-01-07 21:06:53  bquinlan            set     messages: + msg207602
2014-01-07 08:42:46  Victor.Varvariuc    set     nosy: + Victor.Varvariuc; messages: + msg207512
2012-06-13 02:50:39  Nam.Nguyen          set     messages: + msg162700
2012-06-12 09:00:14  bquinlan            set     messages: + msg162664
2012-06-11 04:46:07  Nam.Nguyen          set     messages: + msg162606
2012-06-11 04:34:51  bquinlan            set     messages: + msg162605
2012-06-11 03:56:35  Nam.Nguyen          set     messages: + msg162604
2012-06-11 02:55:12  bquinlan            set     messages: + msg162601
2012-06-11 00:50:37  r.david.murray      set     versions: + Python 3.3; nosy: + r.david.murray; messages: + msg162599; stage: patch review
2012-03-07 20:11:59  bquinlan            set     assignee: bquinlan; nosy: + bquinlan
2012-02-27 23:01:54  Nam.Nguyen          set     files: + executor-queue-size.diff; keywords: + patch
2012-02-25 00:44:40  Nam.Nguyen          set     messages: + msg154177
2012-02-25 00:19:01  Nam.Nguyen          create