diff --git a/Lib/sched.py b/Lib/sched.py --- a/Lib/sched.py +++ b/Lib/sched.py @@ -43,6 +43,8 @@ __all__ = ["scheduler"] class Event(namedtuple('Event', 'time, priority, action, argument, kwargs')): + def __init__(self, *args, **kwargs): + self._cancelled = False def __eq__(s, o): return (s.time, s.priority) == (o.time, o.priority) def __ne__(s, o): return (s.time, s.priority) != (o.time, o.priority) def __lt__(s, o): return (s.time, s.priority) < (o.time, o.priority) @@ -59,9 +61,19 @@ functions""" self._queue = [] self._lock = threading.RLock() + self._cancellations = 0 self.timefunc = timefunc self.delayfunc = delayfunc + def _reheapify(self, forced=False): + should_do = self._cancellations > 50 and \ + self._cancellations > (len(self._queue) >> 1) + if should_do or forced: + with self._lock: + self._cancellations = 0 + self._queue = [x for x in self._queue if not x._cancelled] + heapq.heapify(self._queue) + def enterabs(self, time, priority, action, argument=(), kwargs=_sentinel): """Enter a new event in the queue at an absolute time. @@ -93,13 +105,15 @@ """ with self._lock: - self._queue.remove(event) - heapq.heapify(self._queue) + if event._cancelled: + raise ValueError("this event was already cancelled") + event._cancelled = True + self._cancellations += 1 def empty(self): """Check whether the queue is empty.""" with self._lock: - return not self._queue + return (len(self._queue) - self._cancellations) <= 0 def run(self, blocking=True): """Execute events until the queue is empty. @@ -136,7 +150,12 @@ with lock: if not q: break - time, priority, action, argument, kwargs = q[0] + event = q[0] + time, priority, action, argument, kwargs = event + if event._cancelled: + pop(q) + self._cancellations -= 1 + continue now = timefunc() if time > now: delay = True @@ -144,9 +163,11 @@ delay = False pop(q) if delay: + self._reheapify() if not blocking: return time - now - delayfunc(time - now) + else: + delayfunc(time - now) else: action(*argument, **kwargs) delayfunc(0) # Let other threads run @@ -162,6 +183,7 @@ # Use heapq to sort the queue rather than using 'sorted(self._queue)'. # With heapq, two events scheduled at the same time will show in # the actual order they would be retrieved. + self._reheapify(forced=True) with self._lock: events = self._queue[:] return list(map(heapq.heappop, [events]*len(events))) diff --git a/Lib/test/test_sched.py b/Lib/test/test_sched.py --- a/Lib/test/test_sched.py +++ b/Lib/test/test_sched.py @@ -4,7 +4,6 @@ import sched import time import unittest -from test import support try: import threading except ImportError: @@ -197,5 +196,26 @@ self.assertEqual(l, []) +class TestCancellations(unittest.TestCase): + + def test_cancel_twice(self): + scheduler = sched.scheduler() + ev = scheduler.enter(0.01, 1, lambda: 0) + scheduler.cancel(ev) + self.assertRaises(ValueError, scheduler.cancel, ev) + + def test_queue(self): + scheduler = sched.scheduler() + ev = scheduler.enter(0.01, 1, lambda: 0) + scheduler.cancel(ev) + self.assertEqual(list(scheduler.queue), []) + + def test_empty(self): + scheduler = sched.scheduler() + ev = scheduler.enter(0.01, 1, lambda: 0) + scheduler.cancel(ev) + self.assertTrue(scheduler.empty()) + + if __name__ == "__main__": unittest.main()