"""Stress script for ``multiprocessing.util`` finalizers.

One daemon thread repeatedly calls ``util._run_finalizers()`` while a second
daemon thread keeps creating and dropping objects that each register a
``util.Finalize`` callback.  Any exception raised in either worker is printed
and the process exits immediately with status 1.
"""
import gc
import os
import random
import sys
import threading
import time
from multiprocessing import Queue
from multiprocessing import util

# Fixed seed for the module-level random generator used throughout.
random.seed(42)


def cb(*args):
    """Finalizer callback that accepts any arguments and does nothing."""
    return None


class Finalizable:
    """Object that refers to itself and registers a finalizer on creation."""

    def __init__(self):
        # Self-reference: the instance is part of a reference cycle.
        self.ref = self
        # Register ``cb`` as this object's finalizer with a random exit
        # priority between 1 and 100 (inclusive).
        util.Finalize(self, cb, exitpriority=random.randint(1, 100))


def _report_and_exit(banner):
    """Print *banner* and the exception being handled, then exit with 1.

    Must be called from inside an ``except`` block so that
    ``sys.exc_info()`` returns the active exception.
    """
    print()
    print(banner)
    sys.excepthook(*sys.exc_info())
    # Immediate exit with status 1.
    os._exit(1)


def run_finalizers():
    """Loop forever, running the registered finalizers after random pauses."""
    while True:
        # Pause for a random duration below 0.1 s; deliberately outside the
        # guarded region, so only the finalizer run itself is reported.
        time.sleep(random.random() * 1e-1)
        try:
            util._run_finalizers()
        except Exception:
            _report_and_exit("-- Exception in run_finalizers --")


def make_finalizers():
    """Loop forever, creating batches of ``Finalizable`` and evicting old ones."""
    batches = {}
    max_batches = 60
    # The handler never returns (it exits the process), so guarding the whole
    # loop is equivalent to guarding each iteration.
    try:
        while True:
            # Store a fresh batch of ten objects under a random 32-bit key.
            batches[random.getrandbits(32)] = {Finalizable() for _ in range(10)}
            if len(batches) <= max_batches:
                continue
            # Over the limit: drop half of the stored batches.
            for _ in range(len(batches) // 2):
                batches.popitem()
    except Exception:
        _report_and_exit("-- Exception in make_finalizers --")


def main():
    """Configure the interpreter, start both worker threads, and sleep."""
    # Request thread switches every microsecond and set all three GC
    # generation thresholds to 5.
    sys.setswitchinterval(1e-6)
    gc.set_threshold(5, 5, 5)

    # Start order matters only for thread numbering: runner first, then maker.
    for worker in (run_finalizers, make_finalizers):
        threading.Thread(target=worker, daemon=True).start()

    # Keep the main thread alive; the daemon threads do the work.
    time.sleep(1e8)


if __name__ == "__main__":
    main()