diff --git a/Doc/library/multiprocessing.rst b/Doc/library/multiprocessing.rst --- a/Doc/library/multiprocessing.rst +++ b/Doc/library/multiprocessing.rst @@ -411,6 +411,20 @@ The :mod:`multiprocessing` package mostl See :ref:`multiprocessing-auth-keys`. + .. attribute:: sentinel + + A numeric handle of a system object which will become "ready" when + the process ends. + + On Windows, this is an OS handle usable with the ``WaitForSingleObject`` + and ``WaitForMultipleObjects`` family of API calls. On Unix, this is + a file descriptor usable with primitives from the :mod:`select` module. + + You can use this value if you want to wait on several events at once. + Otherwise calling :meth:`join()` is simpler. + + .. versionadded:: 3.3 + .. method:: terminate() Terminate the process. On Unix this is done using the ``SIGTERM`` signal; diff --git a/Lib/multiprocessing/forking.py b/Lib/multiprocessing/forking.py --- a/Lib/multiprocessing/forking.py +++ b/Lib/multiprocessing/forking.py @@ -101,6 +101,7 @@ else: if sys.platform != 'win32': import time + import select exit = os._exit duplicate = os.dup @@ -118,8 +119,12 @@ if sys.platform != 'win32': sys.stderr.flush() self.returncode = None + r, w = os.pipe() + self.sentinel = r + self.pid = os.fork() if self.pid == 0: + os.close(r) if 'random' in sys.modules: import random random.seed() @@ -128,6 +133,11 @@ if sys.platform != 'win32': sys.stderr.flush() os._exit(code) + # `w` will be closed when the child exits, at which point `r` + # will become ready for reading (using e.g. select()). + os.close(w) + util.Finalize(self, os.close, (r,)) + def poll(self, flag=os.WNOHANG): if self.returncode is None: try: @@ -145,20 +155,14 @@ if sys.platform != 'win32': return self.returncode def wait(self, timeout=None): - if timeout is None: - return self.poll(0) - deadline = time.time() + timeout - delay = 0.0005 - while 1: - res = self.poll() - if res is not None: - break - remaining = deadline - time.time() - if remaining <= 0: - break - delay = min(delay * 2, remaining, 0.05) - time.sleep(delay) - return res + if self.returncode is None: + if timeout is not None: + r = select.select([self.sentinel], [], [], timeout)[0] + if not r: + return None + # This shouldn't block if select() returned successfully. + return self.poll(os.WNOHANG if timeout == 0.0 else 0) + return self.returncode def terminate(self): if self.returncode is None: @@ -258,6 +262,7 @@ else: self.pid = pid self.returncode = None self._handle = hp + self.sentinel = int(hp) # send information to child prep_data = get_preparation_data(process_obj._name) diff --git a/Lib/multiprocessing/process.py b/Lib/multiprocessing/process.py --- a/Lib/multiprocessing/process.py +++ b/Lib/multiprocessing/process.py @@ -132,6 +132,7 @@ class Process(object): else: from .forking import Popen self._popen = Popen(self) + self._sentinel = self._popen.sentinel _current_process._children.add(self) def terminate(self): @@ -218,6 +219,17 @@ class Process(object): pid = ident + @property + def sentinel(self): + ''' + Return a file descriptor (Unix) or handle (Windows) suitable for + waiting for process termination. + ''' + try: + return self._sentinel + except AttributeError: + raise ValueError("process not started") + def __repr__(self): if self is _current_process: status = 'started' diff --git a/Lib/test/test_multiprocessing.py b/Lib/test/test_multiprocessing.py --- a/Lib/test/test_multiprocessing.py +++ b/Lib/test/test_multiprocessing.py @@ -71,6 +71,22 @@ HAVE_GETVALUE = not getattr(_multiproces 'HAVE_BROKEN_SEM_GETVALUE', False) WIN32 = (sys.platform == "win32") +if WIN32: + from _subprocess import WaitForSingleObject, INFINITE, WAIT_OBJECT_0 + + def wait_for_handle(handle, timeout): + if timeout is None or timeout < 0.0: + timeout = INFINITE + else: + timeout = int(1000 * timeout) + return WaitForSingleObject(handle, timeout) == WAIT_OBJECT_0 +else: + from select import select + + def wait_for_handle(handle, timeout): + if timeout is not None and timeout < 0.0: + timeout = None + return handle in select([handle], [], [], timeout)[0] # # Some tests require ctypes @@ -307,6 +323,26 @@ class _TestProcess(BaseTestCase): ] self.assertEqual(result, expected) + @classmethod + def _test_sentinel(cls, event): + event.wait(10.0) + + def test_sentinel(self): + if self.TYPE == "threads": + return + event = self.Event() + p = self.Process(target=self._test_sentinel, args=(event,)) + with self.assertRaises(ValueError): + p.sentinel + p.start() + self.addCleanup(p.join) + sentinel = p.sentinel + self.assertIsInstance(sentinel, int) + self.assertFalse(wait_for_handle(sentinel, timeout=0.0)) + event.set() + p.join() + self.assertTrue(wait_for_handle(sentinel, timeout=DELTA)) + # # #