Index: Doc/library/asyncore.rst =================================================================== --- Doc/library/asyncore.rst (revision 66459) +++ Doc/library/asyncore.rst (working copy) @@ -222,6 +222,31 @@ flushed). Sockets are automatically closed when they are garbage-collected. + +.. class:: call_later(seconds, target[, *args[, **kwargs]]) + Calls a function at a later time. It can be used to asynchronously schedule + a call within the polling loop without blocking it. The instance returned + is an object that can be used to cancel or reschedule the call. + + .. versionchanged:: 2.7 + Added in 2.7. + + .. data:: cancelled + + Whether the call has been cancelled. + + .. method:: reset() + + Reschedule the call resetting the current countdown. + + .. method:: delay() + + Reschdule the call for a later time. + + .. method:: cancel() + + Unschedule the call. + .. class:: file_dispatcher() A file_dispatcher takes a file descriptor or file object along with an Index: Lib/asyncore.py =================================================================== --- Lib/asyncore.py (revision 66459) +++ Lib/asyncore.py (working copy) @@ -50,7 +50,7 @@ import socket import sys import time - +import heapq import os from errno import EALREADY, EINPROGRESS, EWOULDBLOCK, ECONNRESET, \ ENOTCONN, ESHUTDOWN, EINTR, EISCONN, EBADF, ECONNABORTED, errorcode @@ -60,6 +60,8 @@ except NameError: socket_map = {} +tasks = [] + def _strerror(err): res = os.strerror(err) if res == 'Unknown error': @@ -186,7 +188,23 @@ poll3 = poll2 # Alias for backward compatibility -def loop(timeout=30.0, use_poll=False, map=None, count=None): + +def scheduler(): + now = time.time() + while tasks and now >= tasks[0].timeout: + call = heapq.heappop(tasks) + if call.repush: + heapq.heappush(tasks, call) + call.repush = False + continue + try: + call.call() + finally: + if not call.cancelled: + call.cancel() + + +def loop(timeout=1.0, use_poll=False, map=None, count=None): if map is None: map = socket_map @@ -196,14 +214,93 @@ poll_fun = poll if count is None: - while map: - poll_fun(timeout, map) - + while map or tasks: + if map: + poll_fun(timeout, map) + if tasks: + scheduler() else: - while map and count > 0: - poll_fun(timeout, map) + while (map or tasks) and count > 0: + if map: + poll_fun(timeout, map) + if tasks: + scheduler() count = count - 1 + +class call_later: + """Calls a function at a later time. + + It can be used to asynchronously schedule a call within the polling + loop without blocking it. The instance returned is an object that + can be used to cancel or reschedule the call. + """ + + def __init__(self, seconds, target, *args, **kwargs): + """ + - seconds: the number of seconds to wait + - target: the callable object to call later + - args: the arguments to call it with + - kwargs: the keyword arguments to call it with + """ + assert callable(target), "%s is not callable" %target + assert sys.maxint >= seconds >= 0, "%s is not greater than or equal " \ + "to 0 seconds" % (seconds) + self.__delay = seconds + self.__target = target + self.__args = args + self.__kwargs = kwargs + # seconds from the epoch at which to call the function + self.timeout = time.time() + self.__delay + self.repush = False + self.cancelled = False + heapq.heappush(tasks, self) + + def __le__(self, other): + return self.timeout <= other.timeout + + def call(self): + """Call this scheduled function.""" + assert not self.cancelled, "Already cancelled" + self.__target(*self.__args, **self.__kwargs) + + def reset(self): + """Reschedule this call resetting the current countdown.""" + assert not self.cancelled, "Already cancelled" + self.timeout = time.time() + self.__delay + self.repush = True + + def delay(self, seconds): + """Reschedule this call for a later time.""" + assert not self.cancelled, "Already cancelled." + assert sys.maxint >= seconds >= 0, "%s is not greater than or equal " \ + "to 0 seconds" %(seconds) + self.__delay = seconds + newtime = time.time() + self.__delay + if newtime > self.timeout: + self.timeout = newtime + self.repush = True + else: + # XXX - slow, can be improved + self.timeout = newtime + heapq.heapify(tasks) + + def cancel(self): + """Unschedule this call.""" + assert not self.cancelled, "Already cancelled" + self.cancelled = True + del self.__target, self.__args, self.__kwargs + if self in tasks: + pos = tasks.index(self) + if pos == 0: + heapq.heappop(tasks) + elif pos == len(tasks) - 1: + tasks.pop(pos) + else: + tasks[pos] = tasks.pop() + heapq._siftup(tasks, pos) + + class dispatcher: debug = False Index: Lib/test/test_asyncore.py =================================================================== --- Lib/test/test_asyncore.py (revision 66459) +++ Lib/test/test_asyncore.py (working copy) @@ -411,9 +411,82 @@ self.assertEqual(file(TESTFN).read(), self.d + d1 + d2) +class CallLaterTests(unittest.TestCase): + """Tests for CallLater class.""" + + def setUp(self): + for task in asyncore.tasks: + if not task.cancelled: + task.cancel() + del asyncore.tasks[:] + + def scheduler(self, timeout=0.01, count=100): + while asyncore.tasks and count > 0: + asyncore.scheduler() + count -= 1 + time.sleep(timeout) + + def test_interface(self): + fun = lambda: 0 + self.assertRaises(AssertionError, asyncore.call_later, -1, fun) + x = asyncore.call_later(3, fun) + self.assertRaises(AssertionError, x.delay, -1) + self.assert_(x.cancelled is False) + x.cancel() + self.assert_(x.cancelled is True) + self.assertRaises(AssertionError, x.call) + self.assertRaises(AssertionError, x.reset) + self.assertRaises(AssertionError, x.delay, 2) + self.assertRaises(AssertionError, x.cancel) + + def test_order(self): + l = [] + fun = lambda x: l.append(x) + for x in [0.05, 0.04, 0.03, 0.02, 0.01]: + asyncore.call_later(x, fun, x) + self.scheduler() + self.assertEqual(l, [0.01, 0.02, 0.03, 0.04, 0.05]) + + def test_delay(self): + l = [] + fun = lambda x: l.append(x) + asyncore.call_later(0.01, fun, 0.01).delay(0.07) + asyncore.call_later(0.02, fun, 0.02).delay(0.08) + asyncore.call_later(0.03, fun, 0.03) + asyncore.call_later(0.04, fun, 0.04) + asyncore.call_later(0.05, fun, 0.05) + asyncore.call_later(0.06, fun, 0.06).delay(0.001) + self.scheduler() + self.assertEqual(l, [0.06, 0.03, 0.04, 0.05, 0.01, 0.02]) + + def test_reset(self): + l = [] + fun = lambda x: l.append(x) + asyncore.call_later(0.01, fun, 0.01) + asyncore.call_later(0.02, fun, 0.02) + asyncore.call_later(0.03, fun, 0.03) + x = asyncore.call_later(0.04, fun, 0.04) + asyncore.call_later(0.05, fun, 0.05) + time.sleep(0.1) + x.reset() + self.scheduler() + self.assertEqual(l, [0.01, 0.02, 0.03, 0.05, 0.04]) + + def test_cancel(self): + l = [] + fun = lambda x: l.append(x) + asyncore.call_later(0.01, fun, 0.01).cancel() + asyncore.call_later(0.02, fun, 0.02) + asyncore.call_later(0.03, fun, 0.03) + asyncore.call_later(0.04, fun, 0.04) + asyncore.call_later(0.05, fun, 0.05).cancel() + self.scheduler() + self.assertEqual(l, [0.02, 0.03, 0.04]) + + def test_main(): tests = [HelperFunctionTests, DispatcherTests, DispatcherWithSendTests, - DispatcherWithSendTests_UsePoll] + DispatcherWithSendTests_UsePoll, CallLaterTests] if hasattr(asyncore, 'file_wrapper'): tests.append(FileWrapperTest)