Index: asyncore.py =================================================================== --- asyncore.py (revision 70129) +++ asyncore.py (working copy) @@ -46,6 +46,7 @@ sophisticated high-performance network servers and clients a snap. """ +import sched import select import socket import sys @@ -60,6 +61,11 @@ except NameError: socket_map = {} +try: + scheduled_events +except NameError: + scheduled_events = sched.scheduler() + def _strerror(err): res = os.strerror(err) if res == 'Unknown error': @@ -186,24 +192,71 @@ poll3 = poll2 # Alias for backward compatibility -def loop(timeout=30.0, use_poll=False, map=None, count=None): +class scheduled_event(object): + def __init__(self, event, scheduler): + self.event = event + self.scheduler = scheduler + + def cancel(self): + self.event._cancelled = True + + @property + def cancelled(self): + return self.event._cancelled + + def reenter(self, delay): + self.cancel() + _, priority, action, argument = self.event + self.event = self.scheduler.enter(delay, priority, action, argument) + return self + + def reenterabs(self, time): + # probably unused generally, but we'll include it to be complete + self.cancel() + _, priority, action, argument = self.event + self.event = self.scheduler.enterabs(time, priority, action, argument) + return self + + def call_later(self, delay, priority, action, argument): + # a convenience method for scheduling events if you already have one + return call_later(delay, priority, action, argument, self.scheduler) + +def call_later(delay, priority, action, argument, tasks=None): + if tasks is None: + tasks = scheduled_events + event = tasks.enter(delay, priority, action, argument) + return scheduled_event(event, tasks) + +class pseudo_counter(object): + def __sub__(self, other): + return self + def __gt__(self, other): + return True + +def loop(timeout=30.0, use_poll=False, map=None, count=None, tasks=None): + _time = time.time if map is None: map = socket_map + if tasks is None: + tasks = scheduled_events + + if count is None: + count = pseudo_counter() + if use_poll and hasattr(select, 'poll'): poll_fun = poll2 else: poll_fun = poll - if count is None: - while map: - poll_fun(timeout, map) + while (map or tasks) and count > 0: + to = timeout + if tasks: + to = min(max(tasks.peek().time - _time(), 0), timeout) + poll_fun(to, map) + tasks.run(_time()) + count = count - 1 - else: - while map and count > 0: - poll_fun(timeout, map) - count = count - 1 - class dispatcher: debug = False Index: sched.py =================================================================== --- sched.py (revision 70129) +++ sched.py (working copy) @@ -28,21 +28,94 @@ # XXX instead of having to define a module or class just to hold # XXX the global state of your particular time and delay functions. +from __future__ import with_statement import heapq -from collections import namedtuple __all__ = ["scheduler"] -Event = namedtuple('Event', 'time, priority, action, argument') +class Event(object): + attrs = 'time priority action argument'.split() + __slots__ = '_values', '_cancelled' + def __init__(self, time, priority, action, argument): + self._values = (time, priority, action, argument) + self._cancelled = False -class scheduler: - def __init__(self, timefunc, delayfunc): + def __lt__(self, other): + return self._values < other._values + + def __eq__(self, other): + return self is other + + def __gt__(self, other): + return self._values > other._values + + def __iter__(self): + for i in self._values: + yield i + + def __getattr__(self, key): + if key == '_cancelled': + return self._cancelled + return self._values[self.attrs.index(key)] + + def _get_cancelled(self): + return self._cancelled + + def _set_cancelled(self, value): + if not value: + if self._cancelled: + raise Exception("You can't un-cancel an event.") + return + self._cancelled = True + + cancelled = property(_get_cancelled, _set_cancelled) + del _get_cancelled, _set_cancelled + + def __repr__(self): + return '%s(%s, %s, %s, %s)'%(self.__class__.__name__, self.time, self.priority, self.action, self.argument) + +def LocalSynchronize(method): + def Call(self, *args, **kwargs): + # slight performance improvement over using FakeLock all the time for all + # method calls + if self.lock is FakeLock: + return method(self, *args, **kwargs) + else: + with self.lock: + return method(self, *args, **kwargs) + Call.__name__ = method.__name__ + return Call + +class FakeLock(object): + def __enter__(self): + return + def __exit__(self, type, value, traceback): + return + def __call__(self): + return self +FakeLock = FakeLock() + +class scheduler(object): + def __init__(self, timefunc, delayfunc, locked=False): """Initialize a new instance, passing the time and delay - functions""" + functions. + + If the optional locked argument (defaulting to false) is true, this + scheduler will use a threading.RLock() instance to guarantee internal + consistancy. + + """ self._queue = [] self.timefunc = timefunc self.delayfunc = delayfunc + self.cancelled = 0 + if not locked: + self.lock = FakeLock + else: + import threading + self.lock = threading.RLock() + @LocalSynchronize def enterabs(self, time, priority, action, argument): """Enter a new event in the queue at an absolute time. @@ -54,6 +127,7 @@ heapq.heappush(self._queue, event) return event # The ID + @LocalSynchronize def enter(self, delay, priority, action, argument): """A variant that specifies the time as a relative time. @@ -63,21 +137,60 @@ time = self.timefunc() + delay return self.enterabs(time, priority, action, argument) + @LocalSynchronize + def peek(self): + """Will return the first item in the queue in a properly synchronized + manner. + + An empty queue will raise an IndexError. + + """ + return self._queue[0] + + @LocalSynchronize def cancel(self, event): """Remove an event from the queue. This must be presented the ID as returned by enter(). - If the event is not in the queue, this raises RuntimeError. """ - self._queue.remove(event) - heapq.heapify(self._queue) + self.cancelled += 1 + event.cancelled = True + if self._ok_clear(): + self._clear_cancelled() + @LocalSynchronize def empty(self): """Check whether the queue is empty.""" return not self._queue - def run(self): + def _ok_clear(self): + """Clear cancelled events when the queue is at least 128 events and more + than half of the events are cancelled, or clear regardless of the event + count when cancelled events are at least 7/8 of all events in the queue. + + """ + lq = len(self._queue) + return (lq > 128 and self.cancelled > (lq>>1)) or (self.cancelled<<3 > lq*7) + + @LocalSynchronize + def _clear_cancelled(self): + """Clears all cancelled events and re-heapifies the schedule. + + Note the following: + + >>> a = sched.queue + >>> sched._clear_cancelled() + >>> b = sched.queue + + After the above is executed, a and b may not be the same! + + """ + self._queue[:] = [i for i in self._queue if not i.cancelled] + heapq.heapify(self._queue) + self.cancelled = 0 + + def run(self, now=None): """Execute events until the queue is empty. When there is a positive delay until the first event, the @@ -104,31 +217,64 @@ delayfunc = self.delayfunc timefunc = self.timefunc pop = heapq.heappop - while q: + push = heapq.heappush + peek = self.peek + lock = self.lock + if self._ok_clear(): + self._clear_cancelled() + while q and ((now is None) or (q[0].time <= now)): time, priority, action, argument = checked_event = q[0] now = timefunc() if now < time: delayfunc(time - now) else: - event = pop(q) + with lock: + event = pop(q) # Verify that the event was not removed or altered # by another thread after we last looked at q[0]. if event is checked_event: - action(*argument) - delayfunc(0) # Let other threads run + if event.cancelled: + with lock: + self.cancelled -= 1 + else: + action(*argument) + delayfunc(0) # Let other threads run else: - heapq.heappush(q, event) + with lock: + push(q, event) - @property - def queue(self): + def __len__(self): + # Use max in order to fix a potential race condition + return max(len(self._queue) - self.cancelled, 0) + + @LocalSynchronize + def getqueue(self, now=None, copy=True, auto_clear=True): """An ordered list of upcoming events. - Events are named tuples with fields for: + Events are Event objects with fields for: time, priority, action, arguments """ # Use heapq to sort the queue rather than using 'sorted(self._queue)'. # With heapq, two events scheduled at the same time will show in # the actual order they would be retrieved. - events = self._queue[:] - return map(heapq.heappop, [events]*len(events)) + if copy: + events = self._queue[:] + else: + events = self._queue + if auto_clear and self._ok_clear(): + self._clear_cancelled() + out = [] + pop = heapq.heappop + while events and ((now is None) or (events[0].time <= now)): + event = pop(events) + if not event.cancelled: + out.append(event) + elif not copy: # the event was cancelled, and this is not a copy + self.cancelled -= 1 + return out + + @property + @LocalSynchronize + def queue(self): + return self.getqueue()