diff -r 4b3238923b01 Lib/concurrent/futures/__init__.py --- a/Lib/concurrent/futures/__init__.py Fri May 10 19:57:44 2013 -0700 +++ b/Lib/concurrent/futures/__init__.py Sat May 11 20:31:56 2013 +0200 @@ -15,4 +15,5 @@ wait, as_completed) from concurrent.futures.process import ProcessPoolExecutor -from concurrent.futures.thread import ThreadPoolExecutor +from concurrent.futures.thread import (ScheduledThreadPoolExecutor, + ThreadPoolExecutor) diff -r 4b3238923b01 Lib/concurrent/futures/_base.py --- a/Lib/concurrent/futures/_base.py Fri May 10 19:57:44 2013 -0700 +++ b/Lib/concurrent/futures/_base.py Sat May 11 20:31:56 2013 +0200 @@ -4,6 +4,7 @@ __author__ = 'Brian Quinlan (brian@sweetapp.com)' import collections +import functools import logging import threading import time @@ -499,6 +500,87 @@ self._condition.notify_all() self._invoke_callbacks() + +@functools.total_ordering +class DelayedFuture(Future): + """A future whose execution can be delayed, one-shot or periodic. + + DelayedFutures support mutual ordering based on their scheduled execution + time, which makes them easy to use in e.g. priority queues. + """ + + def __init__(self, init_time, period=0, delay=0): + """Initializes the future. Should not be called by clients. + + Args: + init_time: The initial absolute execution time, in seconds since + epoch. + period: The execution period, in seconds. + delay: The execution delay, in seconds. + + If a period is given, then the task will be scheduled every `period` + seconds: each execution will start `period` seconds after the starting + time of the previous execution: there will be no drift incurred by the + amount of time the task takes to execute. + Conversely, if a delay is given, then there will always be `delay` + seconds between sequential executions: each execution will start + `delay` seconds after the ending time of the previous execution: there + will be a drift incurred by the amount of time the task takes to + execute. 
+ If neither a period nor a delay are specified, then the execution will + occur exactly once (one-shot). + + The future's result can only be retrieved for one-shot execution, + since there's no meaningful result for periodic execution (for those, + calling result()/exception() will block until the future is cancelled). + """ + super().__init__() + self._sched_time = init_time + if period > 0: + # step > 0 => fixed rate + self._step = period + elif delay > 0: + # step < 0 => fixed delay + self._step = -delay + else: + # step == 0 => one-shot + self._step = 0 + + def is_periodic(self): + """Return True if the future execution is periodic.""" + return self._step != 0 + + def get_delay(self): + """Return the delay until the next scheduled execution, in seconds.""" + with self._condition: + return self._sched_time - time.time() + + def _get_sched_time(self): + with self._condition: + return self._sched_time + + def rearm(self): + """Re-arm a periodic future, preparing it for its next execution. + + Should only be used by Executor implementations and unit tests. + """ + with self._condition: + self._state = PENDING + if self._step > 0: + self._sched_time += self._step + else: + self._sched_time = time.time() - self._step + + def __eq__(self, other): + return self is other + + def __lt__(self, other): + return self._get_sched_time() < other._get_sched_time() + + def __hash__(self): + return id(self) + + class Executor(object): """This is an abstract base class for concrete asynchronous executors.""" @@ -569,3 +651,48 @@ def __exit__(self, exc_type, exc_val, exc_tb): self.shutdown(wait=True) return False + + +class ScheduledExecutor(Executor): + """This is an abstract base class for concrete asynchronous scheduled + executors.""" + + def schedule(self, delay, fn, *args, **kwargs): + """Submits a callable to be executed with the given arguments after the + given delay. 
+ + Schedules the callable to be executed as fn(*args, **kwargs) and returns + a DelayedFuture instance representing the execution of the callable. + + Returns: + A DelayedFuture representing the given call. + """ + raise NotImplementedError() + + def schedule_fixed_rate(self, init_delay, period, fn, *args, **kwargs): + """Submits a callable to be executed with the given arguments after the + given delay, and then repeatedly at the given period. In other words, + each execution will start `period` seconds after the starting time of + the previous execution. + + Schedules the callable to be executed as fn(*args, **kwargs) and returns + a DelayedFuture instance representing the execution of the callable. + + Returns: + A DelayedFuture representing the given call. + """ + raise NotImplementedError() + + def schedule_fixed_delay(self, init_delay, delay, fn, *args, **kwargs): + """Submits a callable to be executed with the given arguments after the + given delay, and then repeatedly with the given delay between two + consecutive executions. In other words, each execution will start + `delay` seconds after the ending time of the previous execution. + + Schedules the callable to be executed as fn(*args, **kwargs) and returns + a DelayedFuture instance representing the execution of the callable. + + Returns: + A DelayedFuture representing the given call. + """ + raise NotImplementedError() diff -r 4b3238923b01 Lib/concurrent/futures/thread.py --- a/Lib/concurrent/futures/thread.py Fri May 10 19:57:44 2013 -0700 +++ b/Lib/concurrent/futures/thread.py Sat May 11 20:31:56 2013 +0200 @@ -7,8 +7,12 @@ import atexit from concurrent.futures import _base +import functools +import heapq import queue +import sys import threading +import time import weakref # Workers are created as daemon threads. 
This is done to allow the interpreter @@ -33,7 +37,7 @@ _shutdown = True items = list(_threads_queues.items()) for t, q in items: - q.put(None) + q.put(q.sentinel) for t, q in items: t.join() @@ -57,11 +61,110 @@ else: self.future.set_result(result) + +class _WorkItemQueue(queue.Queue): + + sentinel = None + + +@functools.total_ordering +class _DelayedWorkItem(_WorkItem): + """A wrapper used for the execution of a DelayedFuture. + + It defers to the wrapped DelayedFuture for its ordering. + """ + + def __init__(self, pool, future, fn, args, kwargs): + super().__init__(future, fn, args, kwargs) + self._pool = pool + + def run(self): + if not self.future.set_running_or_notify_cancel(): + return + + try: + result = self.fn(*self.args, **self.kwargs) + except BaseException as e: + self.future.set_exception(e) + else: + if self.future.is_periodic(): + # rearm the future + self.future.rearm() + # and re-schedule ourselves + try: + self._pool._schedule(self) + except RuntimeError: + # pool shut down + pass + else: + # we only set the result in case of one-shot + self.future.set_result(result) + + def get_delay(self): + return self.future.get_delay() + + def __eq__(self, other): + return self is other + + def __lt__(self, other): + return self.future < other.future + + def __hash__(self): + return id(self) + + +class _DelayedWorkItemQueue: + """A wrapper around a priority queue, holding _DelayedWorkItem ordered by + their wrapped DelayedFuture's natural ordering (scheduled execution time). 
+ """ + + # the sentinel deadline must appear later than any other work deadline to + # let pending work finish (hence the sys.maxsize execution time), but it + # must be returned immediately when it's the head of the queue, see the + # get() method below + sentinel = _DelayedWorkItem(None, _base.DelayedFuture(sys.maxsize), + None, None, None) + + def __init__(self): + self._queue = [] + self._available = threading.Condition() + + def put(self, worker): + with self._available: + if self._queue: + first = self._queue[0] + else: + first = None + + heapq.heappush(self._queue, worker) + + if first is None or worker < first: + self._available.notify_all() + + def get(self, block=True): + with self._available: + while True: + if not self._queue: + self._available.wait() + else: + first = self._queue[0] + + delay = first.get_delay() + + if delay <= 0 or first is self.sentinel: + heapq.heappop(self._queue) + if self._queue: + self._available.notify_all() + return first + else: + self._available.wait(delay) + + def _worker(executor_reference, work_queue): try: while True: work_item = work_queue.get(block=True) - if work_item is not None: + if work_item is not work_queue.sentinel: work_item.run() # Delete references to object. See issue16284 del work_item @@ -73,7 +176,7 @@ # - The executor that owns the worker has been shutdown. if _shutdown or executor is None or executor._shutdown: # Notice other workers - work_queue.put(None) + work_queue.put(work_queue.sentinel) return del executor except BaseException: @@ -88,7 +191,7 @@ execute the given calls. """ self._max_workers = max_workers - self._work_queue = queue.Queue() + self._work_queue = _WorkItemQueue() self._threads = set() self._shutdown = False self._shutdown_lock = threading.Lock() @@ -110,7 +213,7 @@ # When the executor gets lost, the weakref callback will wake up # the worker threads. 
def weakref_cb(_, q=self._work_queue): - q.put(None) + q.put(q.sentinel) # TODO(bquinlan): Should avoid creating new threads if there are more # idle threads than items in the work queue. if len(self._threads) < self._max_workers: @@ -125,8 +228,54 @@ def shutdown(self, wait=True): with self._shutdown_lock: self._shutdown = True - self._work_queue.put(None) + self._work_queue.put(self._work_queue.sentinel) if wait: for t in self._threads: t.join() shutdown.__doc__ = _base.Executor.shutdown.__doc__ + + +class ScheduledThreadPoolExecutor(ThreadPoolExecutor): + + def __init__(self, max_workers): + """Initializes a new ScheduledThreadPoolExecutor instance. + + Args: + max_workers: The maximum number of threads that can be used to + execute the given calls. + """ + super().__init__(max_workers) + self._work_queue = _DelayedWorkItemQueue() + + def submit(self, fn, *args, **kwargs): + return self.schedule(0, fn, *args, **kwargs) + submit.__doc__ = _base.Executor.submit.__doc__ + + def schedule(self, init_delay, fn, *args, **kwargs): + f = _base.DelayedFuture(time.time() + init_delay) + w = _DelayedWorkItem(self, f, fn, args, kwargs) + self._schedule(w) + return f + schedule.__doc__ = _base.ScheduledExecutor.schedule.__doc__ + + def schedule_fixed_rate(self, init_delay, period, fn, *args, **kwargs): + f = _base.DelayedFuture(time.time() + init_delay, period=period) + w = _DelayedWorkItem(self, f, fn, args, kwargs) + self._schedule(w) + return f + schedule_fixed_rate.__doc__ = _base.ScheduledExecutor.schedule_fixed_rate.__doc__ + + def schedule_fixed_delay(self, init_delay, delay, fn, *args, **kwargs): + f = _base.DelayedFuture(time.time() + init_delay, delay=delay) + w = _DelayedWorkItem(self, f, fn, args, kwargs) + self._schedule(w) + return f + schedule_fixed_delay.__doc__ = _base.ScheduledExecutor.schedule_fixed_delay.__doc__ + + def _schedule(self, w): + with self._shutdown_lock: + if self._shutdown: + raise RuntimeError('cannot schedule new futures after shutdown') + 
+ self._work_queue.put(w) + self._adjust_thread_count() diff -r 4b3238923b01 Lib/test/test_concurrent_futures.py --- a/Lib/test/test_concurrent_futures.py Fri May 10 19:57:44 2013 -0700 +++ b/Lib/test/test_concurrent_futures.py Sat May 11 20:31:56 2013 +0200 @@ -11,6 +11,7 @@ from test.script_helper import assert_python_ok +from collections import defaultdict import sys import threading import time @@ -58,6 +59,34 @@ pass +class RefAccumulator: + + """A helper class to record events.""" + + def __init__(self): + self._result = defaultdict(list) + self._lock = threading.Lock() + + def add(self, key, delay): + with self._lock: + self._result[key].append(delay) + + def result(self): + with self._lock: + return self._result + + +class TimedAccumulator(RefAccumulator): + + def __init__(self): + super().__init__() + self._init_time = time.time() + + def add(self, key): + with self._lock: + self._result[key].append(time.time() - self._init_time) + + class ExecutorMixin: worker_count = 5 @@ -85,11 +114,26 @@ for f in futures: f.result() + def assertDelay(self, actual, expected): + # The waiting and/or time.time() can be imprecise, which + # is why comparing to the expected value would sometimes fail + # (especially under Windows).
+ # account for 0 timeout + expected = max(expected, 0.01) + actual = max(actual, 0.01) + self.assertGreaterEqual(actual, expected * 0.6) + # Test nothing insane happened + self.assertLess(actual, expected * 10.0) + class ThreadPoolMixin(ExecutorMixin): executor_type = futures.ThreadPoolExecutor +class ScheduledThreadPoolMixin(ExecutorMixin): + executor_type = futures.ScheduledThreadPoolExecutor + + class ProcessPoolMixin(ExecutorMixin): executor_type = futures.ProcessPoolExecutor @@ -122,10 +166,7 @@ f.result() -class ThreadPoolShutdownTest(ThreadPoolMixin, ExecutorShutdownTest): - def _prime_executor(self): - pass - +class BaseThreadPoolShutdownTest(ExecutorShutdownTest): def test_threads_terminate(self): self.executor.submit(mul, 21, 2) self.executor.submit(mul, 6, 7) @@ -153,6 +194,17 @@ for t in threads: t.join() +class ThreadPoolShutdownTest(ThreadPoolMixin, BaseThreadPoolShutdownTest): + def _prime_executor(self): + pass + + +class ScheduledThreadPoolShutdownTest(ScheduledThreadPoolMixin, + BaseThreadPoolShutdownTest): + def _prime_executor(self): + pass + + class ProcessPoolShutdownTest(ProcessPoolMixin, ExecutorShutdownTest): def _prime_executor(self): @@ -291,7 +343,7 @@ self.assertEqual(set([future2]), pending) -class ThreadPoolWaitTests(ThreadPoolMixin, WaitTests): +class BaseThreadPoolWaitTests(WaitTests): def test_pending_calls_race(self): # Issue #14406: multi-threaded race condition when waiting on all @@ -309,6 +361,12 @@ sys.setswitchinterval(oldswitchinterval) +class ThreadPoolWaitTests(ThreadPoolMixin, BaseThreadPoolWaitTests): + pass + +class ScheduledThreadPoolWaitTests(ScheduledThreadPoolMixin, BaseThreadPoolWaitTests): + pass + class ProcessPoolWaitTests(ProcessPoolMixin, WaitTests): pass @@ -355,6 +413,10 @@ pass +class ScheduledThreadPoolAsCompletedTests(ScheduledThreadPoolMixin, + AsCompletedTests): + pass + class ProcessPoolAsCompletedTests(ProcessPoolMixin, AsCompletedTests): pass @@ -445,6 +507,143 @@ 
self.assertRaises(BrokenProcessPool, self.executor.submit, pow, 2, 8) +class ScheduledThreadPoolExecutorTest(ScheduledThreadPoolMixin, + unittest.TestCase): + + # some of the tests below create many events, bump the number of workers to + # keep up + worker_count = 10 + + def compare_accumulators(self, actual, expected, strict=True): + expected = expected.result() + actual = actual.result() + + self.assertEqual(actual.keys(), expected.keys()) + for key in actual: + expected_delays = sorted(expected[key]) + actual_delays = sorted(actual[key]) + if strict: + # in strict mode, the number of events must match exactly for + # each key + tolerance = 0 + else: + # otherwise, we tolerate an offset, due to timing and rounding + # errors: this is useful for timing-dependent tests + tolerance = 1 + + self.assertLessEqual(abs(len(actual_delays) - len(expected_delays)), + tolerance) + for actual_delay, expected_delay in zip(actual_delays, + expected_delays): + self.assertDelay(actual_delay, expected_delay) + + def test_schedule(self): + acc = TimedAccumulator() + expected = RefAccumulator() + NB = 100 + + for i in range(NB): + self.executor.schedule(i/NB, acc.add, i) + expected.add(i, i/NB) + + self.executor.shutdown(wait=True) + self.compare_accumulators(acc, expected) + + def test_schedule_result(self): + expected = {} + NB = 100 + + for i in range(NB): + f = self.executor.schedule(i/NB, lambda i: i, i) + expected[f] = i + + self.executor.shutdown(wait=True) + self.assertEqual(expected, dict((f, f.result()) for f in expected)) + + def test_schedule_fixed_rate(self): + acc = TimedAccumulator() + expected = RefAccumulator() + NB = 10 + RUN_TIME = 2 + + for i in range(1, NB+1): + self.executor.schedule_fixed_rate(0, RUN_TIME * i/NB, acc.add, i) + # let's compute future deadlines during the RUN_TIME period + for n in range(int(NB/i) + 2): + expected.add(i, n*RUN_TIME*i/NB) + + time.sleep(RUN_TIME) + + self.executor.shutdown(wait=True) + self.compare_accumulators(acc, expected,
False) + + def test_schedule_fixed_delay(self): + acc = TimedAccumulator() + expected = RefAccumulator() + NB = 10 + RUN_TIME = 2 + + for i in range(1, NB+1): + self.executor.schedule_fixed_delay(0, RUN_TIME*i/NB, acc.add, i) + # let's compute future deadlines during the RUN_TIME period + for n in range(int(NB/i)+2): + expected.add(i, n*RUN_TIME*i/NB) + + time.sleep(RUN_TIME) + + self.executor.shutdown(wait=True) + self.compare_accumulators(acc, expected, False) + + def test_schedule_fixed_delay_drift(self): + acc = TimedAccumulator() + expected = RefAccumulator() + NB = 10 + RUN_TIME = 2 + DRIFT = 0.1 + + def delayed_add(i): + time.sleep(DRIFT) + acc.add(i) + + for i in range(1, NB+1): + self.executor.schedule_fixed_delay(0, RUN_TIME*i/NB, delayed_add, i) + # let's compute future deadlines during the RUN_TIME period, taking + # DRIFT drift into account + for nb in range(int(RUN_TIME/(RUN_TIME*i/NB + DRIFT))+2): + expected.add(i, nb*(RUN_TIME*i/NB+DRIFT)+DRIFT) + + time.sleep(RUN_TIME) + + self.executor.shutdown(wait=True) + self.compare_accumulators(acc, expected, False) + + def test_cancel(self): + acc = TimedAccumulator() + expected = RefAccumulator() + NB = 100 + futures = [] + RUN_TIME = 5 + + for _ in range(0, NB, 3): + f = self.executor.schedule(0.1, acc.add, 1) + futures.append(f) + expected.add(1, 0.1) + f = self.executor.schedule_fixed_rate(0.2, RUN_TIME, acc.add, 2) + futures.append(f) + expected.add(2, 0.2) + f = self.executor.schedule_fixed_delay(0.3, RUN_TIME, acc.add, 3) + futures.append(f) + expected.add(3, 0.3) + + time.sleep(RUN_TIME) + + for f in futures: + f.cancel() + + self.executor.shutdown(wait=True) + self.compare_accumulators(acc, expected, False) + + class FutureTests(unittest.TestCase): def test_done_callback_with_result(self): callback_result = None @@ -674,11 +873,15 @@ ThreadPoolExecutorTest, ProcessPoolWaitTests, ThreadPoolWaitTests, + ScheduledThreadPoolWaitTests, ProcessPoolAsCompletedTests, ThreadPoolAsCompletedTests, +
ScheduledThreadPoolAsCompletedTests, FutureTests, ProcessPoolShutdownTest, ThreadPoolShutdownTest, + ScheduledThreadPoolShutdownTest, + ScheduledThreadPoolExecutorTest, ) finally: test.support.reap_children()