Index: asyncore.py =================================================================== --- asyncore.py (revision 70874) +++ asyncore.py (working copy) @@ -46,6 +46,7 @@ sophisticated high-performance network servers and clients a snap. """ +import sched import select import socket import sys @@ -60,6 +61,12 @@ except NameError: socket_map = {} +try: + scheduled_tasks +except NameError: + _sleep = lambda x: None + scheduled_tasks = sched.scheduler(time.time, _sleep) + def _strerror(err): res = os.strerror(err) if res == 'Unknown error': @@ -188,24 +195,76 @@ poll3 = poll2 # Alias for backward compatibility -def loop(timeout=30.0, use_poll=False, map=None, count=None): +class scheduled_task(object): + def __init__(self, event, scheduler, seconds): + self.event = event + self.scheduler = scheduler + self.seconds = seconds + + def cancel(self): + self.event._cancelled = True + + @property + def cancelled(self): + return self.event._cancelled + + def reset(self): + return self.delay(self.seconds) + + def delay(self, seconds): + assert sys.maxint >= seconds >= 0, ("%s is not greater than or equal " + "to 0 seconds" % (seconds,)) + self.cancel() + _, _, act, arg = self.event + self.event = self.scheduler.enter(seconds, 0, act, arg) + self.seconds = seconds + return self + +def _caller(fcn, args, kwargs): + def call(*ignore): + return fcn(*args, **kwargs) + return call + +def call_later(seconds, action, *args, **kwargs): + assert callable(action) + assert sys.maxint >= seconds >= 0, ("%s is not greater than or equal " + "to 0 seconds" % (seconds,)) + tasks = kwargs.pop('_tasks', None) + if tasks is None: + tasks = scheduled_tasks + event = tasks.enter(seconds, 0, _caller(action, args, kwargs), ()) + return scheduled_task(event, tasks, seconds) + +class pseudo_counter(object): + def __sub__(self, other): + return self + def __gt__(self, other): + return True + +def loop(timeout=30.0, use_poll=False, map=None, count=None, tasks=None): + _time = time.time if map is None: map = socket_map + if tasks is None: + tasks = scheduled_tasks + + if count is None: + count = pseudo_counter() + if use_poll and hasattr(select, 'poll'): poll_fun = poll2 else: poll_fun = poll - if count is None: - while map: - poll_fun(timeout, map) + while (map or tasks) and count > 0: + to = timeout + if tasks: + to = min(max(tasks.peek().time - _time(), 0), to) + poll_fun(to, map) + tasks.run(_time()) + count = count - 1 - else: - while map and count > 0: - poll_fun(timeout, map) - count = count - 1 - class dispatcher: debug = False @@ -540,9 +599,11 @@ info = ' '.join(['[%s|%s|%s]' % x for x in tbinfo]) return (file, function, line), t, v, info -def close_all(map=None, ignore_all=False): +def close_all(map=None, ignore_all=False, tasks=None): if map is None: map = socket_map + if tasks is None: + tasks = scheduled_tasks for x in map.values(): try: x.close() @@ -556,6 +617,14 @@ except: if not ignore_all: raise + for x in tasks.getqueue(): + try: + x.close() + except _reraised_exceptions: + raise + except: + if not ignore_all: + raise map.clear() # Asynchronous File I/O: Index: sched.py =================================================================== --- sched.py (revision 70874) +++ sched.py (working copy) @@ -29,20 +29,89 @@ # XXX the global state of your particular time and delay functions. import heapq -from collections import namedtuple __all__ = ["scheduler"] -Event = namedtuple('Event', 'time, priority, action, argument') +class Event(object): + attrs = 'time priority action argument'.split() + __slots__ = '_values', '_cancelled' + def __init__(self, time, priority, action, argument): + self._values = (time, priority, action, argument) + self._cancelled = False -class scheduler: - def __init__(self, timefunc, delayfunc): + def __eq__(self, other): + return self._values == other._values + + def __lt__(self, other): + return self._values < other._values + + def __iter__(self): + for i in self._values: + yield i + + def __getattr__(self, key): + if key == '_cancelled': + return self._cancelled + return self._values[self.attrs.index(key)] + + def _get_cancelled(self): + return self._cancelled + + def _set_cancelled(self, value): + if not value: + if self._cancelled: + raise Exception("You can't un-cancel an event.") + return + self._cancelled = True + + cancelled = property(_get_cancelled, _set_cancelled) + del _get_cancelled, _set_cancelled + + def __repr__(self): + return '%s(%s, %s, %s, %s)'%(self.__class__.__name__, self.time, self.priority, self.action, self.argument) + +def LocalSynchronize(method): + def Call(self, *args, **kwargs): + # slight performance improvement over using FakeLock all the time for all + # method calls + if self.lock is FakeLock: + return method(self, *args, **kwargs) + else: + with self.lock: + return method(self, *args, **kwargs) + Call.__name__ = method.__name__ + return Call + +class FakeLock(object): + def __enter__(self): + return + def __exit__(self, type, value, traceback): + return + def __call__(self): + return self +FakeLock = FakeLock() + +class scheduler(object): + def __init__(self, timefunc, delayfunc, locked=False): """Initialize a new instance, passing the time and delay - functions""" + functions. + + If the optional locked argument (defaulting to false) is true, this + scheduler will use a threading.RLock() instance to guarantee internal + consistancy. + + """ self._queue = [] self.timefunc = timefunc self.delayfunc = delayfunc + self.cancelled = 0 + if not locked: + self.lock = FakeLock + else: + import threading + self.lock = threading.RLock() + @LocalSynchronize def enterabs(self, time, priority, action, argument): """Enter a new event in the queue at an absolute time. @@ -54,6 +123,7 @@ heapq.heappush(self._queue, event) return event # The ID + @LocalSynchronize def enter(self, delay, priority, action, argument): """A variant that specifies the time as a relative time. @@ -63,21 +133,60 @@ time = self.timefunc() + delay return self.enterabs(time, priority, action, argument) + @LocalSynchronize + def peek(self): + """Will return the first item in the queue in a properly synchronized + manner. + + An empty queue will raise an IndexError. + + """ + return self._queue[0] + + @LocalSynchronize def cancel(self, event): """Remove an event from the queue. This must be presented the ID as returned by enter(). - If the event is not in the queue, this raises RuntimeError. """ - self._queue.remove(event) - heapq.heapify(self._queue) + self.cancelled += 1 + event.cancelled = True + if self._ok_clear(): + self._clear_cancelled() + @LocalSynchronize def empty(self): """Check whether the queue is empty.""" return not self._queue - def run(self): + def _ok_clear(self): + """Clear cancelled events when the queue is at least 128 events and more + than half of the events are cancelled, or clear regardless of the event + count when cancelled events are at least 7/8 of all events in the queue. + + """ + lq = len(self._queue) + return (lq > 128 and self.cancelled > (lq>>1)) or (self.cancelled<<3 > lq*7) + + @LocalSynchronize + def _clear_cancelled(self): + """Clears all cancelled events and re-heapifies the schedule. + + Note the following: + + >>> a = sched.queue + >>> sched._clear_cancelled() + >>> b = sched.queue + + After the above is executed, a and b may not be the same! + + """ + self._queue[:] = [i for i in self._queue if not i.cancelled] + heapq.heapify(self._queue) + self.cancelled = 0 + + def run(self, now=None): """Execute events until the queue is empty. When there is a positive delay until the first event, the @@ -104,31 +213,64 @@ delayfunc = self.delayfunc timefunc = self.timefunc pop = heapq.heappop - while q: + push = heapq.heappush + peek = self.peek + lock = self.lock + if self._ok_clear(): + self._clear_cancelled() + while q and ((now is None) or (q[0].time <= now)): time, priority, action, argument = checked_event = q[0] - now = timefunc() - if now < time: - delayfunc(time - now) + _now = timefunc() + if _now < time: + delayfunc(time - _now) else: - event = pop(q) + with lock: + event = pop(q) # Verify that the event was not removed or altered # by another thread after we last looked at q[0]. if event is checked_event: - action(*argument) - delayfunc(0) # Let other threads run + if event.cancelled: + with lock: + self.cancelled -= 1 + else: + action(*argument) + delayfunc(0) # Let other threads run else: - heapq.heappush(q, event) + with lock: + push(q, event) - @property - def queue(self): + def __len__(self): + # Use max in order to fix a potential race condition + return max(len(self._queue) - self.cancelled, 0) + + @LocalSynchronize + def getqueue(self, now=None, copy=True, auto_clear=True): """An ordered list of upcoming events. - Events are named tuples with fields for: + Events are Event objects with fields for: time, priority, action, arguments """ # Use heapq to sort the queue rather than using 'sorted(self._queue)'. # With heapq, two events scheduled at the same time will show in # the actual order they would be retrieved. - events = self._queue[:] - return map(heapq.heappop, [events]*len(events)) + if copy: + events = self._queue[:] + else: + events = self._queue + if auto_clear and self._ok_clear(): + self._clear_cancelled() + out = [] + pop = heapq.heappop + while events and ((now is None) or (events[0].time <= now)): + event = pop(events) + if not event.cancelled: + out.append(event) + elif not copy: # the event was cancelled, and this is not a copy + self.cancelled -= 1 + return out + + @property + @LocalSynchronize + def queue(self): + return self.getqueue() Index: test/test_asyncore.py =================================================================== --- test/test_asyncore.py (revision 70874) +++ test/test_asyncore.py (working copy) @@ -313,7 +313,7 @@ sys.stdout = stdout lines = fp.getvalue().splitlines() - expected = ['warning: unhandled exception', + expected = ['warning: unhandled incoming priority event', 'warning: unhandled read event', 'warning: unhandled write event', 'warning: unhandled connect event', @@ -410,10 +410,76 @@ w.close() self.assertEqual(file(TESTFN).read(), self.d + d1 + d2) +class CallLaterTests(unittest.TestCase): + """Tests for CallLater class.""" + def setUp(self): + # remove any unfired scheduled call left behind + asyncore.close_all() + + def scheduler(self, timeout=0.01, count=100): + while len(asyncore.scheduled_tasks) and count > 0: + asyncore.scheduled_tasks.run(time.time()) + count -= 1 + time.sleep(timeout) + + def test_interface(self): + fun = lambda: 0 + self.assertRaises(AssertionError, asyncore.call_later, -1, fun) + x = asyncore.call_later(3, fun) + self.assertRaises(AssertionError, x.delay, -1) + self.assert_(x.cancelled is False) + x.cancel() + self.assert_(x.cancelled is True) + + def test_order(self): + l = [] + fun = lambda x: l.append(x) + for x in [0.5, 0.4, 0.3, 0.2, 0.1]: + asyncore.call_later(x, fun, x) + self.scheduler() + self.assertEqual(l, [0.1, 0.2, 0.3, 0.4, 0.5]) + + def test_delay(self): + l = [] + fun = lambda x: l.append(x) + asyncore.call_later(0.1, fun, 0.1).delay(0.7) + asyncore.call_later(0.2, fun, 0.2).delay(0.8) + asyncore.call_later(0.3, fun, 0.3) + asyncore.call_later(0.4, fun, 0.4) + asyncore.call_later(0.5, fun, 0.5) + asyncore.call_later(0.6, fun, 0.6).delay(0.01) + self.scheduler() + self.assertEqual(l, [0.6, 0.3, 0.4, 0.5, 0.1, 0.2]) + + def test_reset(self): + l = [] + fun = lambda x: l.append(x) + asyncore.call_later(0.1, fun, 0.1) + asyncore.call_later(0.2, fun, 0.2) + asyncore.call_later(0.3, fun, 0.3) + x = asyncore.call_later(0.4, fun, 0.4) + asyncore.call_later(0.5, fun, 0.5) + time.sleep(0.2) + x.reset() + self.scheduler() + self.assertEqual(l, [0.1, 0.2, 0.3, 0.5, 0.4]) + + def test_cancel(self): + l = [] + fun = lambda x: l.append(x) + asyncore.call_later(0.1, fun, 0.1).cancel() + asyncore.call_later(0.2, fun, 0.2) + asyncore.call_later(0.3, fun, 0.3) + asyncore.call_later(0.4, fun, 0.4) + asyncore.call_later(0.5, fun, 0.5).cancel() + self.scheduler() + self.assertEqual(l, [0.2, 0.3, 0.4]) + + def test_main(): tests = [HelperFunctionTests, DispatcherTests, DispatcherWithSendTests, - DispatcherWithSendTests_UsePoll] + DispatcherWithSendTests_UsePoll, CallLaterTests] if hasattr(asyncore, 'file_wrapper'): tests.append(FileWrapperTest)