This issue tracker has been migrated to GitHub, and is currently read-only.
For more information, see the GitHub FAQs in the Python's Developer Guide.

Author neologix
Recipients bquinlan, gvanrossum, neologix, pitrou, r.david.murray, serhiy.storchaka
Date 2013-05-12.19:31:19
SpamBayes Score -1.0
Marked as misclassified Yes
Message-id <CAH_1eM1jhnbQnhtFRiFebe1VdgsQB1qWOBauhSm_4XoJeWQ+PQ@mail.gmail.com>
In-reply-to <CAH_1eM04Oxh+tsM48fy=mntdpHg6-4uop=-w98dSTyRk0-zKZA@mail.gmail.com>
Content
I'd like to have your opinion regarding some implementation choices I made.

1) There's a new ScheduledFuture deriving from Future, which adds
methods to check if the task is periodic, to get the delay until the
next scheduled execution, support re-arming (for periodic tasks), and
finally is orderable with respect to the scheduled time.
ScheduledExecutor methods (submit(), schedule(), schedule_fixed_rate()
and schedule_fixed_delay()) return a ScheduledFuture instance. That's
an addition to the API. Is that acceptable? If not, one solution would
be make _ScheduledFuture private, and document those methods as
returning a base Future (but then the client can't query periodicity,
next scheduled time, etc).

2) In the latest version of the patch, I changed _WorkerItem and
_ScheduledWorkerItem to subclass Future and ScheduledFuture. The
reason is that when you look at the code, those wrapper just add a run
method to the future, but otherwise just delegate to the wrapped
future. In fact, a work item is just a runnable future, so why not
subclass it?
Also, for example for the scheduled executor, you want to maintain
work items in wrapped-future scheduled time order (i.e. the soonest
expiring future at the head of the queue), and easily if the wrapped
future is cancelled, or get its delay:

So you have to do something like:

"""
@functools.total_ordering
class _ScheduledWorkItem(_WorkItem):
    [...]

    def run(self):
        if not self.future.set_running_or_notify_cancel():
            return

        try:
            result = self.fn(*self.args, **self.kwargs)
        except BaseException as e:
            self.future.set_exception(e)
        else:
            if self.future.is_periodic():
                # rearm the future
                self.future.rearm()
                # and re-schedule ourselves
                try:
                    self._pool._schedule(self)
                except RuntimeError:
                    # pool shut down
                    pass
            else:
                # we only set the result in case of one-shot
                self.future.set_result(result)

    def __eq__(self, other):
        return self is other

    def __lt__(self, other):
        return self.future < other.future

    def get_delay(self):
        return self.future.get_delay()

    def cancelled(self):
        return self.future.cancelled()
"""

Basically, this object merely delegate to the wrapped future.
Instead, if the work item is a subclass of Future, it looks like:

"""
class _ScheduledWorkItem(_base.ScheduledFuture):
    [...]

    def run(self):
        if not self.set_running_or_notify_cancel():
            return

        try:
            result = self._fn(*self._args, **self._kwargs)
        except BaseException as e:
            self.set_exception(e)
        else:
            if self.is_periodic():
                # rearm and reschedule ourselves
                self.rearm()
                try:
                    self._pool._schedule(self)
                except RuntimeError:
                    # pool shut down
                    pass
            else:
                # we only set the result in case of one-shot
                self.set_result(result)
"""

And to return a future from submit(), instead of:
"""
            f = _base.Future()
            w = _WorkItem(f, fn, args, kwargs)

            self._work_queue.put(w)
            self._adjust_thread_count()
            return f
"""

You just do:
"""
            w = _WorkItem(fn, args, kwargs)

            self._work_queue.put(w)
            self._adjust_thread_count()
            return w
"""

The work item inherits all the necessary methods and behavior (like
order) from the future.
It looks much more natural to me.
Also, you don't have to create two objects (the future and the wrapper
work item) per submitted task, so you save some memory, and maybe some
CPU cycles too.

I see the following downsides:
- you don't return a base Future, but a subclass. But to me, Future is
more of an interface than an implementation, see e.g. Tulip's futures
- since the function and arguments are stored in the work item, this
keeps references to them as long as the returned future is alive. That
could be a problem for code expecting the passed fn/args/kwargs to be
collected in a timely manner (see e.g. #16284 for a database
collection), but it could be solved easily by having the worker's run
method explicitly clear those references after execution (except for
periodic execution of course).

What do you think (sorry for the long message ;-) ?
History
Date User Action Args
2013-05-12 19:31:20neologixsetrecipients: + neologix, gvanrossum, bquinlan, pitrou, r.david.murray, serhiy.storchaka
2013-05-12 19:31:20neologixlinkissue17956 messages
2013-05-12 19:31:19neologixcreate