import gc import os import threading import time import psutil FORCE_GC = False class Memwatcher: ACTIVE = True MEM = 0 def __init__(self): self._observer = threading.Thread(target=self.loop) self.active = True self._observer.start() def stop(self): self.active = False self._observer.join() def loop(self): while self.active: print(f"Ram used: {self.mem():.2f} MB - A: Active({A.COUNT}) / Total({A.TOTAL}) - B: Active({B.COUNT}) / Total({B.TOTAL})") time.sleep(5) def mem(self): if FORCE_GC: gc.collect() process = psutil.Process(os.getpid()) return process.memory_info().rss / 1_000_000 # in bytes class A: COUNT = 0 TOTAL = 0 _BLOB = "0" * 1_000_000 # 1MB string def __init__(self): A.COUNT += 1 # Current object count A.TOTAL += 1 # Total instantiations self.s = (A._BLOB + '.') # Create new copy of string to eat up 1MB of ram - the "." forces a new string self._b = B(self) # Create a circular reference def __del__(self): # When garbage collected, decrement counter A.COUNT -= 1 class B: COUNT = 0 TOTAL = 0 def __init__(self, a:A): B.COUNT += 1 # Current object count B.TOTAL += 1 # Total instantiations self._a = a # Circular reference var def __del__(self): # When garbage collected, decrement counter B.COUNT -= 1 if __name__ == '__main__': m = Memwatcher() for i in range(1_000_000): a = A() m.stop()