import asyncio from asyncio import tasks from asyncio import unix_events import functools import os import signal import sys import time TIMEOUT = 5 class Watcher(unix_events.AbstractChildWatcher): def __init__(self): self._callbacks = {} self._saved_sighandler = None self.transport = None def __enter__(self): return self def __exit__(self, exc_type, exc_val, exc_tb): pass def add_child_handler(self, pid, callback, *args): return def remove_child_handler(self, pid): return def is_active(self): return self._saved_sighandler is not None def close(self): self._callbacks.clear() if self._saved_sighandler is not None: handler = signal.getsignal(signal.SIGCHLD) if handler != self._sig_chld: logger.warning("SIGCHLD handler was changed by outside code") else: signal.signal(signal.SIGCHLD, self._saved_sighandler) self._saved_sighandler = None def attach_loop(self, loop): self._saved_sighandler = signal.signal(signal.SIGCHLD, self._sig_chld) # Set SA_RESTART to limit EINTR occurrences. # signal.siginterrupt(signal.SIGCHLD, False) def _sig_chld(self, signum, frame): print('_sig_child: started') transport = self.transport def process_exited(): transport.my_waiter.set_result('okay') loop.call_soon_threadsafe(process_exited) def _release_waiter(waiter, msg, *args): print(f'releasing waiter: {msg}') if not waiter.done(): waiter.set_result(msg) async def run(loop, watcher): args = [sys.executable, '-c', 'import time; time.sleep(3600)'] create = loop.subprocess_exec(asyncio.SubprocessProtocol, *args) transport, protocol = await create watcher.transport = transport pid = transport.get_pid() print(f'killing pid: {pid!r}') transport.close() waiter = loop.create_future() async def transport_wait(): if transport._returncode is not None: return waiter = loop.create_future() transport.my_waiter = waiter try: print('BaseSubprocessTransport: awaiting in _wait') await waiter except BaseException as exc: print(f'exception while waiting: {exc!r}') raise timeout_handle = loop.call_later(TIMEOUT, _release_waiter, waiter, '**TIMEOUT**') coro = transport_wait() fut = tasks.ensure_future(coro, loop=loop) cb = functools.partial(_release_waiter, waiter, 'okay') fut.add_done_callback(cb) try: try: await waiter except asyncio.CancelledError: print('cancelled') fut.remove_done_callback(cb) fut.cancel() return result = waiter.result() if result != 'okay': print(f'not okay: {result}') exit() print('okay') finally: timeout_handle.cancel() async def main(loop, watcher): i = 1 start = time.time() while True: elapsed = time.time() - start print(f'[{i}]: {elapsed:.2f}') await run(loop, watcher) time.sleep(0.2) i += 1 policy = asyncio.get_event_loop_policy() watcher = Watcher() loop = policy.new_event_loop() watcher.attach_loop(loop) policy.set_child_watcher(watcher) loop.run_until_complete(main(loop, watcher))