Index: Doc/library/asyncore.rst =================================================================== --- Doc/library/asyncore.rst (revision 70129) +++ Doc/library/asyncore.rst (working copy) @@ -45,7 +45,7 @@ any that have been added to the map during asynchronous service) is closed. -.. function:: loop([timeout[, use_poll[, map[,count]]]]) +.. function:: loop([timeout[, use_poll[, map[,count[, tasks]]]]]) Enter a polling loop that terminates after count passes or all open channels have been closed. All arguments are optional. The *count* @@ -62,7 +62,13 @@ :class:`asyncore.dispatcher`, :class:`asynchat.async_chat` and subclasses thereof) can freely be mixed in the map. + The *tasks* parameter, defaulting to None, can be used to specify an + alternate list for storing the delayed call objects. + .. versionchanged:: 2.7 + *tasks* was added. + + .. class:: dispatcher() The :class:`dispatcher` class is a thin wrapper around a low-level socket @@ -222,6 +228,31 @@ flushed). Sockets are automatically closed when they are garbage-collected. + +.. class:: call_later(seconds, target[, *args[, **kwargs]]) + Calls a function at a later time. It can be used to asynchronously schedule + a call within the polling loop without blocking it. The instance returned + is an object that can be used to cancel or reschedule the call. + + .. versionchanged:: 2.7 + Added in 2.7. + + .. data:: cancelled + + Whether the call has been cancelled. + + .. method:: reset() + + Reschedule the call resetting the current countdown. + + .. method:: delay() + + Reschdule the call for a later time. + + .. method:: cancel() + + Unschedule the call. + .. class:: file_dispatcher() A file_dispatcher takes a file descriptor or file object along with an Index: Lib/asyncore.py =================================================================== --- Lib/asyncore.py (revision 70129) +++ Lib/asyncore.py (working copy) @@ -50,7 +50,7 @@ import socket import sys import time - +import heapq import os from errno import EALREADY, EINPROGRESS, EWOULDBLOCK, ECONNRESET, \ ENOTCONN, ESHUTDOWN, EINTR, EISCONN, EBADF, ECONNABORTED, errorcode @@ -60,6 +60,11 @@ except NameError: socket_map = {} +try: + scheduled_tasks +except NameError: + scheduled_tasks = [] + def _strerror(err): res = os.strerror(err) if res == 'Unknown error': @@ -186,9 +191,29 @@ poll3 = poll2 # Alias for backward compatibility -def loop(timeout=30.0, use_poll=False, map=None, count=None): + +def scheduler(tasks=None): + if tasks is None: + tasks = scheduled_tasks + now = time.time() + while tasks and now >= tasks[0].timeout: + call = heapq.heappop(tasks) + if call.repush: + heapq.heappush(tasks, call) + call.repush = False + continue + try: + call.call() + finally: + if not call.cancelled: + call.cancel() + + +def loop(timeout=1.0, use_poll=False, map=None, count=None, tasks=None): if map is None: map = socket_map + if tasks is None: + tasks = scheduled_tasks if use_poll and hasattr(select, 'poll'): poll_fun = poll2 @@ -196,14 +221,96 @@ poll_fun = poll if count is None: - while map: - poll_fun(timeout, map) - + while map or tasks: + if map: + poll_fun(timeout, map) + if tasks: + scheduler() else: - while map and count > 0: - poll_fun(timeout, map) + while (map or tasks) and count > 0: + if map: + poll_fun(timeout, map) + if tasks: + scheduler() count = count - 1 + +class call_later: + """Calls a function at a later time. + + It can be used to asynchronously schedule a call within the polling + loop without blocking it. The instance returned is an object that + can be used to cancel or reschedule the call. + """ + + def __init__(self, seconds, target, *args, **kwargs): + """ + - seconds: the number of seconds to wait + - target: the callable object to call later + - args: the arguments to call it with + - kwargs: the keyword arguments to call it with + - _tasks: a reserved keyword to specify a different list to + store the delayed call instances. + """ + assert callable(target), "%s is not callable" %target + assert sys.maxint >= seconds >= 0, "%s is not greater than or equal " \ + "to 0 seconds" % (seconds) + self.__delay = seconds + self.__target = target + self.__args = args + self.__kwargs = kwargs + self.__tasks = kwargs.pop('_tasks', scheduled_tasks) + # seconds from the epoch at which to call the function + self.timeout = time.time() + self.__delay + self.repush = False + self.cancelled = False + heapq.heappush(self.__tasks, self) + + def __lt__(self, other): + return self.timeout <= other.timeout + + def call(self): + """Call this scheduled function.""" + assert not self.cancelled, "Already cancelled" + self.__target(*self.__args, **self.__kwargs) + + def reset(self): + """Reschedule this call resetting the current countdown.""" + assert not self.cancelled, "Already cancelled" + self.timeout = time.time() + self.__delay + self.repush = True + + def delay(self, seconds): + """Reschedule this call for a later time.""" + assert not self.cancelled, "Already cancelled." + assert sys.maxint >= seconds >= 0, "%s is not greater than or equal " \ + "to 0 seconds" %(seconds) + self.__delay = seconds + newtime = time.time() + self.__delay + if newtime > self.timeout: + self.timeout = newtime + self.repush = True + else: + # XXX - slow, can be improved + self.timeout = newtime + heapq.heapify(self.__tasks) + + def cancel(self): + """Unschedule this call.""" + assert not self.cancelled, "Already cancelled" + self.cancelled = True + del self.__target, self.__args, self.__kwargs + if self in self.__tasks: + pos = self.__tasks.index(self) + if pos == 0: + heapq.heappop(self.__tasks) + elif pos == len(self.__tasks) - 1: + self.__tasks.pop(pos) + else: + self.__tasks[pos] = self.__tasks.pop() + heapq._siftup(self.__tasks, pos) + + class dispatcher: debug = False @@ -542,9 +649,12 @@ info = ' '.join(['[%s|%s|%s]' % x for x in tbinfo]) return (file, function, line), t, v, info -def close_all(map=None, ignore_all=False): +def close_all(map=None, tasks=None, ignore_all=False): if map is None: map = socket_map + if tasks is None: + tasks = scheduled_tasks + for x in map.values(): try: x.close() @@ -560,6 +670,16 @@ raise map.clear() + for x in tasks: + try: + x.cancel() + except (ExitNow, KeyboardInterrupt, SystemExit): + raise + except: + if not ignore_all: + raise + del tasks[:] + # Asynchronous File I/O: # # After a little research (reading man pages on various unixen, and Index: Lib/test/test_asyncore.py =================================================================== --- Lib/test/test_asyncore.py (revision 70129) +++ Lib/test/test_asyncore.py (working copy) @@ -411,9 +411,80 @@ self.assertEqual(file(TESTFN).read(), self.d + d1 + d2) +class CallLaterTests(unittest.TestCase): + """Tests for CallLater class.""" + + def setUp(self): + # remove any unfired scheduled call left behind + asyncore.close_all() + + def scheduler(self, timeout=0.01, count=100): + while asyncore.scheduled_tasks and count > 0: + asyncore.scheduler() + count -= 1 + time.sleep(timeout) + + def test_interface(self): + fun = lambda: 0 + self.assertRaises(AssertionError, asyncore.call_later, -1, fun) + x = asyncore.call_later(3, fun) + self.assertRaises(AssertionError, x.delay, -1) + self.assert_(x.cancelled is False) + x.cancel() + self.assert_(x.cancelled is True) + self.assertRaises(AssertionError, x.call) + self.assertRaises(AssertionError, x.reset) + self.assertRaises(AssertionError, x.delay, 2) + self.assertRaises(AssertionError, x.cancel) + + def test_order(self): + l = [] + fun = lambda x: l.append(x) + for x in [0.05, 0.04, 0.03, 0.02, 0.01]: + asyncore.call_later(x, fun, x) + self.scheduler() + self.assertEqual(l, [0.01, 0.02, 0.03, 0.04, 0.05]) + + def test_delay(self): + l = [] + fun = lambda x: l.append(x) + asyncore.call_later(0.01, fun, 0.01).delay(0.07) + asyncore.call_later(0.02, fun, 0.02).delay(0.08) + asyncore.call_later(0.03, fun, 0.03) + asyncore.call_later(0.04, fun, 0.04) + asyncore.call_later(0.05, fun, 0.05) + asyncore.call_later(0.06, fun, 0.06).delay(0.001) + self.scheduler() + self.assertEqual(l, [0.06, 0.03, 0.04, 0.05, 0.01, 0.02]) + + def test_reset(self): + l = [] + fun = lambda x: l.append(x) + asyncore.call_later(0.01, fun, 0.01) + asyncore.call_later(0.02, fun, 0.02) + asyncore.call_later(0.03, fun, 0.03) + x = asyncore.call_later(0.04, fun, 0.04) + asyncore.call_later(0.05, fun, 0.05) + time.sleep(0.1) + x.reset() + self.scheduler() + self.assertEqual(l, [0.01, 0.02, 0.03, 0.05, 0.04]) + + def test_cancel(self): + l = [] + fun = lambda x: l.append(x) + asyncore.call_later(0.01, fun, 0.01).cancel() + asyncore.call_later(0.02, fun, 0.02) + asyncore.call_later(0.03, fun, 0.03) + asyncore.call_later(0.04, fun, 0.04) + asyncore.call_later(0.05, fun, 0.05).cancel() + self.scheduler() + self.assertEqual(l, [0.02, 0.03, 0.04]) + + def test_main(): tests = [HelperFunctionTests, DispatcherTests, DispatcherWithSendTests, - DispatcherWithSendTests_UsePoll] + DispatcherWithSendTests_UsePoll, CallLaterTests] if hasattr(asyncore, 'file_wrapper'): tests.append(FileWrapperTest)