diff --git a/Doc/library/sched.rst b/Doc/library/sched.rst --- a/Doc/library/sched.rst +++ b/Doc/library/sched.rst @@ -14,7 +14,8 @@ The :mod:`sched` module defines a class which implements a general purpose event scheduler: -.. class:: scheduler(timefunc=time.time, delayfunc=time.sleep) +.. class:: scheduler(timefunc=time.time, delayfunc=time.sleep, + synchronized=False) The :class:`scheduler` class defines a generic interface to scheduling events. It needs two functions to actually deal with the "outside world" --- *timefunc* @@ -23,10 +24,14 @@ argument, compatible with the output of *timefunc*, and should delay that many time units. *delayfunc* will also be called with the argument ``0`` after each event is run to allow other threads an opportunity to run in multi-threaded - applications. + applications. If *synchronized* is set True :class:`scheduler` can be safely + used in multi-threaded environments. + .. versionchanged:: 3.3 *timefunc* and *delayfunc* parameters are optional. + .. versionadded:: 3.3 + *synchronized* parameter was added. Example:: @@ -47,33 +52,6 @@ From print_time 930343700.273 930343700.276 -In multi-threaded environments, the :class:`scheduler` class has limitations -with respect to thread-safety, inability to insert a new task before -the one currently pending in a running scheduler, and holding up the main -thread until the event queue is empty. Instead, the preferred approach -is to use the :class:`threading.Timer` class instead. - -Example:: - - >>> import time - >>> from threading import Timer - >>> def print_time(): - ... print("From print_time", time.time()) - ... - >>> def print_some_times(): - ... print(time.time()) - ... Timer(5, print_time, ()).start() - ... Timer(10, print_time, ()).start() - ... time.sleep(11) # sleep while time-delay events execute - ... print(time.time()) - ... - >>> print_some_times() - 930343690.257 - From print_time 930343695.274 - From print_time 930343700.273 - 930343701.301 - - .. _scheduler-objects: Scheduler Objects diff --git a/Lib/sched.py b/Lib/sched.py --- a/Lib/sched.py +++ b/Lib/sched.py @@ -30,6 +30,8 @@ import time import heapq +import functools +import threading from collections import namedtuple __all__ = ["scheduler"] @@ -42,15 +44,32 @@ def __gt__(s, o): return (s.time, s.priority) > (o.time, o.priority) def __ge__(s, o): return (s.time, s.priority) >= (o.time, o.priority) + +def _local_synchronize(method): + @functools.wraps(method) + def wrapper(self, *args, **kwargs): + # slight performance improvement over using FakeLock all the + # time for all method calls + if self._lock is None: + return method(self, *args, **kwargs) + else: + with self._lock: + return method(self, *args, **kwargs) + return wrapper + + class scheduler: - def __init__(self, timefunc=time.time, delayfunc=time.sleep): + def __init__(self, timefunc=time.time, delayfunc=time.sleep, + synchronized=False): """Initialize a new instance, passing the time and delay functions""" self._queue = [] + self._lock = threading.RLock() if synchronized else None self.timefunc = timefunc self.delayfunc = delayfunc + @_local_synchronize def enterabs(self, time, priority, action, argument=[], kwargs={}): """Enter a new event in the queue at an absolute time. @@ -62,6 +81,7 @@ heapq.heappush(self._queue, event) return event # The ID + @_local_synchronize def enter(self, delay, priority, action, argument=[], kwargs={}): """A variant that specifies the time as a relative time. @@ -71,6 +91,7 @@ time = self.timefunc() + delay return self.enterabs(time, priority, action, argument, kwargs) + @_local_synchronize def cancel(self, event): """Remove an event from the queue. @@ -81,10 +102,12 @@ self._queue.remove(event) heapq.heapify(self._queue) + @_local_synchronize def empty(self): """Check whether the queue is empty.""" return not self._queue + @_local_synchronize def run(self): """Execute events until the queue is empty. @@ -128,6 +151,7 @@ heapq.heappush(q, event) @property + @_local_synchronize def queue(self): """An ordered list of upcoming events. diff --git a/Lib/test/test_sched.py b/Lib/test/test_sched.py --- a/Lib/test/test_sched.py +++ b/Lib/test/test_sched.py @@ -6,12 +6,13 @@ from test import support -class TestCase(unittest.TestCase): +class TestSched(unittest.TestCase): + synchronized = False def test_enter(self): l = [] fun = lambda x: l.append(x) - scheduler = sched.scheduler(time.time, time.sleep) + scheduler = sched.scheduler(synchronized=self.synchronized) for x in [0.05, 0.04, 0.03, 0.02, 0.01]: z = scheduler.enter(x, 1, fun, (x,)) scheduler.run() @@ -20,7 +21,7 @@ def test_enterabs(self): l = [] fun = lambda x: l.append(x) - scheduler = sched.scheduler(time.time, time.sleep) + scheduler = sched.scheduler(synchronized=self.synchronized) for x in [0.05, 0.04, 0.03, 0.02, 0.01]: z = scheduler.enterabs(x, 1, fun, (x,)) scheduler.run() @@ -38,7 +39,7 @@ def test_cancel(self): l = [] fun = lambda x: l.append(x) - scheduler = sched.scheduler(time.time, time.sleep) + scheduler = sched.scheduler(synchronized=self.synchronized) event1 = scheduler.enter(0.01, 1, fun, (0.01,)) event2 = scheduler.enter(0.02, 1, fun, (0.02,)) event3 = scheduler.enter(0.03, 1, fun, (0.03,)) @@ -52,7 +53,7 @@ def test_empty(self): l = [] fun = lambda x: l.append(x) - scheduler = sched.scheduler(time.time, time.sleep) + scheduler = sched.scheduler(synchronized=self.synchronized) self.assertTrue(scheduler.empty()) for x in [0.05, 0.04, 0.03, 0.02, 0.01]: z = scheduler.enterabs(x, 1, fun, (x,)) @@ -63,7 +64,7 @@ def test_queue(self): l = [] fun = lambda x: l.append(x) - scheduler = sched.scheduler(time.time, time.sleep) + scheduler = sched.scheduler(synchronized=self.synchronized) e5 = scheduler.enter(0.05, 1, fun) e1 = scheduler.enter(0.01, 1, fun) e2 = scheduler.enter(0.02, 1, fun) @@ -81,13 +82,18 @@ self.assertEqual(a, (1,2,3)) self.assertEqual(b, {"foo":1}) - scheduler = sched.scheduler(time.time, time.sleep) + scheduler = sched.scheduler(synchronized=self.synchronized) z = scheduler.enterabs(0.01, 1, fun, argument=(1,2,3), kwargs={"foo":1}) scheduler.run() self.assertEqual(flag, [None]) + +class TestSchedSynchronized(TestSched): + synchronized = True + + def test_main(): - support.run_unittest(TestCase) + support.run_unittest(TestSched, TestSchedSynchronized) if __name__ == "__main__": test_main()