diff -r 03063e718f5f Lib/multiprocessing/connection.py --- a/Lib/multiprocessing/connection.py Mon Jul 23 10:22:36 2012 -0500 +++ b/Lib/multiprocessing/connection.py Tue Jul 24 09:37:13 2012 -0400 @@ -497,6 +497,11 @@ ''' if duplex: s1, s2 = socket.socketpair() + # _multiprocessing.Connection read/write semantics do not handle + # non-blocking sockets correctly (issue 6056). This bug-fix + # retains current behavior and allows for socket default timeout + s1.setblocking(True) + s2.setblocking(True) c1 = Connection(s1.detach()) c2 = Connection(s2.detach()) else: @@ -561,6 +566,8 @@ if os.name == 'posix': self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + # blocking sockets fix for issue 6056 + self._socket.setblocking(True) self._socket.bind(address) self._socket.listen(backlog) self._address = self._socket.getsockname() @@ -579,6 +586,8 @@ def accept(self): s, self._last_accepted = self._socket.accept() + # blocking sockets fix for issue 6056 + s.setblocking(True) return Connection(s.detach()) def close(self): @@ -593,6 +602,8 @@ ''' family = address_type(address) with socket.socket( getattr(socket, family) ) as s: + # blocking sockets fix for issue 6056 + s.setblocking(True) s.connect(address) return Connection(s.detach()) diff -r 03063e718f5f Lib/test/test_multiprocessing.py --- a/Lib/test/test_multiprocessing.py Mon Jul 23 10:22:36 2012 -0500 +++ b/Lib/test/test_multiprocessing.py Tue Jul 24 09:37:13 2012 -0400 @@ -3317,43 +3317,18 @@ TestFlags] # +# Issue 6056: test with/without non-blocking sockets # -# - -def test_main(run=None): - if sys.platform.startswith("linux"): - try: - lock = multiprocessing.RLock() - except OSError: - raise unittest.SkipTest("OSError raises on RLock creation, see issue 3111!") - - check_enough_semaphores() - - if run is None: - from test.support import run_unittest as run - - util.get_temp_dir() # creates temp directory for use by all processes - - multiprocessing.get_logger().setLevel(LOG_LEVEL) - - ProcessesMixin.pool = multiprocessing.Pool(4) - ThreadsMixin.pool = multiprocessing.dummy.Pool(4) - ManagerMixin.manager.__init__() - ManagerMixin.manager.start() - ManagerMixin.pool = ManagerMixin.manager.Pool(4) - - testcases = ( - sorted(testcases_processes.values(), key=lambda tc:tc.__name__) + - sorted(testcases_threads.values(), key=lambda tc:tc.__name__) + - sorted(testcases_manager.values(), key=lambda tc:tc.__name__) + - testcases_other - ) - - loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase - suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases) - try: - run(suite) - finally: + +class MultiprocessingTestSuite( unittest.TestSuite ): + def setUp(self): + ProcessesMixin.pool = multiprocessing.Pool(4) + ThreadsMixin.pool = multiprocessing.dummy.Pool(4) + ManagerMixin.manager.__init__() + ManagerMixin.manager.start() + ManagerMixin.pool = ManagerMixin.manager.Pool(4) + + def tearDown(self): ThreadsMixin.pool.terminate() ProcessesMixin.pool.terminate() ManagerMixin.pool.terminate() @@ -3364,6 +3339,57 @@ ProcessesMixin.pool.join() del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool + def run( self, result, debug=False ): + self.setUp() + try: + super().run( result, debug ) + finally: + self.tearDown() + +class NonBlockingSocketSuite( MultiprocessingTestSuite ): + def setUp(self): + socket.setdefaulttimeout(30) + super().setUp() + + def tearDown(self): + super().tearDown() + socket.setdefaulttimeout(None) + +# +# +# + +def test_main(run=None): + if sys.platform.startswith("linux"): + try: + lock = multiprocessing.RLock() + except OSError: + raise unittest.SkipTest("OSError raises on RLock creation, see issue 3111!") + + check_enough_semaphores() + + if run is None: + from test.support import run_unittest as run + + util.get_temp_dir() # creates temp directory for use by all processes + + multiprocessing.get_logger().setLevel(LOG_LEVEL) + + testcases = ( + sorted(testcases_processes.values(), key=lambda tc:tc.__name__) + + sorted(testcases_threads.values(), key=lambda tc:tc.__name__) + + sorted(testcases_manager.values(), key=lambda tc:tc.__name__) + + testcases_other + ) + + loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase + suiteBlocking = MultiprocessingTestSuite(loadTestsFromTestCase(tc) for tc in testcases) + suiteNonBlocking = NonBlockingSocketSuite(loadTestsFromTestCase(tc) for tc in testcases) + + suite = unittest.TestSuite([suiteBlocking,suiteNonBlocking]) + + run(suite) + def main(): test_main(unittest.TextTestRunner(verbosity=2).run) diff -r 03063e718f5f Misc/ACKS --- a/Misc/ACKS Mon Jul 23 10:22:36 2012 -0500 +++ b/Misc/ACKS Tue Jul 24 09:37:13 2012 -0400 @@ -1130,6 +1130,7 @@ Sue Williams Frank Willison Greg V. Wilson +J Derek Wilson Jody Winston Collin Winter Dik Winter diff -r 03063e718f5f Misc/NEWS --- a/Misc/NEWS Mon Jul 23 10:22:36 2012 -0500 +++ b/Misc/NEWS Tue Jul 24 09:37:13 2012 -0400 @@ -52,6 +52,9 @@ Library ------- +- Issue #6056: Allow socket.setdefaulttimeout to work with _mutliprocessing + C-API socket interface that expects blocking sockets. + - Issue #15402: An issue in the struct module that caused sys.getsizeof to return incorrect results for struct.Struct instances has been fixed. Initial patch by Serhiy Storchaka.