import unittest from multiprocessing import Process import time import os import fcntl def count_leaks(process_count): def p_start(i): print(i) procs = [] for i in range(process_count): p = Process(target=p_start, args=(str(i),)) p.start() procs.append(p) for p in procs: p.join() leaks = 0 for p in procs: try: fcntl.fcntl(p.sentinel, fcntl.F_GETFD) except: pass else: os.close(p.sentinel) leaks += 1 return leaks class MpPipeLeakTest(unittest.TestCase): def test(self): self.assertEqual(count_leaks(10), 0) if __name__ == '__main__': unittest.main()