diff -r 0d41a1b4c4fe Lib/sched.py --- a/Lib/sched.py Mon Oct 08 07:46:11 2012 +0200 +++ b/Lib/sched.py Mon Oct 08 13:54:27 2012 +0300 @@ -43,6 +43,7 @@ __all__ = ["scheduler"] class Event(namedtuple('Event', 'time, priority, action, argument, kwargs')): + _cancelled = False def __eq__(s, o): return (s.time, s.priority) == (o.time, o.priority) def __ne__(s, o): return (s.time, s.priority) != (o.time, o.priority) def __lt__(s, o): return (s.time, s.priority) < (o.time, o.priority) @@ -57,6 +58,7 @@ functions""" self._queue = [] self._lock = threading.RLock() + self._cancellations = 0 self.timefunc = timefunc self.delayfunc = delayfunc @@ -90,13 +92,16 @@ """ with self._lock: - self._queue.remove(event) - heapq.heapify(self._queue) + assert isinstance(event, Event), event + if event._cancelled: + raise ValueError("this event was already cancelled") + event._cancelled = True + self._cancellations += 1 def empty(self): """Check whether the queue is empty.""" with self._lock: - return not self._queue + return len(self._queue) <= self._cancellations def run(self, blocking=True): """Execute events until the queue is empty. @@ -129,15 +134,26 @@ delayfunc = self.delayfunc timefunc = self.timefunc pop = heapq.heappop + heapify = heapq.heapify while q: time, priority, action, argument, kwargs = checked_event = q[0] now = timefunc() if now < time: + if not blocking and checked_event._cancelled or \ + self._cancellations > 50 and \ + self._cancellations > (len(self._queue) >> 1): + self._cancellations = 0 + self._queue = [x for x in self._queue if not x._cancelled] + heapify(self._queue) + continue if not blocking: return time - now delayfunc(time - now) else: event = pop(q) + if event._cancelled: + self._cancellations -= 1 + continue # Verify that the event was not removed or altered # by another thread after we last looked at q[0]. if event is checked_event: @@ -159,4 +175,8 @@ # the actual order they would be retrieved. with self._lock: events = self._queue[:] - return map(heapq.heappop, [events]*len(events)) + heappop = heapq.heappop + while events: + ev = heappop(events) + if not ev._cancelled: + yield ev diff -r 0d41a1b4c4fe Lib/test/test_sched.py --- a/Lib/test/test_sched.py Mon Oct 08 07:46:11 2012 +0200 +++ b/Lib/test/test_sched.py Mon Oct 08 13:54:27 2012 +0300 @@ -6,7 +6,7 @@ from test import support -class TestCase(unittest.TestCase): +class TestScheduler(unittest.TestCase): def test_enter(self): l = [] @@ -98,8 +98,29 @@ self.assertEqual(l, []) + +class TestCancellations(unittest.TestCase): + + def test_cancel_twice(self): + scheduler = sched.scheduler() + ev = scheduler.enter(0.01, 1, lambda: 0) + scheduler.cancel(ev) + self.assertRaises(ValueError, scheduler.cancel, ev) + + def test_queue(self): + scheduler = sched.scheduler() + ev = scheduler.enter(0.01, 1, lambda: 0) + scheduler.cancel(ev) + self.assertEqual(list(scheduler.queue), []) + + def test_empty(self): + scheduler = sched.scheduler() + ev = scheduler.enter(0.01, 1, lambda: 0) + scheduler.cancel(ev) + self.assertTrue(scheduler.empty()) + def test_main(): - support.run_unittest(TestCase) + support.run_unittest(TestScheduler, TestCancellations) if __name__ == "__main__": test_main()