diff -r 25028b0e1183 Lib/asyncio/base_subprocess.py --- a/Lib/asyncio/base_subprocess.py Thu Jan 29 02:57:10 2015 +0100 +++ b/Lib/asyncio/base_subprocess.py Thu Jan 29 12:52:39 2015 +0100 @@ -1,6 +1,7 @@ import collections import subprocess +from . import futures from . import protocols from . import transports from .coroutines import coroutine @@ -11,11 +12,13 @@ class BaseSubprocessTransport(transports def __init__(self, loop, protocol, args, shell, stdin, stdout, stderr, bufsize, - extra=None, **kwargs): + waiter=None, extra=None, **kwargs): super().__init__(extra) self._protocol = protocol self._loop = loop + self._proc = None self._pid = None + self._exit_waiters = [] self._pipes = {} if stdin == subprocess.PIPE: @@ -27,6 +30,7 @@ class BaseSubprocessTransport(transports self._pending_calls = collections.deque() self._finished = False self._returncode = None + # Create the child process: set the _proc attribute self._start(args=args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr, bufsize=bufsize, **kwargs) self._pid = self._proc.pid @@ -39,6 +43,8 @@ class BaseSubprocessTransport(transports logger.debug('process %r created: pid %s', program, self._pid) + self._loop.create_task(self._post_init(waiter)) + def __repr__(self): info = [self.__class__.__name__, 'pid=%s' % self._pid] if self._returncode is not None: @@ -70,12 +76,23 @@ class BaseSubprocessTransport(transports raise NotImplementedError def close(self): + proc = self._proc + if proc is None: + return + for proto in self._pipes.values(): if proto is None: continue proto.pipe.close() + if self._returncode is None: - self.terminate() + if self._loop.get_debug(): + logger.warning('Close running child process: kill %r', self) + + try: + proc.kill() + except ProcessLookupError: + pass def get_pid(self): return self._pid @@ -98,36 +115,7 @@ class BaseSubprocessTransport(transports def kill(self): self._proc.kill() - def _kill_wait(self): - """Close pipes, kill the subprocess and read its return status. - - Function called when an exception is raised during the creation - of a subprocess. - """ - if self._loop.get_debug(): - logger.warning('Exception during subprocess creation, ' - 'kill the subprocess %r', - self, - exc_info=True) - - proc = self._proc - if proc.stdout: - proc.stdout.close() - if proc.stderr: - proc.stderr.close() - if proc.stdin: - proc.stdin.close() - - try: - proc.kill() - except ProcessLookupError: - pass - self._returncode = proc.wait() - - self.close() - - @coroutine - def _post_init(self): + def _post_init(self, waiter): try: proc = self._proc loop = self._loop @@ -153,9 +141,13 @@ class BaseSubprocessTransport(transports for callback, data in self._pending_calls: self._loop.call_soon(callback, *data) self._pending_calls = None - except: - self._kill_wait() - raise + except Exception as exc: + self.close() + if waiter is not None and not waiter.cancelled(): + waiter.set_result(None) + else: + if waiter is not None and not waiter.cancelled(): + waiter.set_result(None) def _call(self, cb, *data): if self._pending_calls is not None: @@ -180,6 +172,24 @@ class BaseSubprocessTransport(transports self._call(self._protocol.process_exited) self._try_finish() + # wake up futures waiting for wait() + for waiter in self._exit_waiters: + if not waiter.cancelled(): + waiter.set_result(returncode) + self._exit_waiters = None + + def wait(self): + """Wait until the process exit and return the process return code. + + This method is a coroutine.""" + if self._returncode is not None: + return self._returncode + + waiter = futures.Future(loop=self._loop) + self._exit_waiters.append(waiter) + yield from waiter + return waiter.result() + def _try_finish(self): assert not self._finished if self._returncode is None: diff -r 25028b0e1183 Lib/asyncio/subprocess.py --- a/Lib/asyncio/subprocess.py Thu Jan 29 02:57:10 2015 +0100 +++ b/Lib/asyncio/subprocess.py Thu Jan 29 12:52:39 2015 +0100 @@ -25,8 +25,6 @@ class SubprocessStreamProtocol(streams.F super().__init__(loop=loop) self._limit = limit self.stdin = self.stdout = self.stderr = None - self.waiter = futures.Future(loop=loop) - self._waiters = collections.deque() self._transport = None def __repr__(self): @@ -61,9 +59,6 @@ class SubprocessStreamProtocol(streams.F reader=None, loop=self._loop) - if not self.waiter.cancelled(): - self.waiter.set_result(None) - def pipe_data_received(self, fd, data): if fd == 1: reader = self.stdout @@ -94,16 +89,9 @@ class SubprocessStreamProtocol(streams.F reader.set_exception(exc) def process_exited(self): - returncode = self._transport.get_returncode() self._transport.close() self._transport = None - # wake up futures waiting for wait() - while self._waiters: - waiter = self._waiters.popleft() - if not waiter.cancelled(): - waiter.set_result(returncode) - class Process: def __init__(self, transport, protocol, loop): @@ -122,17 +110,11 @@ class Process: def returncode(self): return self._transport.get_returncode() - @coroutine def wait(self): - """Wait until the process exit and return the process return code.""" - returncode = self._transport.get_returncode() - if returncode is not None: - return returncode + """Wait until the process exit and return the process return code. - waiter = futures.Future(loop=self._loop) - self._protocol._waiters.append(waiter) - yield from waiter - return waiter.result() + This method is a coroutine.""" + return self._transport.wait() def _check_alive(self): if self._transport.get_returncode() is not None: @@ -221,11 +203,6 @@ def create_subprocess_shell(cmd, stdin=N protocol_factory, cmd, stdin=stdin, stdout=stdout, stderr=stderr, **kwds) - try: - yield from protocol.waiter - except: - transport._kill_wait() - raise return Process(transport, protocol, loop) @coroutine @@ -241,9 +218,4 @@ def create_subprocess_exec(program, *arg program, *args, stdin=stdin, stdout=stdout, stderr=stderr, **kwds) - try: - yield from protocol.waiter - except: - transport._kill_wait() - raise return Process(transport, protocol, loop) diff -r 25028b0e1183 Lib/asyncio/unix_events.py --- a/Lib/asyncio/unix_events.py Thu Jan 29 02:57:10 2015 +0100 +++ b/Lib/asyncio/unix_events.py Thu Jan 29 12:52:39 2015 +0100 @@ -15,6 +15,7 @@ from . import base_subprocess from . import constants from . import coroutines from . import events +from . import futures from . import selector_events from . import selectors from . import transports @@ -174,16 +175,20 @@ class _UnixSelectorEventLoop(selector_ev stdin, stdout, stderr, bufsize, extra=None, **kwargs): with events.get_child_watcher() as watcher: + waiter = futures.Future(loop=self) transp = _UnixSubprocessTransport(self, protocol, args, shell, stdin, stdout, stderr, bufsize, - extra=extra, **kwargs) + waiter=waiter, extra=extra, + **kwargs) + + watcher.add_child_handler(transp.get_pid(), + self._child_watcher_callback, transp) try: - yield from transp._post_init() + yield from waiter except: transp.close() + yield from transp.wait() raise - watcher.add_child_handler(transp.get_pid(), - self._child_watcher_callback, transp) return transp @@ -750,7 +755,7 @@ class SafeChildWatcher(BaseChildWatcher) pass def add_child_handler(self, pid, callback, *args): - self._callbacks[pid] = callback, args + self._callbacks[pid] = (callback, args) # Prevent a race condition in case the child is already terminated. self._do_waitpid(pid) diff -r 25028b0e1183 Lib/asyncio/windows_events.py --- a/Lib/asyncio/windows_events.py Thu Jan 29 02:57:10 2015 +0100 +++ b/Lib/asyncio/windows_events.py Thu Jan 29 12:52:39 2015 +0100 @@ -366,13 +366,16 @@ class ProactorEventLoop(proactor_events. def _make_subprocess_transport(self, protocol, args, shell, stdin, stdout, stderr, bufsize, extra=None, **kwargs): + waiter = futures.Future(loop=self) transp = _WindowsSubprocessTransport(self, protocol, args, shell, stdin, stdout, stderr, bufsize, - extra=extra, **kwargs) + waiter=waiter, extra=extra, + **kwargs) try: - yield from transp._post_init() + yield from waiter except: transp.close() + yield from transp.wait() raise return transp diff -r 25028b0e1183 Lib/test/test_asyncio/test_events.py --- a/Lib/test/test_asyncio/test_events.py Thu Jan 29 02:57:10 2015 +0100 +++ b/Lib/test/test_asyncio/test_events.py Thu Jan 29 12:52:39 2015 +0100 @@ -1540,9 +1540,10 @@ class SubprocessTestsMixin: stdin = transp.get_pipe_transport(0) stdin.write(b'Python The Winner') self.loop.run_until_complete(proto.got_data[1].wait()) - transp.close() + with test_utils.disable_logger(): + transp.close() self.loop.run_until_complete(proto.completed) - self.check_terminated(proto.returncode) + self.check_killed(proto.returncode) self.assertEqual(b'Python The Winner', proto.data[1]) def test_subprocess_interactive(self): @@ -1556,21 +1557,20 @@ class SubprocessTestsMixin: self.loop.run_until_complete(proto.connected) self.assertEqual('CONNECTED', proto.state) - try: - stdin = transp.get_pipe_transport(0) - stdin.write(b'Python ') - self.loop.run_until_complete(proto.got_data[1].wait()) - proto.got_data[1].clear() - self.assertEqual(b'Python ', proto.data[1]) - - stdin.write(b'The Winner') - self.loop.run_until_complete(proto.got_data[1].wait()) - self.assertEqual(b'Python The Winner', proto.data[1]) - finally: + stdin = transp.get_pipe_transport(0) + stdin.write(b'Python ') + self.loop.run_until_complete(proto.got_data[1].wait()) + proto.got_data[1].clear() + self.assertEqual(b'Python ', proto.data[1]) + + stdin.write(b'The Winner') + self.loop.run_until_complete(proto.got_data[1].wait()) + self.assertEqual(b'Python The Winner', proto.data[1]) + + with test_utils.disable_logger(): transp.close() - self.loop.run_until_complete(proto.completed) - self.check_terminated(proto.returncode) + self.check_killed(proto.returncode) def test_subprocess_shell(self): connect = self.loop.subprocess_shell( @@ -1728,9 +1728,10 @@ class SubprocessTestsMixin: # GetLastError()==ERROR_INVALID_NAME on Windows!?! (Using # WriteFile() we get ERROR_BROKEN_PIPE as expected.) self.assertEqual(b'ERR:OSError', proto.data[2]) - transp.close() + with test_utils.disable_logger(): + transp.close() self.loop.run_until_complete(proto.completed) - self.check_terminated(proto.returncode) + self.check_killed(proto.returncode) def test_subprocess_wait_no_same_group(self): # start the new process in a new session