--- Lib/sched.py 2011-10-22 11:02:08.000000000 -0700 +++ Lib/sched.py 2011-10-22 11:21:28.000000000 -0700 @@ -28,12 +28,13 @@ # XXX instead of having to define a module or class just to hold # XXX the global state of your particular time and delay functions. +import time import heapq from collections import namedtuple __all__ = ["scheduler"] -class Event(namedtuple('Event', 'time, priority, action, argument')): +class Event(namedtuple('Event', 'time, priority, action, argument, kwargs')): def __eq__(s, o): return (s.time, s.priority) == (o.time, o.priority) def __ne__(s, o): return (s.time, s.priority) != (o.time, o.priority) def __lt__(s, o): return (s.time, s.priority) < (o.time, o.priority) @@ -42,32 +43,32 @@ def __ge__(s, o): return (s.time, s.priority) >= (o.time, o.priority) class scheduler: - def __init__(self, timefunc, delayfunc): + def __init__(self, timefunc=time.time, delayfunc=time.sleep): """Initialize a new instance, passing the time and delay functions""" self._queue = [] self.timefunc = timefunc self.delayfunc = delayfunc - def enterabs(self, time, priority, action, argument): + def enterabs(self, time, priority, action, argument=[], kwargs={}): """Enter a new event in the queue at an absolute time. Returns an ID for the event which can be used to remove it, if necessary. """ - event = Event(time, priority, action, argument) + event = Event(time, priority, action, argument, kwargs) heapq.heappush(self._queue, event) return event # The ID - def enter(self, delay, priority, action, argument): + def enter(self, delay, priority, action, argument=[], kwargs={}): """A variant that specifies the time as a relative time. This is actually the more commonly used interface. """ time = self.timefunc() + delay - return self.enterabs(time, priority, action, argument) + return self.enterabs(time, priority, action, argument, kwargs) def cancel(self, event): """Remove an event from the queue. @@ -111,7 +112,7 @@ timefunc = self.timefunc pop = heapq.heappop while q: - time, priority, action, argument = checked_event = q[0] + time, priority, action, argument, kwargs = checked_event = q[0] now = timefunc() if now < time: delayfunc(time - now) @@ -120,7 +121,7 @@ # Verify that the event was not removed or altered # by another thread after we last looked at q[0]. if event is checked_event: - action(*argument) + action(*argument, **kwargs) delayfunc(0) # Let other threads run else: heapq.heappush(q, event) --- Lib/test/test_sched.py 2011-10-22 11:37:57.000000000 -0700 +++ Lib/test/test_sched.py 2011-10-22 11:36:27.000000000 -0700 @@ -3,11 +3,31 @@ import sched import time import unittest -from test import support +try: + from test import support +except ImportError: + support = None class TestCase(unittest.TestCase): + def setUp(self): + self.call_back_ran = False + + def test_enter_timing(self): + + def __timeout_cb(ts): + now = time.time() + self.call_back_ran = True + assert int(now - ts['start']) == ts['secs'], 'timeout failed' + + l = [] + ts = {'start': time.time(), 'secs': 2} # NOTE causes 2 sec sleep + scheduler = sched.scheduler(time.time, time.sleep) + z = scheduler.enter(ts['secs'], 1, __timeout_cb, (ts,)) + scheduler.run() + self.assertTrue(self.call_back_ran, 'call back did not run') + def test_enter(self): l = [] fun = lambda x: l.append(x) @@ -26,6 +46,42 @@ scheduler.run() self.assertEqual(l, [0.01, 0.02, 0.03, 0.04, 0.05]) + def test_enter_no_timer_provided(self): + l = [] + fun = lambda x: l.append(x) + scheduler = sched.scheduler() + for x in [0.05, 0.04, 0.03, 0.02, 0.01]: + z = scheduler.enter(x, 1, fun, (x,)) + scheduler.run() + self.assertEqual(l, [0.01, 0.02, 0.03, 0.04, 0.05]) + + def test_enterabs_no_timer_provided(self): + l = [] + fun = lambda x: l.append(x) + scheduler = sched.scheduler() + for x in [0.05, 0.04, 0.03, 0.02, 0.01]: + z = scheduler.enterabs(x, 1, fun, (x,)) + scheduler.run() + self.assertEqual(l, [0.01, 0.02, 0.03, 0.04, 0.05]) + + def test_enter_kwargs_no_timer_provided(self): + l = [] + fun = lambda x: l.append(x) + scheduler = sched.scheduler() + for x in [0.05, 0.04, 0.03, 0.02, 0.01]: + z = scheduler.enter(x, 1, fun, kwargs={'x': x}) + scheduler.run() + self.assertEqual(l, [0.01, 0.02, 0.03, 0.04, 0.05]) + + def test_enterabs_kwargs_no_timer_provided(self): + l = [] + fun = lambda x: l.append(x) + scheduler = sched.scheduler() + for x in [0.05, 0.04, 0.03, 0.02, 0.01]: + z = scheduler.enterabs(x, 1, fun, kwargs={'x': x}) + scheduler.run() + self.assertEqual(l, [0.01, 0.02, 0.03, 0.04, 0.05]) + def test_priority(self): l = [] fun = lambda x: l.append(x) @@ -74,7 +130,11 @@ def test_main(): - support.run_unittest(TestCase) + if support: + support.run_unittest(TestCase) + else: + # being ran outside of full python build system + unittest.main() if __name__ == "__main__": test_main()