msg154175 - (view) |
Author: Nam Nguyen (Nam.Nguyen) * |
Date: 2012-02-25 00:19 |
I am running into a memory consumption issue with the concurrent.futures module. Its executors do not have a public API to adjust their queue size, and the queues are created unbounded.
It would be helpful to have some public method or a parameter at construction time to limit this queue size.
|
msg154177 - (view) |
Author: Nam Nguyen (Nam.Nguyen) * |
Date: 2012-02-25 00:44 |
By the way, ProcessPoolExecutor actually sets its queue size to a reasonable number but ThreadPoolExecutor does not.
|
msg162599 - (view) |
Author: R. David Murray (r.david.murray) * |
Date: 2012-06-11 00:50 |
Brian: ping. Since this is an enhancement, if you are going to accept it it would be nice to get it into 3.3, which means committing it before June 23rd.
|
msg162601 - (view) |
Author: Brian Quinlan (bquinlan) * |
Date: 2012-06-11 02:55 |
Hey Nam,
I'm not sure that I understand. You want ThreadPoolExecutor.submit to block if there are too many work items in the queue? Are you sure that this happens currently with ProcessPoolExecutor? I can't see why it would.
|
msg162604 - (view) |
Author: Nam Nguyen (Nam.Nguyen) * |
Date: 2012-06-11 03:56 |
Currently, ProcessPoolExecutor has this line in its constructor:
    self._call_queue = multiprocessing.Queue(self._max_workers +
                                             EXTRA_QUEUED_CALLS)
where EXTRA_QUEUED_CALLS is 1.
And yes, it would be best to expose a method so that the app developer can set the queue size themselves. In my case, I would want to limit the queue so that I don't run out of memory. Others might not want the queue to block, and hence would prefer an unlimited queue.
|
msg162605 - (view) |
Author: Brian Quinlan (bquinlan) * |
Date: 2012-06-11 04:34 |
The queue that you identified, i.e.
    self._call_queue = multiprocessing.Queue(self._max_workers +
                                             EXTRA_QUEUED_CALLS)
does not get considered during submit() - are you sure that it somehow causes submit() to block?
Could you explain what your use case is such that you can run out of memory?
|
msg162606 - (view) |
Author: Nam Nguyen (Nam.Nguyen) * |
Date: 2012-06-11 04:46 |
I used ThreadPoolExecutor to process a large (but bounded) number of input/output jobs. Because there were too many of them and the worker threads could not drain them from the queue fast enough, the queue kept growing.
It was okay for the script, though, to block and wait for the queue to drain before submitting new jobs. So I needed to limit the queue size.
|
msg162664 - (view) |
Author: Brian Quinlan (bquinlan) * |
Date: 2012-06-12 09:00 |
I've had people request that they be able to control the order of processed work submissions. So a more general way to solve your problem might be to make the two executors take an optional Queue argument in their constructors.
You'd have to explain in detail in the documentation how the queues are used.
What do you think?
|
msg162700 - (view) |
Author: Nam Nguyen (Nam.Nguyen) * |
Date: 2012-06-13 02:50 |
+1
That was actually what I did. I replaced the internal queue with another one whose limit was properly set.
If you are too busy to write one, let me find some time to create another patch.
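For later readers, a minimal sketch of that workaround. It pokes at the private `_work_queue` attribute, which is a CPython implementation detail that may change between versions, so treat it as a hack rather than supported API:

```python
import queue
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=4)
# Replace the unbounded internal queue with a bounded one *before*
# submitting any work; worker threads are only spawned on the first
# submit(), so they pick up the replacement queue. submit() will then
# block once 10 work items are pending.
executor._work_queue = queue.Queue(maxsize=10)

futures = [executor.submit(pow, 2, n) for n in range(20)]
results = [f.result() for f in futures]
executor.shutdown()
```

Because the workers keep draining the queue, the bounded `put()` inside `submit()` simply throttles the producer instead of deadlocking.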
|
msg207512 - (view) |
Author: Victor Varvariuc (Victor.Varvariuc) |
Date: 2014-01-07 08:42 |
Maybe I should have created another issue for this, but without this issue being solved, the new issue will not help much.
Here it is:
http://hg.python.org/cpython/file/37caaf21f827/Lib/concurrent/futures/thread.py#l63
After running a work item, `work_queue.task_done()` is not called.
So it's difficult to know whether the worker threads have any more work to do.
http://stackoverflow.com/questions/20965754/determine-if-worker-threads-are-doing-any-work?noredirect=1#comment31495804_20965754
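For context, this is the bookkeeping that `queue.Queue` already provides via `task_done()`/`join()` (a minimal illustration; note that `unfinished_tasks` is an internal counter, not documented API):

```python
import queue

q = queue.Queue()
q.put("job")              # producer enqueues a work item
item = q.get()            # a worker takes the item off the queue
# ... process item ...
q.task_done()             # worker marks the item as finished
# With task_done() called, the queue knows nothing is in flight:
q.join()                  # returns immediately once all tasks are done
```

Without the `task_done()` call, `unfinished_tasks` never drops back to zero and `join()` blocks forever, which is exactly the observability gap described above.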
|
msg207602 - (view) |
Author: Brian Quinlan (bquinlan) * |
Date: 2014-01-07 21:06 |
Hi Victor,
I don't understand your problem. Could you be very specific in your description?
|
msg207672 - (view) |
Author: Victor Varvariuc (Victor.Varvariuc) |
Date: 2014-01-08 05:53 |
Hi Brian,
In one of my projects I had to monkey-patch the module `concurrent.futures.thread` at line 60 (http://hg.python.org/cpython/file/37caaf21f827/Lib/concurrent/futures/thread.py#l60) with:
def _worker(executor_reference, work_queue):
    try:
        while True:
            work_item = work_queue.get(block=True)
            if work_item is not None:
                work_item.run()
                work_queue.task_done()  # <-- added this line
                continue
            executor = executor_reference()
            # Exit if:
            # - The interpreter is shutting down OR
            # - The executor that owns the worker has been collected OR
            # - The executor that owns the worker has been shutdown.
            if futures_thread._shutdown or executor is None or executor._shutdown:
                # Notice other workers
                work_queue.put(None)
                return
            del executor
    except BaseException:
        futures_thread._base.LOGGER.critical('Exception in worker', exc_info=True)
This helps me to control the state of the work queue -- I can see if there are any work items still being processed:
    if executor._work_queue.unfinished_tasks:
        # executor is still producing something
        ...
|
msg207893 - (view) |
Author: Brian Quinlan (bquinlan) * |
Date: 2014-01-11 00:43 |
Can't you accomplish what you want using add_done_callback?
e.g.
    # Pseudocode (fleshed out so it actually runs)
    import threading
    from concurrent.futures import ThreadPoolExecutor

    class MyExecutor(ThreadPoolExecutor):
        def __init__(self, *args, **kwargs):
            super().__init__(*args, **kwargs)
            self._count = 0
            self._some_lock = threading.Lock()

        def _decrement(self, future):
            with self._some_lock:
                self._count -= 1

        def submit(self, fn, *args, **kwargs):
            f = super().submit(fn, *args, **kwargs)
            with self._some_lock:
                self._count += 1
            f.add_done_callback(self._decrement)
            return f

        @property
        def num_pending_futures(self):
            return self._count
|
msg207896 - (view) |
Author: Victor Varvariuc (Victor.Varvariuc) |
Date: 2014-01-11 06:16 |
Hi!
Looks like your pseudocode will work as a workaround instead of monkey-patching!
Still, my suggestion to add that line to the code stands.
Shouldn't self._count always equal the length of self._work_queue? If yes, why the duplication? If no, which one should be used, and why the duplication? There is also an additional lock.
http://docs.python.org/3.3/library/queue.html#queue.Queue.task_done - there is a special method for this, so why not use it?
It looks like you think `work_queue.task_done()` should not be added. I don't understand why, but you decide what's better for Python.
Thank you for your time!
Victor
|
msg253895 - (view) |
Author: Alexander Mohr (thehesiod) * |
Date: 2015-11-02 06:58 |
Adding support for an internal queue size limit is critical to avoid chewing through all your memory when you have a LOT of tasks. I just hit this issue myself. If we could have a simple parameter to set the max queue size, it would help tremendously!
|
msg274588 - (view) |
Author: Patrik Dufresne (Patrik Dufresne) |
Date: 2016-09-06 18:19 |
Any update on this subject?
I also had to monkey-patch the implementation to avoid consuming all the system memory.
|
msg280727 - (view) |
Author: Vinay Anantharaman (Vinay Anantharaman) |
Date: 2016-11-14 03:10 |
I read the code myself and also noticed that task_done is not called. Is there a reason?
|
msg305857 - (view) |
Author: Michael Hrivnak (mhrivnak) |
Date: 2017-11-08 15:47 |
My project also has a use case for this, very similar to the others described. Here's what we want:
    with ThreadPoolExecutor(queue_size=500) as executor:
        for item in parse_a_long_list_of_work(somefile.xml):
            executor.submit(Job(item))
I do not want to parse the entire list of work items and load them into memory at once. It is preferable for the main thread running the above code to block on submit() when the queue size is above some threshold.
It's a classic case of the producer and consumer operating at different speeds. In the past, a Queue object has been the way to connect such a producer and consumer. The various Executor classes do not provide an easy way to consume from a provided Queue object, so giving them that capability would be a reasonable alternative to having the submit() method block.
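Until something like that lands, one way to get a blocking submit() without touching executor internals is to cap in-flight work with a semaphore. This is a sketch under assumptions: the `BoundedExecutor` name and `max_pending` parameter are made up for illustration and are not part of the stdlib:

```python
import threading
from concurrent.futures import ThreadPoolExecutor

class BoundedExecutor:
    """Wrap a ThreadPoolExecutor so submit() blocks once max_pending
    work items are in flight (hypothetical helper, not stdlib API)."""

    def __init__(self, max_workers, max_pending):
        self._executor = ThreadPoolExecutor(max_workers=max_workers)
        self._semaphore = threading.BoundedSemaphore(max_pending)

    def submit(self, fn, *args, **kwargs):
        self._semaphore.acquire()  # blocks the producer when full
        try:
            future = self._executor.submit(fn, *args, **kwargs)
        except BaseException:
            self._semaphore.release()
            raise
        # Release one slot as soon as the work item finishes.
        future.add_done_callback(lambda f: self._semaphore.release())
        return future

    def shutdown(self, wait=True):
        self._executor.shutdown(wait=wait)

# Demo: at most 4 items are ever queued or running at once.
ex = BoundedExecutor(max_workers=2, max_pending=4)
futs = [ex.submit(lambda n=n: n * n) for n in range(10)]
results = [f.result() for f in futs]
ex.shutdown()
```

The semaphore makes the producer and consumer run in lockstep once the cap is reached, which is the classic bounded-buffer behavior the message above asks for.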
|
msg314742 - (view) |
Author: Antoine Pitrou (pitrou) * |
Date: 2018-03-31 17:04 |
Issue 29595 has a more complete PR, so closing this issue as duplicate.
|
Date | User | Action | Args
2022-04-11 14:57:27 | admin | set | github: 58327
2018-03-31 17:04:32 | pitrou | set | status: open -> closed; superseder: Expose max_queue_size in ThreadPoolExecutor; nosy: + pitrou; messages: + msg314742; resolution: duplicate; stage: patch review -> resolved
2017-11-08 15:47:47 | mhrivnak | set | nosy: + mhrivnak; messages: + msg305857
2017-02-12 20:30:05 | Winterflower | set | nosy: + Winterflower
2016-11-14 03:10:51 | Vinay Anantharaman | set | nosy: + Vinay Anantharaman; messages: + msg280727
2016-09-06 18:19:30 | Patrik Dufresne | set | nosy: + Patrik Dufresne; messages: + msg274588
2015-11-02 06:58:55 | thehesiod | set | nosy: + thehesiod; messages: + msg253895
2014-01-11 06:16:57 | Victor.Varvariuc | set | messages: + msg207896
2014-01-11 00:43:38 | bquinlan | set | messages: + msg207893
2014-01-08 05:53:45 | Victor.Varvariuc | set | messages: + msg207672
2014-01-07 21:06:53 | bquinlan | set | messages: + msg207602
2014-01-07 08:42:46 | Victor.Varvariuc | set | nosy: + Victor.Varvariuc; messages: + msg207512
2012-06-13 02:50:39 | Nam.Nguyen | set | messages: + msg162700
2012-06-12 09:00:14 | bquinlan | set | messages: + msg162664
2012-06-11 04:46:07 | Nam.Nguyen | set | messages: + msg162606
2012-06-11 04:34:51 | bquinlan | set | messages: + msg162605
2012-06-11 03:56:35 | Nam.Nguyen | set | messages: + msg162604
2012-06-11 02:55:12 | bquinlan | set | messages: + msg162601
2012-06-11 00:50:37 | r.david.murray | set | versions: + Python 3.3; nosy: + r.david.murray; messages: + msg162599; stage: patch review
2012-03-07 20:11:59 | bquinlan | set | assignee: bquinlan; nosy: + bquinlan
2012-02-27 23:01:54 | Nam.Nguyen | set | files: + executor-queue-size.diff; keywords: + patch
2012-02-25 00:44:40 | Nam.Nguyen | set | messages: + msg154177
2012-02-25 00:19:01 | Nam.Nguyen | create |