diff --git a/Lib/test/test_mailbox.py b/Lib/test/test_mailbox.py --- a/Lib/test/test_mailbox.py +++ b/Lib/test/test_mailbox.py @@ -963,31 +963,75 @@ self.assertEqual(contents, f.read()) self._box = self._factory(self._path) + class fork_with_ipc: + + # Right now it looks like this is the only place in the stdlib + # test suite where this is useful, but if other places are found + # it can easily be pulled out into test.support. + + def __init__(self): + self.p_sock_close = self.c_sock_close = False + self.c_sock_shutdown = False + self.ipc_close = False + + def __enter__(self): + self.p_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.p_sock.setblocking(1) + port = support.bind_port(self.p_sock, '127.0.0.1') + self.p_sock_close = True + self.pid = os.fork() + if self.pid == 0: + self.child = True + self.c_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.c_sock_close = True + self.c_sock.setblocking(1) + self.c_sock.connect(('127.0.0.1', port)) + self.c_sock_shutdown = True + self.ipc = self.c_sock.makefile('rw') + self.ipc_close = True + else: + self.child = False + self.p_sock.listen(1) + (self.c_sock, _) = self.p_sock.accept() + self.c_sock_close = self.c_sock_shutdown = True + self.ipc = self.c_sock.makefile('rw') + self.ipc_close = True + return self + + def __exit__(self, *args): + if self.p_sock_close: + self.p_sock.close() + if self.ipc_close: + self.ipc.close() + if self.c_sock_shutdown: + self.c_sock.shutdown(socket.SHUT_RDWR) + if self.c_sock_close: + self.c_sock.close() + if self.child: + os._exit(0) + else: + os.waitpid(self.pid, 0) + + def signal(self, text): + self.ipc.write(text+'\n') + self.ipc.flush() + + def check(self): + return self.ipc.readline().rstrip() + + @unittest.skipUnless(hasattr(os, 'fork'), "can't fork to create lock conflict") def test_lock_conflict(self): - # Fork off a subprocess that will lock the file for 2 seconds, - # unlock it, and then exit. - if not hasattr(os, 'fork'): - return - pid = os.fork() - if pid == 0: - # In the child, lock the mailbox. - try: + with self.fork_with_ipc() as ipc: + if ipc.child: self._box.lock() - time.sleep(2) + ipc.signal('go') + ipc.check() self._box.unlock() - finally: - os._exit(0) - - # In the parent, sleep a bit to give the child time to acquire - # the lock. - time.sleep(0.5) - try: - self.assertRaises(mailbox.ExternalClashError, - self._box.lock) - finally: - # Wait for child to exit. Locking should now succeed. - exited_pid, status = os.waitpid(pid, 0) - + else: + ipc.check() + self.assertRaises(mailbox.ExternalClashError, self._box.lock) + ipc.signal('done') + # Locking should now succeed. self._box.lock() self._box.unlock()