diff -ur cpython.orig/Lib/concurrent/futures/_base.py cpython-b3e1be1493a5/Lib/concurrent/futures/_base.py --- cpython.orig/Lib/concurrent/futures/_base.py 2013-05-07 08:21:21.000000000 +0000 +++ cpython-b3e1be1493a5/Lib/concurrent/futures/_base.py 2013-05-10 16:35:16.000000000 +0000 @@ -6,7 +6,10 @@ import collections import logging import threading -import time +try: + from time import monotonic as time +except ImportError: + from time import time FIRST_COMPLETED = 'FIRST_COMPLETED' FIRST_EXCEPTION = 'FIRST_EXCEPTION' @@ -188,7 +191,7 @@ before the given timeout. """ if timeout is not None: - end_time = timeout + time.time() + end_time = timeout + time() with _AcquireFutures(fs): finished = set( @@ -204,7 +207,7 @@ if timeout is None: wait_timeout = None else: - wait_timeout = end_time - time.time() + wait_timeout = end_time - time() if wait_timeout < 0: raise TimeoutError( '%d (of %d) futures unfinished' % ( @@ -390,11 +393,11 @@ elif self._state == FINISHED: return self.__get_result() - self._condition.wait(timeout) + gotit = self._condition.wait(timeout) if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: raise CancelledError() - elif self._state == FINISHED: + elif gotit: return self.__get_result() else: raise TimeoutError() @@ -423,11 +426,11 @@ elif self._state == FINISHED: return self._exception - self._condition.wait(timeout) + gotit = self._condition.wait(timeout) if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: raise CancelledError() - elif self._state == FINISHED: + elif gotit: return self._exception else: raise TimeoutError() @@ -499,6 +502,39 @@ self._condition.notify_all() self._invoke_callbacks() + +class DelayedFuture(Future): + """A future whose execution can be delayed, and periodic.""" + + def __init__(self, sched_time, period=0, delay=0): + super().__init__() + self._sched_time = sched_time + if period > 0: + # step > 0 => fixed rate + self._step = period + elif delay > 0: + # step < 0 => fixed delay + self._step = -delay + else: + # step == 0 => one-shot + self._step = 0 + + def is_periodic(self): + return self._step != 0 + + def get_sched_time(self): + return self._sched_time + + def rearm(self): + """Re-arm the future, and update its scheduled execution time.""" + with self._condition: + if self._step > 0: + self._sched_time += self._step + else: + self._sched_time = time() - self._step + self._state = PENDING + + class Executor(object): """This is an abstract base class for concrete asynchronous executors.""" @@ -532,7 +568,7 @@ Exception: If fn(*args) raises for any values. """ if timeout is not None: - end_time = timeout + time.time() + end_time = timeout + time() fs = [self.submit(fn, *args) for args in zip(*iterables)] @@ -544,7 +580,7 @@ if timeout is None: yield future.result() else: - yield future.result(end_time - time.time()) + yield future.result(end_time - time()) finally: for future in fs: future.cancel() @@ -569,3 +605,15 @@ def __exit__(self, exc_type, exc_val, exc_tb): self.shutdown(wait=True) return False + + +class ScheduledExecutor(Executor): + + def schedule(self, delay, fn, *args, **kwargs): + raise NotImplementedError() + + def schedule_fixed_rate(self, init_delay, period, fn, *args, **kwargs): + raise NotImplementedError() + + def schedule_fixed_delay(self, init_delay, delay, fn, *args, **kwargs): + raise NotImplementedError() diff -ur cpython.orig/Lib/concurrent/futures/__init__.py cpython-b3e1be1493a5/Lib/concurrent/futures/__init__.py --- cpython.orig/Lib/concurrent/futures/__init__.py 2013-05-07 08:21:21.000000000 +0000 +++ cpython-b3e1be1493a5/Lib/concurrent/futures/__init__.py 2013-05-10 15:29:04.000000000 +0000 @@ -15,4 +15,4 @@ wait, as_completed) from concurrent.futures.process import ProcessPoolExecutor -from concurrent.futures.thread import ThreadPoolExecutor +from concurrent.futures.thread import ScheduledThreadPoolExecutor, ThreadPoolExecutor diff -ur cpython.orig/Lib/concurrent/futures/thread.py cpython-b3e1be1493a5/Lib/concurrent/futures/thread.py --- cpython.orig/Lib/concurrent/futures/thread.py 2013-05-07 08:21:21.000000000 +0000 +++ cpython-b3e1be1493a5/Lib/concurrent/futures/thread.py 2013-05-10 16:37:16.000000000 +0000 @@ -9,6 +9,10 @@ from concurrent.futures import _base import queue import threading +try: + from time import monotonic as time +except ImportError: + from time import time import weakref # Workers are created as daemon threads. This is done to allow the interpreter @@ -57,6 +61,39 @@ else: self.future.set_result(result) +class _DelayedWorkItem(_WorkItem): + + def __init__(self, queue, future, fn, args, kwargs): + super().__init__(future, fn, args, kwargs) + self.queue = queue + + def run(self): + if not self.future.set_running_or_notify_cancel(): + return + + try: + result = self.fn(*self.args, **self.kwargs) + except BaseException as e: + self.future.set_exception(e) + else: + self.future.set_result(result) + if self.future.is_periodic(): + # rearm the future - it also updates it scheduled time + self.future.rearm() + # and re-schedule ourselves - XXX don't reschedule if the pool + # is shut down + self.queue.put(self) + + +class _DelayedWorkItemQueue(queue.SchedQueue): + + def put(self, w, block=True, timeout=None): + if w is not None: + super().put_abs(w.future.get_sched_time(), w, block, timeout) + else: + super().put_abs(0, None, block, timeout) + + def _worker(executor_reference, work_queue): try: while True: @@ -130,3 +167,31 @@ for t in self._threads: t.join() shutdown.__doc__ = _base.Executor.shutdown.__doc__ + +class ScheduledThreadPoolExecutor(ThreadPoolExecutor): + def __init__(self, max_workers): + super().__init__(max_workers) + self._work_queue = _DelayedWorkItemQueue() + + def schedule(self, init_delay, fn, *args, **kwargs): + f = _base.DelayedFuture(time() + init_delay) + return self._schedule(f, fn, *args, **kwargs) + + def schedule_fixed_rate(self, init_delay, period, fn, *args, **kwargs): + f = _base.DelayedFuture(time() + init_delay, period=period) + return self._schedule(f, fn, *args, **kwargs) + + def schedule_fixed_delay(self, init_delay, delay, fn, *args, **kwargs): + f = _base.DelayedFuture(time() + init_delay, delay=delay) + return self._schedule(f, fn, *args, **kwargs) + + def _schedule(self, f, fn, *args, **kwargs): + with self._shutdown_lock: + if self._shutdown: + raise RuntimeError('cannot schedule new futures after shutdown') + + w = _DelayedWorkItem(self._work_queue, f, fn, args, kwargs) + + self._work_queue.put(w) + self._adjust_thread_count() + return f --- cpython.orig/Lib/queue.py 2013-05-07 08:21:21.000000000 +0000 +++ cpython-b3e1be1493a5/Lib/queue.py 2013-05-10 16:32:50.000000000 +0000 @@ -247,3 +247,71 @@ def _get(self): return self.queue.pop() + + +class SchedQueue(PriorityQueue): + '''Variant of Queue that retrieves open entries as their deadline expire. + ''' + + def put(self, delay, item, block=True, timeout=None): + self.put_abs(time() + delay, item, block, timeout) + + def put_abs(self, deadline, item, block=True, timeout=None): + super().put((deadline, item), block, timeout) + + def _put(self, data): + deadline, item = data + + do_notify = False + if self.queue: + earliest_deadline, _ = self.queue[0] + if deadline < earliest_deadline: + do_notify = True + else: + do_notify = True + + heappush(self.queue, (deadline, item)) + + if do_notify: + self.not_empty.notify_all() + + def get(self, block=True, timeout=None): + with self.not_empty: + if not self.queue and not block: + raise Empty + if timeout is not None: + if timeout < 0: + raise ValueError("'timeout' must be a positive number") + else: + timeout_deadline = time() + timeout + + while True: + if not self.queue: + if timeout is None: + self.not_empty.wait() + else: + delay = timeout_deadline - time() + if delay > 0: + self.not_empty.wait(delay) + else: + raise Empty + else: + deadline, item = self.queue[0] + now = time() + + if now >= deadline: + heappop(self.queue) + if self.queue: + self.not_empty.notify_all() + return item + elif not block: + raise Empty + elif timeout is None: + self.not_empty.wait(deadline - now) + else: + deadline = min(deadline, timeout_deadline) + delay = deadline - now + if delay > 0: + self.not_empty.wait(delay) + else: + raise Empty