Index: E:/Programmazione/Python/Sources/[PRJ] - pyftpdlib/__svn/python/Lib/asyncore.py =================================================================== --- E:/Programmazione/Python/Sources/[PRJ] - pyftpdlib/__svn/python/Lib/asyncore.py (revision 59499) +++ E:/Programmazione/Python/Sources/[PRJ] - pyftpdlib/__svn/python/Lib/asyncore.py (working copy) @@ -50,6 +50,7 @@ import socket import sys import time +import heapq import os from errno import EALREADY, EINPROGRESS, EWOULDBLOCK, ECONNRESET, \ @@ -60,6 +61,9 @@ except NameError: socket_map = {} +delayed_map = {} +heap_scheduler = [] + class ExitNow(Exception): pass @@ -99,6 +103,20 @@ raise except: obj.handle_error() + +gettime = time.time + +def scheduler(): + now = gettime() + while heap_scheduler and now >= heap_scheduler[0]: + delayed = delayed_map[heap_scheduler[0]] + try: + delayed.call() + except: + delayed.dispatcher.handle_error() + finally: + if not delayed.cancelled: + delayed.cancel() def poll(timeout=0.0, map=None): if map is None: @@ -143,6 +161,9 @@ continue _exception(obj) + scheduler() + + def poll2(timeout=0.0, map=None): # Use the poll() support added to the select module in Python 2.0 if map is None: @@ -175,9 +196,12 @@ continue readwrite(obj, flags) + scheduler() + + poll3 = poll2 # Alias for backward compatibility -def loop(timeout=30.0, use_poll=False, map=None, count=None): +def loop(timeout=1.0, use_poll=False, map=None, count=None): if map is None: map = socket_map @@ -195,6 +219,71 @@ poll_fun(timeout, map) count = count - 1 + +class delayed_call: + + def __init__(self, dispatcher, seconds, target, *args, **kwargs): + assert callable(target), "%s is not callable" %target + assert sys.maxint >= seconds >= 0, "%s is not greater than or equal " \ + "to 0 seconds" % (seconds) + # the dispatcher instance who invoked the delayed call, needed + # by poll() and poll2() functions in case they have to call + # dispatcher.handle_error() + self.dispatcher = dispatcher + self.__secs = seconds + self.__target = target # the callable obj + self.__args = args + self.__kwargs = kwargs + self.timeout = gettime() + self.__secs + self.cancelled = False + self._add() + + def __repr__(self): + return repr(self.__target) + + def _add(self): + # we need an unique value referencing every scheduler + while self.timeout in delayed_map: + self.timeout += 0.01 + heapq.heappush(heap_scheduler, self.timeout) + delayed_map[self.timeout] = self + + def _remove(self): + heap_scheduler.remove(self.timeout) + del delayed_map[self.timeout] + + def active(self): + """Return True if this scheduler has not been cancelled.""" + return not self.cancelled + + def call(self): + """Call this scheduled function.""" + self.__target(*self.__args, **self.__kwargs) + + def reset(self): + """Reschedule this call resetting the current countdown.""" + assert not self.cancelled, "Already cancelled." + self._remove() + self.timeout = gettime() + self.__secs + self._add() + + def delay(self, seconds): + """Reschedule this call for a later time.""" + assert not self.cancelled, "Already cancelled." + assert sys.maxint >= seconds >= 0, "%s is not greater than or equal " \ + "to 0 seconds" %(seconds) + self.__secs = seconds + self.reset() + + def cancel(self): + """Unschedule this call.""" + assert not self.cancelled, "Already cancelled." + #self.dispatcher = None # avoid reference circles + del self.dispatcher, self.__target, self.__args, self.__kwargs + self._remove() + self.cancelled = True + + class dispatcher: debug = False @@ -252,6 +341,15 @@ del map[fd] self._fileno = None + def call_later(self, seconds, target, *args, **kwargs): + """Call the target function at a latter time returning a + delayed_call class instance. + + Before closing the current dispatcher user must cancel() + pending scheduled functions which are still active. + """ + return delayed_call(self, seconds, target, *args, **kwargs) + def create_socket(self, family, type): self.family_and_type = family, type self.socket = socket.socket(family, type) Index: E:/Programmazione/Python/Sources/[PRJ] - pyftpdlib/__svn/python/Lib/test/test_asyncore.py =================================================================== --- E:/Programmazione/Python/Sources/[PRJ] - pyftpdlib/__svn/python/Lib/test/test_asyncore.py (revision 59499) +++ E:/Programmazione/Python/Sources/[PRJ] - pyftpdlib/__svn/python/Lib/test/test_asyncore.py (working copy) @@ -409,9 +409,36 @@ self.assertEqual(file(TESTFN).read(), self.d + d1 + d2) +class TestDelayedCalls(unittest.TestCase): + + def test_api(self): + from asyncore import delayed_call, heap_scheduler, gettime + + target = lambda: 1 + d = delayed_call(None, 1, target) + self.assertTrue(d.active()) + self.failUnless(d.call) + d.reset() + x = heap_scheduler[0] + d.delay(5) + self.failUnless(heap_scheduler[0] >= gettime() + 1) + d.cancel() + self.assertFalse(d.active()) + self.failUnless(not heap_scheduler) + self.assertRaises(AssertionError, d.reset) + self.assertRaises(AssertionError, d.delay, 2) + self.assertRaises(AssertionError, d.cancel) + + x = delayed_call(None, 1, target) + y = delayed_call(None, 1, target) + self.assertNotEqual(x.timeout, y.timeout) + self.assertRaises(AssertionError, delayed_call, None, -1, target) + self.assertRaises(AssertionError, x.delay, -1) + + def test_main(): tests = [HelperFunctionTests, DispatcherTests, DispatcherWithSendTests, - DispatcherWithSendTests_UsePoll] + DispatcherWithSendTests_UsePoll, TestDelayedCalls] if hasattr(asyncore, 'file_wrapper'): tests.append(FileWrapperTest)