import asyncio
from asyncio import tasks
from asyncio import unix_events
import functools
import os
import signal
import sys
import time

# Seconds run() waits for the SIGCHLD-driven result before giving up.
TIMEOUT = 5


class Watcher(unix_events.AbstractChildWatcher):
    """Bare-bones child watcher driven directly by a SIGCHLD handler.

    It does not track per-pid callbacks.  Instead, every SIGCHLD schedules a
    callback on the attached loop that resolves the ``proc_future`` attribute
    stored on ``self.transport`` (both are assigned by ``run``).
    """

    def __init__(self):
        self._callbacks = {}
        # Handler that was installed before attach_loop() replaced it;
        # None means attach_loop() has not run yet.
        self._saved_sighandler = None
        # Loop handed to attach_loop(); the signal handler schedules on it.
        self._loop = None
        # Transport whose ``proc_future`` the signal handler resolves.
        self.transport = None

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        pass

    def add_child_handler(self, pid, callback, *args):
        """Accept and ignore the registration; no per-pid state is kept."""
        return

    def remove_child_handler(self, pid):
        """Nothing to remove because nothing is ever registered."""
        return

    def is_active(self):
        """Return True once attach_loop() has installed the handler."""
        return self._saved_sighandler is not None

    def close(self):
        """Always raise RuntimeError: closing is not supported here."""
        raise RuntimeError('not needed for example')

    def attach_loop(self, loop):
        """Remember *loop* and install ``_sig_chld`` as the SIGCHLD handler.

        The previously installed handler is kept in ``_saved_sighandler``.
        """
        # Keep the loop on the instance so the handler does not depend on a
        # module-level ``loop`` variable happening to exist.
        self._loop = loop
        self._saved_sighandler = signal.signal(signal.SIGCHLD, self._sig_chld)
        # Set SA_RESTART to limit EINTR occurrences.
        # signal.siginterrupt(signal.SIGCHLD, False)
        # NOTE(review): the siginterrupt() call above is disabled in the
        # original; presumably on purpose for this experiment -- confirm.

    def _sig_chld(self, signum, frame):
        """SIGCHLD handler: hand the notification over to the event loop."""
        print('_sig_child: started')

        def process_exited():
            # ``self.transport`` is looked up when the callback runs, not
            # when the signal arrives.
            self.transport.proc_future.set_result('okay')

        # Only schedule from the handler; the future itself is resolved
        # later, inside the loop.
        self._loop.call_soon_threadsafe(process_exited)


def _finish_waiter(waiter, msg, *args):
    """Resolve *waiter* with *msg* unless it already has a result.

    ``*args`` swallows the extra future argument supplied when this function
    is used as a done-callback.
    """
    if waiter.done():
        print('_finish_waiter: already done')
    else:
        print(f'_finish_waiter: setting result={msg}')
        waiter.set_result(msg)


async def run(loop, watcher):
    """Spawn a long-sleeping child, close its transport and await SIGCHLD.

    Prints ``okay`` when the watcher's signal handler resolved the future in
    time.  If the ``TIMEOUT`` timer fires first, the failure is printed and
    the interpreter exits via ``SystemExit``.
    """
    args = [sys.executable, '-c', 'import time; time.sleep(3600)']
    create = loop.subprocess_exec(asyncio.SubprocessProtocol, *args)
    transport, _protocol = await create
    watcher.transport = transport

    pid = transport.get_pid()
    print(f'killing pid: {pid!r}')
    transport.close()

    # Resolved by Watcher._sig_chld (through the loop) when SIGCHLD arrives.
    proc_future = loop.create_future()
    transport.proc_future = proc_future

    # ``waiter`` gets whichever comes first: 'okay' or the timeout marker.
    waiter = loop.create_future()
    timeout_handle = loop.call_later(
        TIMEOUT, _finish_waiter, waiter, '**TIMEOUT**')

    cb = functools.partial(_finish_waiter, waiter, 'okay')
    fut = tasks.ensure_future(proc_future, loop=loop)
    fut.add_done_callback(cb)

    try:
        try:
            await waiter
        except asyncio.CancelledError:
            print('cancelled')
            fut.remove_done_callback(cb)
            fut.cancel()
            return

        result = waiter.result()
        if result != 'okay':
            print(f'not okay: {result}')
            # sys.exit() instead of the site-provided exit() helper, which
            # is not guaranteed to exist (e.g. under ``python -S``).
            sys.exit()

        print('okay')
    finally:
        timeout_handle.cancel()


async def main(loop, watcher):
    """Call run() in an endless loop, printing the iteration and elapsed time."""
    i = 1
    start = time.time()
    while True:
        elapsed = time.time() - start
        print(f'[{i}]: {elapsed:.2f}')
        await run(loop, watcher)
        # NOTE(review): this is a blocking sleep, so the event loop is
        # stalled for its duration; kept as in the original because it may
        # be part of the experiment's timing -- confirm before switching to
        # ``await asyncio.sleep(0.2)``.
        time.sleep(0.2)
        i += 1


if __name__ == '__main__':
    policy = asyncio.get_event_loop_policy()
    watcher = Watcher()
    loop = policy.new_event_loop()
    watcher.attach_loop(loop)
    policy.set_child_watcher(watcher)

    loop.run_until_complete(main(loop, watcher))