diff -r b116489d31ff Lib/concurrent/futures/_base.py
--- a/Lib/concurrent/futures/_base.py	Thu Aug 21 19:16:17 2014 -0400
+++ b/Lib/concurrent/futures/_base.py	Tue Aug 26 21:56:38 2014 -0400
@@ -170,6 +170,26 @@
     return waiter
 
 
+class WorkItem(object):
+    """Class representing an enqueued work item.
+
+    These objects are returned by Executor.active_tasks() and
+    Executor.waiting_tasks().
+
+    """
+    def __init__(self, wi, work_id=None):
+        self.fn = wi.fn
+        self.args = wi.args
+        self.kwargs = wi.kwargs
+        self._work_id = work_id
+
+    def __str__(self):
+        return repr(self)
+
+    def __repr__(self):
+        return "<WorkItem: {} args: {} kwargs: {}>".format(
+            self.fn, self.args, self.kwargs)
+
 def as_completed(fs, timeout=None):
     """An iterator over the given futures that yields each as it completes.
 
@@ -570,6 +590,30 @@
         """
         pass
 
+    def worker_count(self):
+        raise NotImplementedError()
+
+    def idle_worker_count(self):
+        raise NotImplementedError()
+
+    def active_worker_count(self):
+        raise NotImplementedError()
+
+    def task_count(self):
+        raise NotImplementedError()
+
+    def active_task_count(self):
+        raise NotImplementedError()
+
+    def waiting_task_count(self):
+        raise NotImplementedError()
+
+    def active_tasks(self):
+        raise NotImplementedError()
+
+    def waiting_tasks(self):
+        raise NotImplementedError()
+
     def __enter__(self):
         return self
 
diff -r b116489d31ff Lib/concurrent/futures/process.py
--- a/Lib/concurrent/futures/process.py	Thu Aug 21 19:16:17 2014 -0400
+++ b/Lib/concurrent/futures/process.py	Tue Aug 26 21:56:38 2014 -0400
@@ -95,6 +95,10 @@
         self.args = args
         self.kwargs = kwargs
 
+    def __repr__(self):
+        return "WorkItem: {} args: {} kwargs: {}".format(
+            self.fn, self.args, self.kwargs)
+
 class _ResultItem(object):
     def __init__(self, work_id, exception=None, result=None):
         self.work_id = work_id
@@ -108,6 +112,10 @@
         self.args = args
         self.kwargs = kwargs
 
+class _WorkId(object):
+    def __init__(self, work_id):
+        self.work_id = work_id
+
 def _process_worker(call_queue, result_queue):
     """Evaluates calls from call_queue and places the results in result_queue.
 
@@ -127,6 +135,8 @@
             # Wake up queue management thread
             result_queue.put(os.getpid())
             return
+
+        result_queue.put(_WorkId(call_item.work_id))
         try:
             r = call_item.fn(*call_item.args, **call_item.kwargs)
         except BaseException as e:
@@ -259,6 +269,9 @@
             if not processes:
                 shutdown_worker()
                 return
+        elif isinstance(result_item, _WorkId):
+            executor = executor_reference()
+            executor._active_work_items.add(result_item.work_id)
         elif result_item is not None:
             work_item = pending_work_items.pop(result_item.work_id, None)
             # work_item can be None if another process terminated (see above)
@@ -269,6 +282,8 @@
                     work_item.future.set_result(result_item.result)
                 # Delete references to object. See issue16284
                 del work_item
+            executor = executor_reference()
+            executor._active_work_items.remove(result_item.work_id)
 
         # Check whether we should start shutting down.
         executor = executor_reference()
         # No more work items can be added if:
@@ -360,6 +375,7 @@
         self._broken = False
         self._queue_count = 0
         self._pending_work_items = {}
+        self._active_work_items = set()
 
     def _start_queue_management_thread(self):
         # When the executor gets lost, the weakref callback will wake up
@@ -390,6 +406,34 @@
             p.start()
             self._processes[p.pid] = p
 
+    def worker_count(self):
+        return len(self._processes)
+
+    def active_worker_count(self):
+        return self.active_task_count()
+
+    def idle_worker_count(self):
+        return self.worker_count() - self.active_worker_count()
+
+    def task_count(self):
+        return len(self._pending_work_items)
+
+    def active_task_count(self):
+        return len(self._active_work_items)
+
+    def waiting_task_count(self):
+        return self.task_count() - self.active_task_count()
+
+    def active_tasks(self):
+        return {_base.WorkItem(self._pending_work_items[t])
+                for t in self._active_work_items}
+
+    def waiting_tasks(self):
+        tasks = [_base.WorkItem(v, k) for k, v in self._pending_work_items.items()
+                 if k not in self._active_work_items]
+        tasks.sort(key=lambda x: x._work_id)
+        return tasks
+
     def submit(self, fn, *args, **kwargs):
         with self._shutdown_lock:
             if self._broken:
diff -r b116489d31ff Lib/concurrent/futures/thread.py
--- a/Lib/concurrent/futures/thread.py	Thu Aug 21 19:16:17 2014 -0400
+++ b/Lib/concurrent/futures/thread.py	Tue Aug 26 21:56:38 2014 -0400
@@ -57,27 +57,38 @@
         else:
             self.future.set_result(result)
 
-def _worker(executor_reference, work_queue):
-    try:
-        while True:
-            work_item = work_queue.get(block=True)
-            if work_item is not None:
-                work_item.run()
-                # Delete references to object. See issue16284
-                del work_item
-                continue
-            executor = executor_reference()
-            # Exit if:
-            #   - The interpreter is shutting down OR
-            #   - The executor that owns the worker has been collected OR
-            #   - The executor that owns the worker has been shutdown.
-            if _shutdown or executor is None or executor._shutdown:
-                # Notice other workers
-                work_queue.put(None)
-                return
-            del executor
-    except BaseException:
-        _base.LOGGER.critical('Exception in worker', exc_info=True)
+    def __repr__(self):
+        return "WorkItem: {} args: {} kwargs: {}".format(
+            self.fn, self.args, self.kwargs)
+
+class Worker(threading.Thread):
+    def __init__(self, executor_reference, work_queue):
+        super().__init__()
+        self._executor_reference = executor_reference
+        self._work_queue = work_queue
+        self._work_item = None
+
+    def run(self):
+        try:
+            while True:
+                self._work_item = self._work_queue.get(block=True)
+                if self._work_item is not None:
+                    self._work_item.run()
+                    # Delete references to object. See issue16284
+                    self._work_item = None
+                    continue
+                executor = self._executor_reference()
+                # Exit if:
+                #   - The interpreter is shutting down OR
+                #   - The executor that owns the worker has been collected OR
+                #   - The executor that owns the worker has been shutdown.
+                if _shutdown or executor is None or executor._shutdown:
+                    # Notice other workers
+                    self._work_queue.put(None)
+                    return
+                del executor
+        except BaseException:
+            _base.LOGGER.critical('Exception in worker', exc_info=True)
 
 class ThreadPoolExecutor(_base.Executor):
     def __init__(self, max_workers):
@@ -109,17 +120,44 @@
             return f
     submit.__doc__ = _base.Executor.submit.__doc__
 
+    def worker_count(self):
+        return len(self._threads)
+
+    def active_worker_count(self):
+        return self.active_task_count()
+
+    def idle_worker_count(self):
+        return self.worker_count() - self.active_worker_count()
+
+    def task_count(self):
+        return self.active_task_count() + self.waiting_task_count()
+
+    def active_task_count(self):
+        return sum(1 for t in self._threads if t._work_item)
+
+    def waiting_task_count(self):
+        return self._work_queue.qsize()
+
+    def active_tasks(self):
+        return set(_base.WorkItem(t._work_item) for t in self._threads
+                   if t._work_item)
+
+    def waiting_tasks(self):
+        # Compare against the raw work items; wrapping them first would
+        # make the membership test always fail.
+        active = {t._work_item for t in self._threads if t._work_item}
+        with self._work_queue.mutex:
+            return [_base.WorkItem(task) for task in self._work_queue.queue
+                    if task not in active]
+
     def _adjust_thread_count(self):
         # When the executor gets lost, the weakref callback will wake up
         # the worker threads.
         def weakref_cb(_, q=self._work_queue):
             q.put(None)
-        # TODO(bquinlan): Should avoid creating new threads if there are more
-        # idle threads than items in the work queue.
-        if len(self._threads) < self._max_workers:
-            t = threading.Thread(target=_worker,
-                                 args=(weakref.ref(self, weakref_cb),
-                                       self._work_queue))
+        # Create a new thread if we're not at the max and we don't have
+        # enough idle threads to handle the pending tasks.
+        if (len(self._threads) < self._max_workers and
+                self.idle_worker_count() < self._work_queue.qsize()):
+            t = Worker(weakref.ref(self, weakref_cb), self._work_queue)
             t.daemon = True
             t.start()
             self._threads.add(t)
diff -r b116489d31ff Lib/test/test_concurrent_futures.py
--- a/Lib/test/test_concurrent_futures.py	Thu Aug 21 19:16:17 2014 -0400
+++ b/Lib/test/test_concurrent_futures.py	Tue Aug 26 21:56:38 2014 -0400
@@ -13,6 +13,7 @@
 
 import sys
 import threading
+import multiprocessing
 import time
 import unittest
 import weakref
@@ -52,6 +53,12 @@
         print(msg)
         sys.stdout.flush()
 
+def event_wait(e):
+    e.wait()
+
+def event_wait_param(e, a):
+    event_wait(e)
+
 class MyObject(object):
     def my_method(self):
 
@@ -89,11 +96,51 @@
 class ThreadPoolMixin(ExecutorMixin):
     executor_type = futures.ThreadPoolExecutor
 
+    def _prime_executor(self):
+        # Make sure that the executor is ready to do work before running the
+        # tests. This should reduce the probability of timeouts in the tests.
+        event = threading.Event()
+        fs = [self.executor.submit(event_wait, event)
+              for _ in range(self.worker_count)]
+        event.set()
+        for f in fs:
+            f.result()
+
 
 class ProcessPoolMixin(ExecutorMixin):
     executor_type = futures.ProcessPoolExecutor
 
+class ThreadPoolWorkerManagementTest(ThreadPoolMixin, unittest.TestCase):
+    do_prime = False
+    def setUp(self):
+        super().setUp()
+        self.event = threading.Event()
+
+    def tearDown(self):
+        self.event.set()
+
+    def _prime_executor(self):
+        pass
+
+    def test_starting(self):
+        # We should only start new workers if we don't have an idle
+        # thread available.
+        self.assertEqual(len(self.executor._threads), 0)
+        self.executor.submit(mul, 2, 2)
+        self.assertEqual(len(self.executor._threads), 1)
+        self.executor.submit(event_wait, self.event)
+        self.assertEqual(len(self.executor._threads), 1)
+        self.executor.submit(event_wait, self.event)
+        self.assertEqual(len(self.executor._threads), 2)
+        self.event.set()
+        time.sleep(.4)
+        self.executor.submit(mul, 2, 2)
+        self.executor.submit(mul, 2, 2)
+        self.assertEqual(len(self.executor._threads), 2)
+
+
 class ExecutorShutdownTest:
     def test_run_after_shutdown(self):
         self.executor.shutdown()
@@ -130,7 +177,6 @@
         self.executor.submit(mul, 21, 2)
         self.executor.submit(mul, 6, 7)
         self.executor.submit(mul, 3, 14)
-        self.assertEqual(len(self.executor._threads), 3)
         self.executor.shutdown()
         for t in self.executor._threads:
             t.join()
@@ -189,6 +235,110 @@
         for p in processes.values():
             p.join()
 
+class IntrospectionTests:
+    def tearDown(self):
+        self.event.set()
+        super().tearDown()
+
+    def test_worker_count(self):
+        self.assertEqual(self.executor.worker_count(), self.worker_count)
+        out = self.executor.submit(event_wait, self.event)
+        time.sleep(.5)
+        self.assertEqual(self.executor.active_worker_count(), 1)
+        self.assertEqual(self.executor.idle_worker_count(),
+                         self.worker_count - 1)
+        self.event.set()
+
+    def test_worker_count_full(self):
+        for _ in range(self.worker_count + 2):
+            self.executor.submit(event_wait, self.event)
+        time.sleep(.5)
+        self.assertEqual(self.executor.active_worker_count(), self.worker_count)
+        self.assertEqual(self.executor.idle_worker_count(), 0)
+        self.event.set()
+
+    def test_task_count(self):
+        self.assertEqual(self.executor.task_count(), 0)
+        out = self.executor.submit(event_wait, self.event)
+        time.sleep(.5)
+        self.assertEqual(self.executor.task_count(), 1)
+        self.assertEqual(self.executor.active_task_count(), 1)
+        self.assertEqual(self.executor.waiting_task_count(), 0)
+        self.event.set()
+
+    def test_task_count_full(self):
+        for _ in range(self.worker_count + 2):
+            self.executor.submit(event_wait, self.event)
+        time.sleep(.5)
+        self.assertEqual(self.executor.task_count(), self.worker_count + 2)
+        self.assertEqual(self.executor.active_task_count(), self.worker_count)
+        self.assertEqual(self.executor.waiting_task_count(), 2)
+        self.event.set()
+        self.event.clear()
+        time.sleep(.5)
+        # Waiting tasks should become active.
+        self.assertEqual(self.executor.active_task_count(), 2)
+        self.event.set()
+
+    def test_task_content(self):
+        param = 1
+        out = self.executor.submit(event_wait_param, self.event, param)
+        time.sleep(.5)
+        tasks = self.executor.active_tasks()
+        self.assertEqual(len(tasks), 1)
+        self.assertEqual(tasks.pop().args, (self.event, param))
+        self.assertEqual(len(self.executor.waiting_tasks()), 0)
+        self.event.set()
+
+    def test_task_contents_full(self):
+        for i in range(self.worker_count + 2):
+            self.executor.submit(event_wait_param, self.event, i)
+        time.sleep(.5)
+        active_tasks = self.executor.active_tasks()
+        waiting_tasks = self.executor.waiting_tasks()
+        active_args = {item.args for item in active_tasks}
+        waiting_args = [item.args for item in waiting_tasks]
+
+        # Check the lengths.
+        self.assertEqual(len(active_tasks), self.worker_count)
+        self.assertEqual(len(waiting_tasks), 2)
+
+        # Make sure the right args are there for the active tasks.
+        expected_args = {(self.event, i) for i in range(self.worker_count)}
+        self.assertEqual(active_args, expected_args)
+
+        # Make sure the right args are in the right order for waiting tasks.
+        expected_args = [(self.event, i) for i in range(self.worker_count,
+                                                        self.worker_count + 2)]
+        self.assertEqual(waiting_args, expected_args)
+        self.event.set()
+        self.event.clear()
+        time.sleep(.5)
+        # Waiting tasks should become active.
+        expected_args = set(expected_args)
+        active_tasks = self.executor.active_tasks()
+        waiting_tasks = self.executor.waiting_tasks()
+        active_args = {item.args for item in active_tasks}
+        self.assertEqual(active_args, expected_args)
+        self.assertEqual(waiting_tasks, [])
+
+
+class ThreadPoolIntrospectionTests(IntrospectionTests, ThreadPoolMixin,
+                                   unittest.TestCase):
+    def setUp(self):
+        super().setUp()
+        self.event = threading.Event()
+
+class ProcessPoolIntrospectionTests(IntrospectionTests, ProcessPoolMixin,
+                                    unittest.TestCase):
+    def setUp(self):
+        super().setUp()
+        self.manager = multiprocessing.Manager()
+        self.event = self.manager.Event()
+
+    def tearDown(self):
+        super().tearDown()
+        self.manager.shutdown()
+
 class WaitTests:
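
Not part of the patch: a small usage sketch of the proposed introspection API, for reviewers. It assumes a build with this patch applied; the helper wait_on, the pool size, and the counts in the comments are illustrative, and the printed values assume the workers have had time to pick up their tasks.

import threading
import time
from concurrent.futures import ThreadPoolExecutor

def wait_on(event):
    event.wait()

event = threading.Event()
executor = ThreadPoolExecutor(max_workers=2)

# Saturate both workers and queue one extra task.
for _ in range(3):
    executor.submit(wait_on, event)
time.sleep(.5)  # give the workers time to pick up their tasks

print(executor.worker_count())        # 2
print(executor.active_task_count())   # 2
print(executor.waiting_task_count())  # 1

# active_tasks() and waiting_tasks() return _base.WorkItem wrappers that
# expose the fn, args, and kwargs of each enqueued call.
for task in executor.waiting_tasks():
    print(task.fn, task.args, task.kwargs)

event.set()  # unblock the workers
executor.shutdown()

ProcessPoolExecutor grows the same methods; per the patch, its waiting_tasks() additionally returns the pending items sorted by work id.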