diff -r 8ec13c088465 Lib/asyncio/base_events.py --- a/Lib/asyncio/base_events.py Thu Feb 05 11:46:45 2015 +0100 +++ b/Lib/asyncio/base_events.py Thu Feb 05 12:02:08 2015 +0100 @@ -189,6 +189,7 @@ class BaseEventLoop(events.AbstractEvent # Identifier of the thread running the event loop, or None if the # event loop is not running self._thread_id = None + self._pid = os.getpid() self._clock_resolution = time.get_clock_info('monotonic').resolution self._exception_handler = None self._debug = (not sys.flags.ignore_environment @@ -203,6 +204,15 @@ class BaseEventLoop(events.AbstractEvent % (self.__class__.__name__, self.is_running(), self.is_closed(), self.get_debug())) + def _detect_fork(self): + pid = os.getpid() + if pid != self._pid: + self._pid = pid + self._at_fork() + + def _at_fork(self): + pass + def create_task(self, coro): """Schedule a coroutine object. @@ -1112,6 +1122,9 @@ class BaseEventLoop(events.AbstractEvent when = self._scheduled[0]._when timeout = max(0, when - self.time()) + # detect fork before using the selector + self._detect_fork() + if self._debug and timeout != 0: t0 = self.time() event_list = self._selector.select(timeout) diff -r 8ec13c088465 Lib/asyncio/proactor_events.py --- a/Lib/asyncio/proactor_events.py Thu Feb 05 11:46:45 2015 +0100 +++ b/Lib/asyncio/proactor_events.py Thu Feb 05 12:02:08 2015 +0100 @@ -489,6 +489,10 @@ class BaseProactorEventLoop(base_events. f.add_done_callback(self._loop_self_reading) def _write_to_self(self): + # The self-pipe must not be shared between two processes: at fork, + # create a new pipe to unshare it + self._detect_fork() + self._csock.send(b'\0') def _start_serving(self, protocol_factory, sock, diff -r 8ec13c088465 Lib/asyncio/selector_events.py --- a/Lib/asyncio/selector_events.py Thu Feb 05 11:46:45 2015 +0100 +++ b/Lib/asyncio/selector_events.py Thu Feb 05 12:02:08 2015 +0100 @@ -94,6 +94,8 @@ class BaseSelectorEventLoop(base_events. raise RuntimeError("Cannot close a running event loop") if self.is_closed(): return + # detect fork before using the self-pipe or the selector + self._detect_fork() self._close_self_pipe() super().close() if self._selector is not None: @@ -135,6 +137,9 @@ class BaseSelectorEventLoop(base_events. break def _write_to_self(self): + # detect fork before using the self-pipe + self._detect_fork() + # This may be called from a different thread, possibly after # _close_self_pipe() has been called or even while it is # running. Guard for self._csock being None or closed. When @@ -230,6 +235,8 @@ class BaseSelectorEventLoop(base_events. """Add a reader callback.""" self._check_closed() handle = events.Handle(callback, args, self) + # detect fork before using the selector + self._detect_fork() try: key = self._selector.get_key(fd) except KeyError: @@ -246,6 +253,8 @@ class BaseSelectorEventLoop(base_events. """Remove a reader callback.""" if self.is_closed(): return False + # detect fork before using the selector + self._detect_fork() try: key = self._selector.get_key(fd) except KeyError: @@ -268,6 +277,8 @@ class BaseSelectorEventLoop(base_events. """Add a writer callback..""" self._check_closed() handle = events.Handle(callback, args, self) + # detect fork before using the selector + self._detect_fork() try: key = self._selector.get_key(fd) except KeyError: @@ -284,6 +295,8 @@ class BaseSelectorEventLoop(base_events. """Remove a writer callback.""" if self.is_closed(): return False + # detect fork before using the selector + self._detect_fork() try: key = self._selector.get_key(fd) except KeyError: diff -r 8ec13c088465 Lib/asyncio/unix_events.py --- a/Lib/asyncio/unix_events.py Thu Feb 05 11:46:45 2015 +0100 +++ b/Lib/asyncio/unix_events.py Thu Feb 05 12:02:08 2015 +0100 @@ -48,6 +48,12 @@ class _UnixSelectorEventLoop(selector_ev super().__init__(selector) self._signal_handlers = {} + def _at_fork(self): + super()._at_fork() + self._selector._at_fork() + self._close_self_pipe() + self._make_self_pipe() + def _socketpair(self): return socket.socketpair() diff -r 8ec13c088465 Lib/selectors.py --- a/Lib/selectors.py Thu Feb 05 11:46:45 2015 +0100 +++ b/Lib/selectors.py Thu Feb 05 12:02:08 2015 +0100 @@ -192,6 +192,9 @@ class BaseSelector(metaclass=ABCMeta): def __exit__(self, *args): self.close() + def _at_fork(self): + pass + class _BaseSelectorImpl(BaseSelector): """Base selector implementation.""" @@ -221,6 +224,9 @@ class _BaseSelectorImpl(BaseSelector): # Raise ValueError after all. raise + def _register(self, key): + pass + def register(self, fileobj, events, data=None): if (not events) or (events & ~(EVENT_READ | EVENT_WRITE)): raise ValueError("Invalid events: {!r}".format(events)) @@ -231,14 +237,19 @@ class _BaseSelectorImpl(BaseSelector): raise KeyError("{!r} (FD {}) is already registered" .format(fileobj, key.fd)) + self._register(key) self._fd_to_key[key.fd] = key return key + def _unregister(self, key): + pass + def unregister(self, fileobj): try: key = self._fd_to_key.pop(self._fileobj_lookup(fileobj)) except KeyError: raise KeyError("{!r} is not registered".format(fileobj)) from None + self._unregister(key) return key def modify(self, fileobj, events, data=None): @@ -286,19 +297,15 @@ class SelectSelector(_BaseSelectorImpl): self._readers = set() self._writers = set() - def register(self, fileobj, events, data=None): - key = super().register(fileobj, events, data) - if events & EVENT_READ: + def _register(self, key): + if key.events & EVENT_READ: self._readers.add(key.fd) - if events & EVENT_WRITE: + if key.events & EVENT_WRITE: self._writers.add(key.fd) - return key - def unregister(self, fileobj): - key = super().unregister(fileobj) + def _unregister(self, key): self._readers.discard(key.fd) self._writers.discard(key.fd) - return key if sys.platform == 'win32': def _select(self, r, w, _, timeout=None): @@ -338,20 +345,16 @@ if hasattr(select, 'poll'): super().__init__() self._poll = select.poll() - def register(self, fileobj, events, data=None): - key = super().register(fileobj, events, data) + def _register(self, key): poll_events = 0 - if events & EVENT_READ: + if key.events & EVENT_READ: poll_events |= select.POLLIN - if events & EVENT_WRITE: + if key.events & EVENT_WRITE: poll_events |= select.POLLOUT self._poll.register(key.fd, poll_events) - return key - def unregister(self, fileobj): - key = super().unregister(fileobj) + def _unregister(self, key): self._poll.unregister(key.fd) - return key def select(self, timeout=None): if timeout is None: @@ -392,25 +395,28 @@ if hasattr(select, 'epoll'): def fileno(self): return self._epoll.fileno() - def register(self, fileobj, events, data=None): - key = super().register(fileobj, events, data) + def _register(self, key): epoll_events = 0 - if events & EVENT_READ: + if key.events & EVENT_READ: epoll_events |= select.EPOLLIN - if events & EVENT_WRITE: + if key.events & EVENT_WRITE: epoll_events |= select.EPOLLOUT self._epoll.register(key.fd, epoll_events) - return key - def unregister(self, fileobj): - key = super().unregister(fileobj) + def _unregister(self, key): try: self._epoll.unregister(key.fd) except OSError: # This can happen if the FD was closed since it # was registered. pass - return key + + def _at_fork(self): + # don't unregister file descriptors: epoll is still shared with + # the parent process + self._epoll = select.epoll() + for key in self._fd_to_key.values(): + self._register(key) def select(self, timeout=None): if timeout is None: @@ -461,20 +467,23 @@ if hasattr(select, 'devpoll'): def fileno(self): return self._devpoll.fileno() - def register(self, fileobj, events, data=None): - key = super().register(fileobj, events, data) + def _register(self, key): poll_events = 0 - if events & EVENT_READ: + if key.events & EVENT_READ: poll_events |= select.POLLIN - if events & EVENT_WRITE: + if key.events & EVENT_WRITE: poll_events |= select.POLLOUT self._devpoll.register(key.fd, poll_events) - return key - def unregister(self, fileobj): - key = super().unregister(fileobj) + def _unregister(self, key): self._devpoll.unregister(key.fd) - return key + + def _at_fork(self): + # don't unregister file descriptors: devpoll is still shared with + # the parent process + self._devpoll = select.devpoll() + for key in self._fd_to_key.values(): + self._register(key) def select(self, timeout=None): if timeout is None: @@ -519,20 +528,17 @@ if hasattr(select, 'kqueue'): def fileno(self): return self._kqueue.fileno() - def register(self, fileobj, events, data=None): - key = super().register(fileobj, events, data) - if events & EVENT_READ: + def _register(self, key): + if key.events & EVENT_READ: kev = select.kevent(key.fd, select.KQ_FILTER_READ, select.KQ_EV_ADD) self._kqueue.control([kev], 0, 0) - if events & EVENT_WRITE: + if key.events & EVENT_WRITE: kev = select.kevent(key.fd, select.KQ_FILTER_WRITE, select.KQ_EV_ADD) self._kqueue.control([kev], 0, 0) - return key - def unregister(self, fileobj): - key = super().unregister(fileobj) + def _unregister(self, key): if key.events & EVENT_READ: kev = select.kevent(key.fd, select.KQ_FILTER_READ, select.KQ_EV_DELETE) @@ -550,7 +556,13 @@ if hasattr(select, 'kqueue'): except OSError: # See comment above. pass - return key + + def _at_fork(self): + # don't unregister file descriptors: kqueue is still shared with + # the parent process + self._kqueue = select.kqueue() + for key in self._fd_to_key.values(): + self._register(key) def select(self, timeout=None): timeout = None if timeout is None else max(timeout, 0)