import _overlapped import _winapi import os import faulthandler import subprocess import sys import threading import time faulthandler.enable() hExit = _overlapped.CreateEvent(None, True, False, None) class SpawnThread(threading.Thread): argv = [sys.executable, '-c', 'while True: pass'] startTime = 0 def run(self): while _winapi.WaitForSingleObject(hExit, 0) == _winapi.WAIT_TIMEOUT: self.startTime = time.time() try: subprocess.run(self.argv, timeout=0.0001) except subprocess.TimeoutExpired: pass threads = [ SpawnThread() for _ in range(os.cpu_count()) ] for t in threads: t.start() stuck = False while True: try: print(f'[{time.asctime()}] running...') time.sleep(1) except KeyboardInterrupt: _overlapped.SetEvent(hExit) break current = time.time() for t in threads: if current - t.startTime > 1: print(f'!!! {t.name}') stuck = True if stuck: _overlapped.SetEvent(hExit) break for t in threads: t.join(0.01) if stuck: faulthandler.dump_traceback() input('Inspect processes!')