diff --git a/Doc/library/sched.rst b/Doc/library/sched.rst --- a/Doc/library/sched.rst +++ b/Doc/library/sched.rst @@ -27,6 +27,9 @@ .. versionchanged:: 3.3 *timefunc* and *delayfunc* parameters are optional. + .. versionchanged:: 3.3 + :class:`scheduler` class can be safely used in multi-threaded + environments. Example:: @@ -47,33 +50,6 @@ From print_time 930343700.273 930343700.276 -In multi-threaded environments, the :class:`scheduler` class has limitations -with respect to thread-safety, inability to insert a new task before -the one currently pending in a running scheduler, and holding up the main -thread until the event queue is empty. Instead, the preferred approach -is to use the :class:`threading.Timer` class instead. - -Example:: - - >>> import time - >>> from threading import Timer - >>> def print_time(): - ... print("From print_time", time.time()) - ... - >>> def print_some_times(): - ... print(time.time()) - ... Timer(5, print_time, ()).start() - ... Timer(10, print_time, ()).start() - ... time.sleep(11) # sleep while time-delay events execute - ... print(time.time()) - ... - >>> print_some_times() - 930343690.257 - From print_time 930343695.274 - From print_time 930343700.273 - 930343701.301 - - .. _scheduler-objects: Scheduler Objects diff --git a/Doc/whatsnew/3.3.rst b/Doc/whatsnew/3.3.rst --- a/Doc/whatsnew/3.3.rst +++ b/Doc/whatsnew/3.3.rst @@ -662,6 +662,10 @@ sched ----- +* :class:`~sched.scheduler` class can now be safely used in multi-threaded + environments. (Contributed by Josiah Carlson and Giampaolo RodolĂ  in + :issue:`8684`) + * *timefunc* and *delayfunct* parameters of :class:`~sched.scheduler` class constructor are now optional and defaults to :func:`time.time` and :func:`time.sleep` respectively. (Contributed by Chris Clark in diff --git a/Lib/sched.py b/Lib/sched.py --- a/Lib/sched.py +++ b/Lib/sched.py @@ -30,6 +30,7 @@ import time import heapq +import threading from collections import namedtuple __all__ = ["scheduler"] @@ -48,6 +49,7 @@ """Initialize a new instance, passing the time and delay functions""" self._queue = [] + self._lock = threading.RLock() self.timefunc = timefunc self.delayfunc = delayfunc @@ -58,9 +60,10 @@ if necessary. """ - event = Event(time, priority, action, argument, kwargs) - heapq.heappush(self._queue, event) - return event # The ID + with self._lock: + event = Event(time, priority, action, argument, kwargs) + heapq.heappush(self._queue, event) + return event # The ID def enter(self, delay, priority, action, argument=[], kwargs={}): """A variant that specifies the time as a relative time. @@ -68,8 +71,9 @@ This is actually the more commonly used interface. """ - time = self.timefunc() + delay - return self.enterabs(time, priority, action, argument, kwargs) + with self._lock: + time = self.timefunc() + delay + return self.enterabs(time, priority, action, argument, kwargs) def cancel(self, event): """Remove an event from the queue. @@ -78,12 +82,14 @@ If the event is not in the queue, this raises ValueError. """ - self._queue.remove(event) - heapq.heapify(self._queue) + with self._lock: + self._queue.remove(event) + heapq.heapify(self._queue) def empty(self): """Check whether the queue is empty.""" - return not self._queue + with self._lock: + return not self._queue def run(self): """Execute events until the queue is empty. @@ -108,24 +114,25 @@ """ # localize variable access to minimize overhead # and to improve thread safety - q = self._queue - delayfunc = self.delayfunc - timefunc = self.timefunc - pop = heapq.heappop - while q: - time, priority, action, argument, kwargs = checked_event = q[0] - now = timefunc() - if now < time: - delayfunc(time - now) - else: - event = pop(q) - # Verify that the event was not removed or altered - # by another thread after we last looked at q[0]. - if event is checked_event: - action(*argument, **kwargs) - delayfunc(0) # Let other threads run + with self._lock: + q = self._queue + delayfunc = self.delayfunc + timefunc = self.timefunc + pop = heapq.heappop + while q: + time, priority, action, argument, kwargs = checked_event = q[0] + now = timefunc() + if now < time: + delayfunc(time - now) else: - heapq.heappush(q, event) + event = pop(q) + # Verify that the event was not removed or altered + # by another thread after we last looked at q[0]. + if event is checked_event: + action(*argument, **kwargs) + delayfunc(0) # Let other threads run + else: + heapq.heappush(q, event) @property def queue(self): @@ -138,5 +145,6 @@ # Use heapq to sort the queue rather than using 'sorted(self._queue)'. # With heapq, two events scheduled at the same time will show in # the actual order they would be retrieved. - events = self._queue[:] - return map(heapq.heappop, [events]*len(events)) + with self._lock: + events = self._queue[:] + return map(heapq.heappop, [events]*len(events))