This issue tracker has been migrated to GitHub, and is currently read-only.
For more information, see the GitHub FAQs in the Python's Developer Guide.

classification
Title: concurrent.futures: add ScheduledExecutor
Type: enhancement Stage: patch review
Components: Library (Lib) Versions: Python 3.4
process
Status: open Resolution:
Dependencies: Superseder:
Assigned To: Nosy List: bquinlan, gvanrossum, neologix, pitrou, r.david.murray, serhiy.storchaka, tinchester
Priority: normal Keywords: needs review, patch

Created on 2013-05-11 17:45 by neologix, last changed 2022-04-11 14:57 by admin.

Files
File name Uploaded Description Edit
scheduled-1.diff neologix, 2013-05-11 17:45 review
scheduled-2.diff neologix, 2013-05-11 18:34 review
scheduled-3.diff neologix, 2013-05-12 17:07 review
Messages (10)
msg188935 - (view) Author: Charles-François Natali (neologix) * (Python committer) Date: 2013-05-11 17:45
Here's an implementation of a new ScheduledExecutor abstract class, with a concrete ScheduledThreadPoolExecutor implementation (see #995907).
The aim is to provide a flexible, efficient and consistent framework for delayed and periodic tasks, leveraging on futures. Incidentally, this supersedes threading.Timer, which is quite fragile and inefficient.

Here's a patch with test (I didn't write the documentation, I prefer to have some feedback first :-), the API is complete.

There's one thing that bothers me with the current implementation: when a future is cancelled, like for regular ThreadPoolExecutor, it doesn't get removed from the work queue right away, but only when it gets dequeued. For a delayed future, this means that one has to wait for the next scheduled execution (i.e. worst case <now> + future.period) for it to be  effectively cancelled and removed from the queue, and for the executor to be shutdown. I'm considering using a callback (Future.add_done_callback()), that's kind of hackish but I don't see any other way.
msg188943 - (view) Author: Serhiy Storchaka (serhiy.storchaka) * (Python committer) Date: 2013-05-11 18:38
It will be good to be compatible with sched.scheduler.
msg188958 - (view) Author: Charles-François Natali (neologix) * (Python committer) Date: 2013-05-11 21:42
> It will be good to be compatible with sched.scheduler.

What do you mean? Actually, this would kind of supersede the sched
module (except that the sched module supports custom time and delay
functions).

By the way, for those that didn't realize it, it's heavily inspired by
Java ScheduledExecutorService interface (pretty much like Python's
Executor mirrors Java's ExecutorService).
msg189041 - (view) Author: Serhiy Storchaka (serhiy.storchaka) * (Python committer) Date: 2013-05-12 15:42
1. Extends an abstract interface to support of a priority and absolute time.
2. Subclass sched.scheduler from this interface and implement missing methods.
msg189050 - (view) Author: Charles-François Natali (neologix) * (Python committer) Date: 2013-05-12 17:07
> 1. Extends an abstract interface to support of a priority

I'm not sure I see the use case for priority support, do you have a
sample use case?

Furthermore, the executor is backed by a thread pool, so tasks can be
run concurrently.

Finally, the ordering is based on real-clock time - not user-provided
timefunc and delayfunc: so the probability of having identical
scheduled time (priority is only used for ties) is virtually 0.

>  and absolute time.

If you want and absolute time, you can simply do:

p.schedule(abstime - time(), fn, args)

I don't see the need to complicate the API.

> 2. Subclass sched.scheduler from this interface and implement missing methods.

There again, I don't see the need.
The goal of ScheduledExecutor is to be consistent with the Executor
interface and futures, not being backward-compatible with the sched
module.

Also, the sched module simply can't support some operations: for
example, it's impossible to have the schedule.run() method wake up
when a new event with a deadline easiest than the current one is
inserted.

Really, there is now reason to make it compatible or similar to the
sched module: this will - it it gets accepted - effectively deprecate
threading.Timer and the sched module (except that the later supports
user-provided time and delay functions, I don't know how often those
are used).
msg189057 - (view) Author: Charles-François Natali (neologix) * (Python committer) Date: 2013-05-12 19:31
I'd like to have your opinion regarding some implementation choices I made.

1) There's a new ScheduledFuture deriving from Future, which adds
methods to check if the task is periodic, to get the delay until the
next scheduled execution, support re-arming (for periodic tasks), and
finally is orderable with respect to the scheduled time.
ScheduledExecutor methods (submit(), schedule(), schedule_fixed_rate()
and schedule_fixed_delay()) return a ScheduledFuture instance. That's
an addition to the API. Is that acceptable? If not, one solution would
be make _ScheduledFuture private, and document those methods as
returning a base Future (but then the client can't query periodicity,
next scheduled time, etc).

2) In the latest version of the patch, I changed _WorkerItem and
_ScheduledWorkerItem to subclass Future and ScheduledFuture. The
reason is that when you look at the code, those wrapper just add a run
method to the future, but otherwise just delegate to the wrapped
future. In fact, a work item is just a runnable future, so why not
subclass it?
Also, for example for the scheduled executor, you want to maintain
work items in wrapped-future scheduled time order (i.e. the soonest
expiring future at the head of the queue), and easily if the wrapped
future is cancelled, or get its delay:

So you have to do something like:

"""
@functools.total_ordering
class _ScheduledWorkItem(_WorkItem):
    [...]

    def run(self):
        if not self.future.set_running_or_notify_cancel():
            return

        try:
            result = self.fn(*self.args, **self.kwargs)
        except BaseException as e:
            self.future.set_exception(e)
        else:
            if self.future.is_periodic():
                # rearm the future
                self.future.rearm()
                # and re-schedule ourselves
                try:
                    self._pool._schedule(self)
                except RuntimeError:
                    # pool shut down
                    pass
            else:
                # we only set the result in case of one-shot
                self.future.set_result(result)

    def __eq__(self, other):
        return self is other

    def __lt__(self, other):
        return self.future < other.future

    def get_delay(self):
        return self.future.get_delay()

    def cancelled(self):
        return self.future.cancelled()
"""

Basically, this object merely delegate to the wrapped future.
Instead, if the work item is a subclass of Future, it looks like:

"""
class _ScheduledWorkItem(_base.ScheduledFuture):
    [...]

    def run(self):
        if not self.set_running_or_notify_cancel():
            return

        try:
            result = self._fn(*self._args, **self._kwargs)
        except BaseException as e:
            self.set_exception(e)
        else:
            if self.is_periodic():
                # rearm and reschedule ourselves
                self.rearm()
                try:
                    self._pool._schedule(self)
                except RuntimeError:
                    # pool shut down
                    pass
            else:
                # we only set the result in case of one-shot
                self.set_result(result)
"""

And to return a future from submit(), instead of:
"""
            f = _base.Future()
            w = _WorkItem(f, fn, args, kwargs)

            self._work_queue.put(w)
            self._adjust_thread_count()
            return f
"""

You just do:
"""
            w = _WorkItem(fn, args, kwargs)

            self._work_queue.put(w)
            self._adjust_thread_count()
            return w
"""

The work item inherits all the necessary methods and behavior (like
order) from the future.
It looks much more natural to me.
Also, you don't have to create two objects (the future and the wrapper
work item) per submitted task, so you save some memory, and maybe some
CPU cycles too.

I see the following downsides:
- you don't return a base Future, but a subclass. But to me, Future is
more of an interface than an implementation, see e.g. Tulip's futures
- since the function and arguments are stored in the work item, this
keeps references to them as long as the returned future is alive. That
could be a problem for code expecting the passed fn/args/kwargs to be
collected in a timely manner (see e.g. #16284 for a database
collection), but it could be solved easily by having the worker's run
method explicitly clear those references after execution (except for
periodic execution of course).

What do you think (sorry for the long message ;-) ?
msg189116 - (view) Author: Antoine Pitrou (pitrou) * (Python committer) Date: 2013-05-13 10:14
To be honest I can't find much to say about this proposal, but I think it would be good if the time function were configurable (both for test purposes, and to allow passing e.g. time.monotonic). I suppose this could be an executor option.
msg189144 - (view) Author: Charles-François Natali (neologix) * (Python committer) Date: 2013-05-13 15:24
> To be honest I can't find much to say about this proposal,

Hum, OK, I thought it would be a useful addition :-)

> but I think it would be good if the time function were configurable (both for test purposes, and to allow passing e.g. time.monotonic). I suppose this could be an executor option.

Note that the time function must be "real" time, since the sleep are
based on condition.wait() (otherwise if you just call an arbitrary
sleep() function you can't be woken up when a new task with an earlier
deadline is submitted).
So it's different from the sched module, we can't really support
arbitrary time functions.

Note that I do think it would be a good idea to use time.monotonic()
when available (almost all modules have been updated, except the
future one). That's probably another issue, though.
msg189147 - (view) Author: Antoine Pitrou (pitrou) * (Python committer) Date: 2013-05-13 15:44
> > To be honest I can't find much to say about this proposal,
> 
> Hum, OK, I thought it would be a useful addition :-)

It's probably useful, but I'd have to take a closer look. It's been
a long time I haven't used separate threads for timers...

> Note that the time function must be "real" time, since the sleep are
> based on condition.wait() (otherwise if you just call an arbitrary
> sleep() function you can't be woken up when a new task with an
> earlier
> deadline is submitted).

Agreed, but at least tornado supports a custom time function, even
though their event loop is based on epoll() (under Linux):
http://www.tornadoweb.org/en/stable/ioloop.html#tornado.ioloop.IOLoop.time

This means:
1) the custom time function is used to adjust the epoll() timeout
according to pending timed calls
2) the epoll() call times out based on the system time function

I suppose it works well enough when the divergence is not too large,
and/or the wakeups are frequent enough that there are no glaring defects.

"Practicality beats purity", I guess :)
msg260012 - (view) Author: Tin Tvrtković (tinchester) * Date: 2016-02-10 14:06
It's a shame this has been stuck in review for 2.5 years, I could really use something like this right now.

neologix, why don't you put this up on PyPI for a while, at least?
History
Date User Action Args
2022-04-11 14:57:45adminsetgithub: 62156
2016-02-10 21:52:50vstinnersettitle: add ScheduledExecutor -> concurrent.futures: add ScheduledExecutor
2016-02-10 14:06:01tinchestersetnosy: + tinchester
messages: + msg260012
2013-05-13 15:44:39pitrousetmessages: + msg189147
2013-05-13 15:24:50neologixsetmessages: + msg189144
2013-05-13 10:14:10pitrousetmessages: + msg189116
2013-05-12 19:31:20neologixsetmessages: + msg189057
2013-05-12 17:07:59neologixsetfiles: + scheduled-3.diff

messages: + msg189050
2013-05-12 15:53:16pitrousetversions: + Python 3.4
2013-05-12 15:42:49serhiy.storchakasetmessages: + msg189041
2013-05-11 21:42:16neologixsetmessages: + msg188958
2013-05-11 18:38:51serhiy.storchakasetnosy: + serhiy.storchaka
messages: + msg188943
2013-05-11 18:34:36neologixsetfiles: + scheduled-2.diff
2013-05-11 18:04:28serhiy.storchakasetnosy: + gvanrossum, bquinlan
2013-05-11 17:46:49neologixlinkissue995907 superseder
2013-05-11 17:45:42neologixcreate