diff -r 1267d64c14b3 Lib/sched.py --- a/Lib/sched.py Thu Nov 01 14:52:03 2012 +0200 +++ b/Lib/sched.py Thu Nov 01 18:16:15 2012 +0200 @@ -124,27 +124,29 @@ """ # localize variable access to minimize overhead # and to improve thread safety - with self._lock: - q = self._queue - delayfunc = self.delayfunc - timefunc = self.timefunc - pop = heapq.heappop - while q: - time, priority, action, argument, kwargs = checked_event = q[0] + lock = self._lock + q = self._queue + delayfunc = self.delayfunc + timefunc = self.timefunc + pop = heapq.heappop + while True: + with lock: + if not q: + break + time, priority, action, argument, kwargs = q[0] now = timefunc() - if now < time: - if not blocking: - return time - now - delayfunc(time - now) + if time > now: + delay = True else: - event = pop(q) - # Verify that the event was not removed or altered - # by another thread after we last looked at q[0]. - if event is checked_event: - action(*argument, **kwargs) - delayfunc(0) # Let other threads run - else: - heapq.heappush(q, event) + delay = False + pop(q) + if delay: + if not blocking: + return time - now + delayfunc(time - now) + else: + action(*argument, **kwargs) + delayfunc(0) # Let other threads run @property def queue(self): diff -r 1267d64c14b3 Lib/test/test_sched.py --- a/Lib/test/test_sched.py Thu Nov 01 14:52:03 2012 +0200 +++ b/Lib/test/test_sched.py Thu Nov 01 18:16:15 2012 +0200 @@ -4,7 +4,10 @@ import time import unittest from test import support - +try: + import threading +except ImportError: + threading = None class TestCase(unittest.TestCase): @@ -26,6 +29,20 @@ scheduler.run() self.assertEqual(l, [0.01, 0.02, 0.03, 0.04, 0.05]) + @unittest.skipUnless(threading, 'Threading required for this test.') + def test_enter_concurrent(self): + l = [] + fun = lambda x: l.append(x) + scheduler = sched.scheduler(time.time, time.sleep) + scheduler.enter(0.03, 1, fun, (0.03,)) + t = threading.Thread(target=scheduler.run) + t.start() + for x in [0.05, 0.04, 0.02, 0.01]: + z = scheduler.enter(x, 1, fun, (x,)) + scheduler.run() + t.join() + self.assertEqual(l, [0.01, 0.02, 0.03, 0.04, 0.05]) + def test_priority(self): l = [] fun = lambda x: l.append(x) @@ -50,6 +67,24 @@ scheduler.run() self.assertEqual(l, [0.02, 0.03, 0.04]) + @unittest.skipUnless(threading, 'Threading required for this test.') + def test_cancel_concurrent(self): + l = [] + fun = lambda x: l.append(x) + scheduler = sched.scheduler(time.time, time.sleep) + now = time.time() + event1 = scheduler.enterabs(now + 0.01, 1, fun, (0.01,)) + event2 = scheduler.enterabs(now + 0.02, 1, fun, (0.02,)) + event3 = scheduler.enterabs(now + 0.03, 1, fun, (0.03,)) + event4 = scheduler.enterabs(now + 0.04, 1, fun, (0.04,)) + event5 = scheduler.enterabs(now + 0.05, 1, fun, (0.05,)) + t = threading.Thread(target=scheduler.run) + t.start() + scheduler.cancel(event1) + scheduler.cancel(event5) + t.join() + self.assertEqual(l, [0.02, 0.03, 0.04]) + def test_empty(self): l = [] fun = lambda x: l.append(x)