diff --git a/runtests.py b/runtests.py --- a/runtests.py +++ b/runtests.py @@ -2,9 +2,12 @@ # Originally written by Beech Horn (for NDB). +import logging import sys import unittest +##logging.basicConfig(level=logging.DEBUG) + def load_tests(): mods = ['events', 'futures', 'tasks'] diff --git a/tulip/events_test.py b/tulip/events_test.py --- a/tulip/events_test.py +++ b/tulip/events_test.py @@ -10,6 +10,7 @@ from . import events from . import transports from . import protocols +from . import selectors from . import unix_events @@ -37,8 +38,8 @@ class EventLoopTestsMixin: def setUp(self): - pollster = self.POLLSTER_CLASS() - event_loop = unix_events.UnixEventLoop(pollster) + selector = self.SELECTOR_CLASS() + event_loop = unix_events.UnixEventLoop(selector) events.set_event_loop(event_loop) def testRun(self): @@ -229,24 +230,24 @@ el.run_once() -if hasattr(select, 'kqueue'): +if hasattr(selectors, 'KqueueSelector'): class KqueueEventLoopTests(EventLoopTestsMixin, unittest.TestCase): - POLLSTER_CLASS = unix_events.KqueuePollster + SELECTOR_CLASS = selectors.KqueueSelector -if hasattr(select, 'epoll'): +if hasattr(selectors, 'EpollSelector'): class EPollEventLoopTests(EventLoopTestsMixin, unittest.TestCase): - POLLSTER_CLASS = unix_events.EPollPollster + SELECTOR_CLASS = selectors.EpollSelector -if hasattr(select, 'poll'): +if hasattr(selectors, 'PollSelector'): class PollEventLoopTests(EventLoopTestsMixin, unittest.TestCase): - POLLSTER_CLASS = unix_events.PollPollster + SELECTOR_CLASS = selectors.PollSelector # Should always exist. class SelectEventLoopTests(EventLoopTestsMixin, unittest.TestCase): - POLLSTER_CLASS = unix_events.SelectPollster + SELECTOR_CLASS = selectors.SelectSelector class HandlerTests(unittest.TestCase): diff --git a/tulip/selectors.py b/tulip/selectors.py new file mode 100644 --- /dev/null +++ b/tulip/selectors.py @@ -0,0 +1,347 @@ +"""Select module. + +This module supports asynchronous I/O on multiple file descriptors. +""" + + +from select import * # XXX + + +# generic events, that must be mapped to implementation-specific ones +# read event +SELECT_IN = (1 << 0) +# write event +SELECT_OUT = (1 << 1) +# connect event +SELECT_CONNECT = SELECT_OUT + + +def _fileobj_to_fd(fileobj): + """Return a file descriptor from a file object. + + Parameters: + fileobj -- file descriptor, or any object with a `fileno()` method + + Returns: + corresponding file descriptor + """ + if isinstance(fileobj, int): + fd = fileobj + else: + try: + fd = int(fileobj.fileno()) + except (ValueError, TypeError): + raise ValueError("Invalid file object: {!r}".format(fileobj)) + return fd + + +class _Key: + """Object used internally to associate a file object to its backing file + descriptor, selected event mask and attached data.""" + + def __init__(self, fileobj, events, data=None): + self.fileobj = fileobj + self.fd = _fileobj_to_fd(fileobj) + self.events = events + self.data = data + + +class _BaseSelector: + """Base selector class. + + A selector supports registering file objects to be monitored for specific + I/O events. + + A file object is a file descriptor or any object with a `fileno()` method. + An arbitrary object can be attached to the file object, which can be used + for example to store context information, a callback, etc. + + A selector can use various implementations (select(), poll(), epoll()...) + depending on the platform. The default `Selector` class uses the most + performant implementation on the current platform. + """ + + def __init__(self): + # this maps file descriptors to keys + self._fd_to_key = {} + # this maps file objects to keys - for fast (un)registering + self._fileobj_to_key = {} + + def register(self, fileobj, events, data=None): + """Register a file object. + + Parameters: + fileobj -- file object + events -- events to monitor (bitwise mask of SELECT_IN|SELECT_OUT) + data -- attached data + """ + if (not events) or (events & ~(SELECT_IN|SELECT_OUT)): + raise ValueError("Invalid events: {}".format(events)) + + if fileobj in self._fileobj_to_key: + raise ValueError("{!r} is already registered".format(fileobj)) + + key = _Key(fileobj, events, data) + self._fd_to_key[key.fd] = key + self._fileobj_to_key[fileobj] = key + return key + + def unregister(self, fileobj): + """Unregister a file object. + + Parameters: + fileobj -- file object + """ + try: + key = self._fileobj_to_key[fileobj] + del self._fd_to_key[key.fd] + del self._fileobj_to_key[fileobj] + except KeyError: + raise ValueError("{!r} is not registered".format(fileobj)) + return key + + def modify(self, fileobj, events, data=None): + """Change a registered file object monitored events or attached data. + + Parameters: + fileobj -- file object + events -- events to monitor (bitwise mask of SELECT_IN|SELECT_OUT) + data -- attached data + """ + self.unregister(fileobj) + self.register(fileobj, events, data) + + def select(self, timeout=None): + """Perform the actual selection, until some monitored file objects are + ready or a timeout expires. + + Parameters: + timeout -- if timeout > 0, this specifies the maximum wait time, in + seconds + if timeout == 0, the select() call won't block, and will + report the currently ready file objects + if timeout is None, select() will block until a monitored + file object becomes ready + + Returns: + list of (fileobj, events, attached data) for ready file objects + `events` is a bitwise mask of SELECT_IN|SELECT_OUT + """ + raise NotImplementedError() + + def close(self): + """Close the selector. + + This must be called to make sure that any underlying resource is freed. + """ + self._fd_to_key.clear() + self._fileobj_to_key.clear() + + def get_info(self, fileobj): + """Return information about a registered file object. + + Returns: + (events, data) associated to this file object + + Raises KeyError if the file object is not registered. + """ + try: + key = self._fileobj_to_key[fileobj] + except KeyError: + raise KeyError("{} is not registered".format(fileobj)) + return key.events, key.data + + def registered_count(self): + """Return the number of registered file objects. + + Returns: + number of currently registered file objects + """ + return len(self._fd_to_key) + + def __enter__(self): + return self + + def __exit__(self, *args): + self.close() + + def _key_from_fd(self, fd): + """Return the key associated to a given file descriptor. + + Parameters: + fd -- file descriptor + + Returns: + corresponding key + """ + try: + return self._fd_to_key[fd] + except KeyError: + raise RuntimeError("No key found for fd {}".format(fd)) + + +class SelectSelector(_BaseSelector): + """Select-based selector.""" + + def __init__(self): + super().__init__() + self._readers = set() + self._writers = set() + + def register(self, fileobj, events, data=None): + key = super().register(fileobj, events, data) + if events & SELECT_IN: + self._readers.add(key.fd) + if events & SELECT_OUT: + self._writers.add(key.fd) + + def unregister(self, fileobj): + key = super().unregister(fileobj) + self._readers.discard(key.fd) + self._writers.discard(key.fd) + + def select(self, timeout=None): + r, w, _ = select(self._readers, self._writers, [], timeout) + r = set(r) + w = set(w) + ready = [] + for fd in r | w: + events = 0 + if fd in r: + events |= SELECT_IN + if fd in w: + events |= SELECT_OUT + + key = self._key_from_fd(fd) + ready.append((key.fileobj, events, key.data)) + return ready + + +if 'poll' in globals(): + + class PollSelector(_BaseSelector): + """Poll-based selector.""" + + def __init__(self): + super().__init__() + self._poll = poll() + + def register(self, fileobj, events, data=None): + key = super().register(fileobj, events, data) + poll_events = 0 + if events & SELECT_IN: + poll_events |= POLLIN + if events & SELECT_OUT: + poll_events |= POLLOUT + self._poll.register(key.fd, poll_events) + + def unregister(self, fileobj): + key = super().unregister(fileobj) + self._poll.unregister(key.fd) + + def select(self, timeout=None): + timeout = None if timeout is None else int(1000 * timeout) + ready = [] + for fd, event in self._poll.poll(timeout): + events = 0 + if event & ~POLLIN: + events |= SELECT_OUT + if event & ~POLLOUT: + events |= SELECT_IN + + key = self._key_from_fd(fd) + ready.append((key.fileobj, events, key.data)) + return ready + + +if 'epoll' in globals(): + + class EpollSelector(_BaseSelector): + """Epoll-based selector.""" + + def __init__(self): + super().__init__() + self._epoll = epoll() + + def register(self, fileobj, events, data=None): + key = super().register(fileobj, events, data) + epoll_events = 0 + if events & SELECT_IN: + epoll_events |= EPOLLIN + if events & SELECT_OUT: + epoll_events |= EPOLLOUT + self._epoll.register(key.fd, epoll_events) + + def unregister(self, fileobj): + key = super().unregister(fileobj) + self._epoll.unregister(key.fd) + + def select(self, timeout=None): + timeout = -1 if timeout is None else timeout + max_ev = self.registered_count() + ready = [] + for fd, event in self._epoll.poll(timeout, max_ev): + events = 0 + if event & ~EPOLLIN: + events |= SELECT_OUT + if event & ~EPOLLOUT: + events |= SELECT_IN + + key = self._key_from_fd(fd) + ready.append((key.fileobj, events, key.data)) + return ready + + def close(self): + super().close() + self._epoll.close() + + +if 'kqueue' in globals(): + + class KqueueSelector(_BaseSelector): + """Kqueue-based selector.""" + + def __init__(self): + super().__init__() + self._kqueue = kqueue() + + def register(self, fileobj, events, data=None): + key = super().register(fileobj, events, data) + if events & SELECT_IN: + kev = select.kevent(fd, select.KQ_FILTER_READ, select.KQ_EV_ADD) + self._kqueue.control([kev], 0, 0) + if events & SELECT_OUT: + kev = select.kevent(fd, select.KQ_FILTER_WRITE, select.KQ_EV_ADD) + self._kqueue.control([kev], 0, 0) + + def select(self, timeout=None): + max_ev = self.registered_count() + ready = [] + for kev in self._kqueue.control(None, max_ev, timeout): + fd = kev.ident + flag = kev.filter + events = 0 + if flag == select.KQ_FILTER_READ: + events |= SELECT_IN + if flag == select.KQ_FILTER_WRITE: + events |= SELECT_OUT + + key = self._key_from_fd(fd) + ready.append((key.fileobj, events, key.data)) + return events + + def close(self): + super().close() + self._kqueue.close() + + +# Choose the best implementation: roughly, epoll|kqueue > poll > select. +# select() also can't accept a FD > FD_SETSIZE (usually around 1024) +if 'KqueueSelector' in globals(): + Selector = KqueueSelector +elif 'EpollSelector' in globals(): + Selector = EpollSelector +elif 'PollSelector' in globals(): + Selector = PollSelector +else: + Selector = SelectSelector diff --git a/tulip/unix_events.py b/tulip/unix_events.py --- a/tulip/unix_events.py +++ b/tulip/unix_events.py @@ -1,10 +1,8 @@ """UNIX event loop and related classes. -NOTE: The Pollster classes are not part of the published API. - -The event loop can be broken up into a pollster (the part responsible +The event loop can be broken up into a selector (the part responsible for telling us when file descriptors are ready) and the event loop -proper, which wraps a pollster with functionality for scheduling +proper, which wraps a selector with functionality for scheduling callbacks, immediately or at a given time in the future. Whenever a public API takes a callback, subsequent positional @@ -13,22 +11,6 @@ Keyword arguments for the callback are not supported; this is a conscious design decision, leaving the door open for keyword arguments to modify the meaning of the API call itself. - -There are several implementations of the pollster part, several using -esoteric system calls that exist only on some platforms. These are: - -- kqueue (most BSD systems) -- epoll (newer Linux systems) -- poll (most UNIX systems) -- select (all UNIX systems, and Windows) - -NOTE: We don't use select on systems where any of the others is -available, because select performs poorly as the number of file -descriptors goes up. The ranking is roughly: - - 1. kqueue, epoll, IOCP (best for each platform) - 2. poll (linear in number of file descriptors polled) - 3. select (linear in max number of file descriptors supported) """ import collections @@ -46,6 +28,7 @@ from . import events from . import futures from . import protocols +from . import selectors from . import tasks from . import transports @@ -73,352 +56,6 @@ _MAX_WORKERS = 5 -class PollsterBase: - """Base class for all polling implementations. - - This defines an interface to register and unregister readers and - writers for specific file descriptors, and an interface to get a - list of events. There's also an interface to check whether any - readers or writers are currently registered. - """ - - def __init__(self): - super().__init__() - self.readers = {} # {fd: handler, ...}. - self.writers = {} # {fd: handler, ...}. - - def pollable(self): - """Return the number readers and writers currently registered.""" - # The event loop needs the number since it must subtract one for - # the self-pipe. - return len(self.readers) + len(self.writers) - - # Subclasses are expected to extend the add/remove methods. - - def register_reader(self, fd, handler): - """Add or update a reader for a file descriptor.""" - self.readers[fd] = handler - - def register_writer(self, fd, handler): - """Add or update a writer for a file descriptor.""" - self.writers[fd] = handler - - def unregister_reader(self, fd): - """Remove the reader for a file descriptor.""" - del self.readers[fd] - - def unregister_writer(self, fd): - """Remove the writer for a file descriptor.""" - del self.writers[fd] - - def register_connector(self, fd, handler): - """Add or update a connector for a file descriptor.""" - # On Unix a connector is the same as a writer. - self.register_writer(fd, handler) - - def unregister_connector(self, fd): - """Remove the connector for a file descriptor.""" - # On Unix a connector is the same as a writer. - self.unregister_writer(fd) - - def poll(self, timeout=None): - """Poll for I/O events. A subclass must implement this. - - If timeout is omitted or None, this blocks until at least one - event is ready. Otherwise, timeout gives a maximum time to - wait (an int of float in seconds) -- the method returns as - soon as at least one event is ready or when the timeout is - expired. For a non-blocking poll, pass 0. - - The return value is a list of events; it is empty when the - timeout expired before any events were ready. Each event - is a handler previously passed to register_reader/writer(). - """ - raise NotImplementedError - - -if sys.platform != 'win32': - - class SelectPollster(PollsterBase): - """Pollster implementation using select.""" - - def poll(self, timeout=None): - readable, writable, _ = select.select(self.readers, self.writers, - [], timeout) - events = [] - events += (self.readers[fd] for fd in readable) - events += (self.writers[fd] for fd in writable) - return events - -else: - - class SelectPollster(PollsterBase): - """Pollster implementation using select.""" - - def __init__(self): - super().__init__() - self.exceptionals = {} - - def poll(self, timeout=None): - # Failed connections are reported as exceptional but not writable. - readable, writable, exceptional = select.select( - self.readers, self.writers, self.exceptionals, timeout) - writable = set(writable).union(exceptional) - events = [] - events += (self.readers[fd] for fd in readable) - events += (self.writers[fd] for fd in writable) - return events - - def register_connector(self, fd, token): - self.register_writer(fd, token) - self.exceptionals[fd] = token - - def unregister_connector(self, fd): - self.unregister_writer(fd) - try: - del self.exceptionals[fd] - except KeyError: - # remove_connector() does not check fd in self.exceptionals. - pass - - -class PollPollster(PollsterBase): - """Pollster implementation using poll.""" - - def __init__(self): - super().__init__() - self._poll = select.poll() - - def _update(self, fd): - assert isinstance(fd, int), fd - flags = 0 - if fd in self.readers: - flags |= select.POLLIN - if fd in self.writers: - flags |= select.POLLOUT - if flags: - self._poll.register(fd, flags) - else: - self._poll.unregister(fd) - - def register_reader(self, fd, handler): - super().register_reader(fd, handler) - self._update(fd) - - def register_writer(self, fd, handler): - super().register_writer(fd, handler) - self._update(fd) - - def unregister_reader(self, fd): - super().unregister_reader(fd) - self._update(fd) - - def unregister_writer(self, fd): - super().unregister_writer(fd) - self._update(fd) - - def poll(self, timeout=None): - # Timeout is in seconds, but poll() takes milliseconds. - msecs = None if timeout is None else int(round(1000 * timeout)) - events = [] - for fd, flags in self._poll.poll(msecs): - if flags & ~select.POLLOUT: - if fd in self.readers: - events.append(self.readers[fd]) - if flags & ~select.POLLIN: - if fd in self.writers: - events.append(self.writers[fd]) - return events - - -if sys.platform == 'win32': - - class WindowsPollPollster(PollPollster): - """Pollster implementation using WSAPoll. - - WSAPoll is only available on Windows Vista and later. Python - does not currently support WSAPoll, but there is a patch - available at http://bugs.python.org/issue16507. - """ - - # REAP_PERIOD is the maximum wait before checking for failed - # connections. This is necessary because WSAPoll() does notify us - # of failed connections. See - # daniel.haxx.se/blog/2012/10/10/wsapoll-is-broken/ - REAP_PERIOD = 5.0 - - # FD_SETSIZE is maximum number of sockets in an fd_set - FD_SETSIZE = 512 - - def __init__(self): - super().__init__() - self.exceptionals = {} - - def register_connector(self, fd, token): - self.register_writer(fd, token) - self.exceptionals[fd] = token - - def unregister_connector(self, fd): - self.unregister_writer(fd) - try: - del self.exceptionals[fd] - except KeyError: - # remove_connector() does not check fd in self.exceptionals. - pass - - def _get_failed_connector_events(self): - fds = [] - remaining = list(self.exceptionals) - while remaining: - fds += select.select([], [], remaining[:self.FD_SETSIZE], 0)[2] - del remaining[:self.FD_SETSIZE] - return [(fd, select.POLLOUT) for fd in fds] - - def poll(self, timeout=None): - if not self.exceptionals: - msecs = None if timeout is None else int(round(1000 * timeout)) - polled = self._poll.poll(msecs) - - elif timeout is None: - polled = None - while not polled: - polled = (self._get_failed_connector_events() or - self._poll.poll(self.REAP_PERIOD)) - - elif timeout == 0: - polled = (self._get_failed_connector_events() or - self._poll.poll(0)) - - else: - start = time.monotonic() - deadline = start + timeout - polled = None - while timeout >= 0: - msecs = int(round(1000 * min(self.REAP_PERIOD, timeout))) - polled = (self._get_failed_connector_events() or - self._poll.poll(self.REAP_PERIOD)) - if polled: - break - timemout = deadline - time.monotonic() - - events = [] - for fd, flags in polled: - if flags & ~select.POLLOUT: - if fd in self.readers: - events.append(self.readers[fd]) - if flags & ~select.POLLIN: - if fd in self.writers: - events.append(self.writers[fd]) - return events - - PollPollster = WindowsPollPollster - - -class EPollPollster(PollsterBase): - """Pollster implementation using epoll.""" - - def __init__(self): - super().__init__() - self._epoll = select.epoll() - - def _update(self, fd): - assert isinstance(fd, int), fd - eventmask = 0 - if fd in self.readers: - eventmask |= select.EPOLLIN - if fd in self.writers: - eventmask |= select.EPOLLOUT - if eventmask: - try: - self._epoll.register(fd, eventmask) - except IOError: - self._epoll.modify(fd, eventmask) - else: - self._epoll.unregister(fd) - - def register_reader(self, fd, handler): - super().register_reader(fd, handler) - self._update(fd) - - def register_writer(self, fd, handler): - super().register_writer(fd, handler) - self._update(fd) - - def unregister_reader(self, fd): - super().unregister_reader(fd) - self._update(fd) - - def unregister_writer(self, fd): - super().unregister_writer(fd) - self._update(fd) - - def poll(self, timeout=None): - if timeout is None: - timeout = -1 # epoll.poll() uses -1 to mean "wait forever". - events = [] - for fd, eventmask in self._epoll.poll(timeout): - if eventmask & ~select.EPOLLOUT: - if fd in self.readers: - events.append(self.readers[fd]) - if eventmask & ~select.EPOLLIN: - if fd in self.writers: - events.append(self.writers[fd]) - return events - - -class KqueuePollster(PollsterBase): - """Pollster implementation using kqueue.""" - - def __init__(self): - super().__init__() - self._kqueue = select.kqueue() - - def register_reader(self, fd, handler): - if fd not in self.readers: - kev = select.kevent(fd, select.KQ_FILTER_READ, select.KQ_EV_ADD) - self._kqueue.control([kev], 0, 0) - return super().register_reader(fd, handler) - - def register_writer(self, fd, handler): - if fd not in self.writers: - kev = select.kevent(fd, select.KQ_FILTER_WRITE, select.KQ_EV_ADD) - self._kqueue.control([kev], 0, 0) - return super().register_writer(fd, handler) - - def unregister_reader(self, fd): - super().unregister_reader(fd) - kev = select.kevent(fd, select.KQ_FILTER_READ, select.KQ_EV_DELETE) - self._kqueue.control([kev], 0, 0) - - def unregister_writer(self, fd): - super().unregister_writer(fd) - kev = select.kevent(fd, select.KQ_FILTER_WRITE, select.KQ_EV_DELETE) - self._kqueue.control([kev], 0, 0) - - def poll(self, timeout=None): - events = [] - max_ev = len(self.readers) + len(self.writers) - for kev in self._kqueue.control(None, max_ev, timeout): - fd = kev.ident - flag = kev.filter - if flag == select.KQ_FILTER_READ and fd in self.readers: - events.append(self.readers[fd]) - elif flag == select.KQ_FILTER_WRITE and fd in self.writers: - events.append(self.writers[fd]) - return events - - -# Pick the best pollster class for the platform. -if hasattr(select, 'kqueue'): - best_pollster = KqueuePollster -elif hasattr(select, 'epoll'): - best_pollster = EPollPollster -elif hasattr(select, 'poll'): - best_pollster = PollPollster -else: - best_pollster = SelectPollster - - class _StopError(BaseException): """Raised to stop the event loop.""" @@ -433,12 +70,13 @@ See events.EventLoop for API specification. """ - def __init__(self, pollster=None): + def __init__(self, selector=None): super().__init__() - if pollster is None: - logging.info('Using pollster: %s', best_pollster.__name__) - pollster = best_pollster() - self._pollster = pollster + if selector is None: + # pick the best selector class for the platform + selector = selectors.Selector() + logging.info('Using selector: %s', selector.__name__) + self._selector = selector self._ready = collections.deque() self._scheduled = [] self._everytime = [] @@ -465,7 +103,9 @@ TODO: Give this a timeout too? """ - while self._ready or self._scheduled or self._pollster.pollable() > 1: + while (self._ready or + self._scheduled or + self._selector.registered_count() > 1): try: self._run_once() except _StopError: @@ -702,36 +342,83 @@ def add_reader(self, fd, callback, *args): """Add a reader callback. Return a Handler instance.""" handler = events.Handler(None, callback, args) - self._pollster.register_reader(fd, handler) + try: + mask, (reader, writer, connector) = self._selector.get_info(fd) + except KeyError: + self._selector.register(fd, selectors.SELECT_IN, + (handler, None, None)) + else: + self._selector.modify(fd, mask | selectors.SELECT_IN, + (handler, writer, connector)) + return handler def remove_reader(self, fd): """Remove a reader callback.""" - if fd in self._pollster.readers: - self._pollster.unregister_reader(fd) + try: + mask, (reader, writer, connector) = self._selector.get_info(fd) + except KeyError: + pass + else: + mask &= ~selectors.SELECT_IN + if not mask: + self._selector.unregister(fd) + else: + self._selector.modify(fd, mask, (None, writer, connector)) def add_writer(self, fd, callback, *args): """Add a writer callback. Return a Handler instance.""" handler = events.Handler(None, callback, args) - self._pollster.register_writer(fd, handler) + try: + mask, (reader, writer, connector) = self._selector.get_info(fd) + except KeyError: + self._selector.register(fd, selectors.SELECT_OUT, + (None, handler, None)) + else: + self._selector.modify(fd, mask | selectors.SELECT_OUT, + (reader, handler, connector)) return handler def remove_writer(self, fd): """Remove a writer callback.""" - if fd in self._pollster.writers: - self._pollster.unregister_writer(fd) + try: + mask, (reader, writer, connector) = self._selector.get_info(fd) + except KeyError: + pass + else: + mask &= ~selectors.SELECT_OUT + if not mask: + self._selector.unregister(fd) + else: + self._selector.modify(fd, mask, (reader, None, connector)) def add_connector(self, fd, callback, *args): """Add a connector callback. Return a Handler instance.""" - dcall = events.Handler(None, callback, args) - self._pollster.register_connector(fd, dcall) - return dcall + # XXX As long as SELECT_CONNECT == SELECT_OUT, set the handler + # as both writer and connector. + handler = events.Handler(None, callback, args) + try: + mask, (reader, writer, connector) = self._selector.get_info(fd) + except KeyError: + self._selector.register(fd, selectors.SELECT_CONNECT, + (None, handler, handler)) + else: + self._selector.modify(fd, mask | selectors.SELECT_CONNECT, + (reader, handler, handler)) + return handler def remove_connector(self, fd): """Remove a connector callback.""" - # Every connector fd is in self._pollsters.writers. - if fd in self._pollster.writers: - self._pollster.unregister_connector(fd) + try: + mask, (reader, writer, connector) = self._selector.get_info(fd) + except KeyError: + pass + else: + mask &= ~selectors.SELECT_CONNECT + if not mask: + self._selector.unregister(fd) + else: + self._selector.modify(fd, mask, (reader, None, None)) def sock_recv(self, sock, n): """XXX""" @@ -743,7 +430,7 @@ fd = sock.fileno() if registered: # Remove the callback early. It should be rare that the - # pollster says the fd is ready but the call still returns + # selector says the fd is ready but the call still returns # EAGAIN, and I am willing to take a hit in that case in # order to simplify the common case. self.remove_reader(fd) @@ -876,10 +563,10 @@ while self._scheduled and self._scheduled[0].cancelled: heapq.heappop(self._scheduled) - # Inspect the poll queue. If there's exactly one pollable + # Inspect the poll queue. If there's exactly one selectable # file descriptor, it's the self-pipe, and if there's nothing # scheduled, we should ignore it. - if self._pollster.pollable() > 1 or self._scheduled: + if self._selector.registered_count() > 1 or self._scheduled: if self._ready: timeout = 0 elif self._scheduled: @@ -892,7 +579,7 @@ timeout = min(timeout, deadline) t0 = time.monotonic() - events = self._pollster.poll(timeout) + event_list = self._selector.select(timeout) t1 = time.monotonic() argstr = '' if timeout is None else ' %.3f' % timeout if t1-t0 >= 1: @@ -900,8 +587,13 @@ else: level = logging.DEBUG logging.log(level, 'poll%s took %.3f seconds', argstr, t1-t0) - for handler in events: - self._add_callback(handler) + for fileobj, mask, (reader, writer, connector) in event_list: + if mask & selectors.SELECT_IN and reader is not None: + self._add_callback(reader) + if mask & selectors.SELECT_OUT and writer is not None: + self._add_callback(writer) + elif mask & selectors.SELECT_CONNECT and connector is not None: + self._add_callback(connector) # Handle 'later' callbacks that are ready. now = time.monotonic()