diff -r dad1debba93c Lib/sched.py --- a/Lib/sched.py Mon Oct 21 14:46:34 2013 +0200 +++ b/Lib/sched.py Mon Oct 21 18:32:13 2013 +0300 @@ -43,6 +43,7 @@ __all__ = ["scheduler"] class Event(namedtuple('Event', 'time, priority, action, argument, kwargs')): + _cancelled = False def __eq__(s, o): return (s.time, s.priority) == (o.time, o.priority) def __ne__(s, o): return (s.time, s.priority) != (o.time, o.priority) def __lt__(s, o): return (s.time, s.priority) < (o.time, o.priority) @@ -59,9 +60,15 @@ functions""" self._queue = [] self._lock = threading.RLock() + self._cancellations = 0 self.timefunc = timefunc self.delayfunc = delayfunc + def _reheapify(self): + self._cancellations = 0 + self._queue = [x for x in self._queue if not x._cancelled] + heapq.heapify(self._queue) + def enterabs(self, time, priority, action, argument=(), kwargs=_sentinel): """Enter a new event in the queue at an absolute time. @@ -93,13 +100,16 @@ """ with self._lock: - self._queue.remove(event) - heapq.heapify(self._queue) + assert isinstance(event, Event), event + if event._cancelled: + raise ValueError("this event was already cancelled") + event._cancelled = True + self._cancellations += 1 def empty(self): """Check whether the queue is empty.""" with self._lock: - return not self._queue + return len(self._queue) <= self._cancellations def run(self, blocking=True): """Execute events until the queue is empty. @@ -136,13 +146,22 @@ with lock: if not q: break - time, priority, action, argument, kwargs = q[0] + time, priority, action, argument, kwargs = event = q[0] + if event._cancelled: + self._cancellations -= 1 + pop(q) + continue now = timefunc() if time > now: + if (self._cancellations > 50 and + self._cancellations > (len(self._queue) >> 1)): + self._reheapify() + continue delay = True else: delay = False pop(q) + if delay: if not blocking: return time - now @@ -163,5 +182,7 @@ # With heapq, two events scheduled at the same time will show in # the actual order they would be retrieved. with self._lock: + if self._cancellations: + self._reheapify() events = self._queue[:] return list(map(heapq.heappop, [events]*len(events))) diff -r dad1debba93c Lib/test/test_sched.py --- a/Lib/test/test_sched.py Mon Oct 21 14:46:34 2013 +0200 +++ b/Lib/test/test_sched.py Mon Oct 21 18:32:13 2013 +0300 @@ -41,7 +41,7 @@ self._cond.notify_all() -class TestCase(unittest.TestCase): +class TestScheduler(unittest.TestCase): def test_enter(self): l = [] @@ -197,5 +197,26 @@ self.assertEqual(l, []) +class TestCancellations(unittest.TestCase): + + def test_cancel_twice(self): + scheduler = sched.scheduler() + ev = scheduler.enter(0.01, 1, lambda: 0) + scheduler.cancel(ev) + self.assertRaises(ValueError, scheduler.cancel, ev) + + def test_queue(self): + scheduler = sched.scheduler() + ev = scheduler.enter(0.01, 1, lambda: 0) + scheduler.cancel(ev) + self.assertEqual(list(scheduler.queue), []) + + def test_empty(self): + scheduler = sched.scheduler() + ev = scheduler.enter(0.01, 1, lambda: 0) + scheduler.cancel(ev) + self.assertTrue(scheduler.empty()) + + if __name__ == "__main__": unittest.main()