import os import re import subprocess import asyncio.subprocess import asyncio from contextlib import suppress def assert_no_childprocesses(): processes = subprocess.check_output(["ps", "-eo", "pid,ppid,comm"], encoding="utf-8") for line in processes.splitlines()[1:]: match = re.match(r"(\d+)\s+(\d+)\s+(.*)", line.strip()) pid, ppid, cmd = match.groups() if cmd == "ps": continue assert int(ppid) != os.getpid(), "Some process still has this process as parent: " + line def test1(): async def run_subprocess(): p = await asyncio.subprocess.create_subprocess_exec("sleep", "10") await p.wait() async def main(): with suppress(asyncio.TimeoutError): await asyncio.wait_for(run_subprocess(), 1) # assert_no_childprocesses() # FAIL(1) await asyncio.sleep(1) # assert_no_childprocesses() # FAIL(2) asyncio.get_event_loop().run_until_complete(main()) def test2(): async def run_subprocess_and_kill_on_timeout(): try: p = await asyncio.subprocess.create_subprocess_exec("sleep", "10") await p.wait() except asyncio.CancelledError: p.kill() await p.wait() async def main(): with suppress(asyncio.TimeoutError): await asyncio.wait_for(run_subprocess_and_kill_on_timeout(), 1) # assert_no_childprocesses() # FAIL(3) await asyncio.sleep(1) # How long to sleep??? assert_no_childprocesses() # PASS asyncio.get_event_loop().run_until_complete(main()) test1() test2()