import time import os import thread import unittest import sys class TestForkInThread(unittest.TestCase): def setUp(self): self.read_fd, self.write_fd = os.pipe() def test_forkinthread(self): def thread1(): try: pid = os.fork() # fork in a thread except RuntimeError: sys.exit(0) # exit the child if pid == 0: # child os.close(self.read_fd) os.write(self.write_fd, "OK") sys.exit(0) else: # parent os.close(self.write_fd) thread.start_new_thread(thread1, ()) self.assertEqual(os.read(self.read_fd, 2), "OK", "Unable to fork() in thread") def tearDown(self): try: os.close(self.read_fd) except OSError: pass try: os.close(self.write_fd) except OSError: pass if __name__ == "__main__": unittest.main()