Index: Lib/sched.py =================================================================== --- Lib/sched.py (revision 81076) +++ Lib/sched.py (working copy) @@ -30,19 +30,36 @@ import heapq from collections import namedtuple +import threading __all__ = ["scheduler"] Event = namedtuple('Event', 'time, priority, action, argument') +def LocalSynchronize(method): + def Call(self, *args, **kwargs): + # slight performance improvement over using FakeLock all the time for all + # method calls + if self.lock is None: + return method(self, *args, **kwargs) + else: + with self.lock: + return method(self, *args, **kwargs) + Call.__name__ = method.__name__ + Call.__doc__ = method.__doc__ + return Call + class scheduler: - def __init__(self, timefunc, delayfunc): + def __init__(self, timefunc, delayfunc, synchronized=False): """Initialize a new instance, passing the time and delay functions""" self._queue = [] + self._known = set() self.timefunc = timefunc self.delayfunc = delayfunc + self.lock = threading.RLock() if synchronized else None + @LocalSynchronize def enterabs(self, time, priority, action, argument): """Enter a new event in the queue at an absolute time. @@ -52,8 +69,10 @@ """ event = Event(time, priority, action, argument) heapq.heappush(self._queue, event) + self._known.add(id(event)) return event # The ID + @LocalSynchronize def enter(self, delay, priority, action, argument): """A variant that specifies the time as a relative time. @@ -63,6 +82,7 @@ time = self.timefunc() + delay return self.enterabs(time, priority, action, argument) + @LocalSynchronize def cancel(self, event): """Remove an event from the queue. @@ -70,14 +90,53 @@ If the event is not in the queue, this raises ValueError. """ - self._queue.remove(event) - heapq.heapify(self._queue) + ie = id(event) + if ie not in self._known: + raise ValueError() + self._known.discard(ie) def empty(self): """Check whether the queue is empty.""" - return not self._queue + return not self._known - def run(self): + def _ok_clear(self): + """Clear cancelled events when the queue has at least 128 events and + more than half of the events are cancelled, or clear regardless of the + event count when cancelled events are at least 7/8 of all events in the + queue. + + """ + lq = len(self._queue) + cancelled = lq - len(self._known) + return (lq > 128 and cancelled > (lq>>1)) or ((cancelled<<3) > lq*7) + + @LocalSynchronize + def peek(self): + """Will return the first item in the queue in a properly synchronized + manner. + + An empty queue will raise an IndexError. + + """ + return self._queue[0] if self._queue else None + + @LocalSynchronize + def _clear_cancelled(self): + """Clears all cancelled events and re-heapifies the schedule. + + Note the following: + + >>> a = sched.queue + >>> sched._clear_cancelled() + >>> b = sched.queue + + After the above is executed, a and b may not be the same! + + """ + self._queue[:] = [event for event in self._queue if id(event) in self._known] + heapq.heapify(self._queue) + + def run(self, now=None): """Execute events until the queue is empty. When there is a positive delay until the first event, the @@ -104,31 +163,62 @@ delayfunc = self.delayfunc timefunc = self.timefunc pop = heapq.heappop - while q: + push = heapq.heappush + known = self._known + lock = self.lock or threading.Lock() + key = id + if self._ok_clear(): + self._clear_cancelled() + while q and ((now is None) or (q[0].time <= now)): time, priority, action, argument = checked_event = q[0] - now = timefunc() - if now < time: - delayfunc(time - now) + _now = timefunc() + if _now < time: + delayfunc(max(time - _now, 0)) else: - event = pop(q) + with lock: + event = pop(q) # Verify that the event was not removed or altered # by another thread after we last looked at q[0]. if event is checked_event: - action(*argument) - delayfunc(0) # Let other threads run + if key(event) in self._known: + self._known.discard(key(event)) + action(*argument) + delayfunc(0) # Let other threads run else: - heapq.heappush(q, event) + with lock: + push(q, event) - @property - def queue(self): + def __len__(self): + return len(self._known) + + @LocalSynchronize + def getqueue(self, now=None, copy=True, auto_clear=True): """An ordered list of upcoming events. - + Events are named tuples with fields for: time, priority, action, arguments - + """ # Use heapq to sort the queue rather than using 'sorted(self._queue)'. # With heapq, two events scheduled at the same time will show in # the actual order they would be retrieved. - events = self._queue[:] - return map(heapq.heappop, [events]*len(events)) + if copy: + events = [event for event in self._queue if id(event) in self._known] + known = set(self._known) + else: + events = self._queue + known = self._known + if auto_clear and self._ok_clear(): + self._clear_cancelled() + out = [] + pop = heapq.heappop + while events and ((now is None) or (events[0].time <= now)): + event = pop(events) + if id(event) in known: + known.discard(id(event)) + out.append(event) + return out + + @property + def queue(self): + return self.getqueue()