diff --git a/Lib/test/test_multiprocessing.py b/Lib/test/test_multiprocessing.py --- a/Lib/test/test_multiprocessing.py +++ b/Lib/test/test_multiprocessing.py @@ -19,6 +19,7 @@ import random import logging import struct +import operator import test.support import test.script_helper @@ -1624,6 +1625,18 @@ class _TestPool(BaseTestCase): + @classmethod + def setUpClass(cls): + super().setUpClass() + cls.pool = cls.Pool(4) + + @classmethod + def tearDownClass(cls): + cls.pool.terminate() + cls.pool.join() + cls.pool = None + super().tearDownClass() + def test_apply(self): papply = self.pool.apply self.assertEqual(papply(sqr, (5,)), sqr(5)) @@ -1715,15 +1728,6 @@ p.join() def test_terminate(self): - if self.TYPE == 'manager': - # On Unix a forked process increfs each shared object to - # which its parent process held a reference. If the - # forked process gets terminated then there is likely to - # be a reference leak. So to prevent - # _TestZZZNumberOfObjects from failing we skip this test - # when using a manager. - return - result = self.pool.map_async( time.sleep, [0.1 for i in range(10000)], chunksize=1 ) @@ -1751,7 +1755,6 @@ with multiprocessing.Pool(2) as p: r = p.map_async(sqr, L) self.assertEqual(r.get(), expected) - print(p._state) self.assertRaises(ValueError, p.map_async, sqr, L) def raising(): @@ -1845,35 +1848,6 @@ for (j, res) in enumerate(results): self.assertEqual(res.get(), sqr(j)) - -# -# Test that manager has expected number of shared objects left -# - -class _TestZZZNumberOfObjects(BaseTestCase): - # Because test cases are sorted alphabetically, this one will get - # run after all the other tests for the manager. It tests that - # there have been no "reference leaks" for the manager's shared - # objects. Note the comment in _TestPool.test_terminate(). - - # If some other test using ManagerMixin.manager fails, then the - # raised exception may keep alive a frame which holds a reference - # to a managed object. This will cause test_number_of_objects to - # also fail. - ALLOWED_TYPES = ('manager',) - - def test_number_of_objects(self): - EXPECTED_NUMBER = 1 # the pool object is still alive - multiprocessing.active_children() # discard dead process objs - gc.collect() # do garbage collection - refs = self.manager._number_of_objects() - debug_info = self.manager._debug_info() - if refs != EXPECTED_NUMBER: - print(self.manager._debug_info()) - print(debug_info) - - self.assertEqual(refs, EXPECTED_NUMBER) - # # Test of creating a customized manager class # @@ -2051,7 +2025,7 @@ address=addr, authkey=authkey, serializer=SERIALIZER) try: manager.start() - except IOError as e: + except OSError as e: if e.errno != errno.EADDRINUSE: raise # Retry after some time, in case the old socket was lingering @@ -2165,9 +2139,9 @@ self.assertEqual(reader.writable, False) self.assertEqual(writer.readable, False) self.assertEqual(writer.writable, True) - self.assertRaises(IOError, reader.send, 2) - self.assertRaises(IOError, writer.recv) - self.assertRaises(IOError, writer.poll) + self.assertRaises(OSError, reader.send, 2) + self.assertRaises(OSError, writer.recv) + self.assertRaises(OSError, writer.poll) def test_spawn_close(self): # We test that a pipe connection can be closed by parent @@ -2329,8 +2303,8 @@ if self.TYPE == 'processes': self.assertTrue(a.closed) self.assertTrue(b.closed) - self.assertRaises(IOError, a.recv) - self.assertRaises(IOError, b.recv) + self.assertRaises(OSError, a.recv) + self.assertRaises(OSError, b.recv) class _TestListener(BaseTestCase): @@ -2351,7 +2325,7 @@ self.assertEqual(d.recv(), 1729) if self.TYPE == 'processes': - self.assertRaises(IOError, l.accept) + self.assertRaises(OSError, l.accept) class _TestListenerClient(BaseTestCase): @@ -2401,7 +2375,7 @@ c.close() l.close() -class _TestPoll(unittest.TestCase): +class _TestPoll(BaseTestCase): ALLOWED_TYPES = ('processes', 'threads') @@ -2942,27 +2916,18 @@ def test_invalid_handles(self): conn = multiprocessing.connection.Connection(44977608) try: - self.assertRaises((ValueError, IOError), conn.poll) + self.assertRaises((ValueError, OSError), conn.poll) finally: # Hack private attribute _handle to avoid printing an error # in conn.__del__ conn._handle = None - self.assertRaises((ValueError, IOError), + self.assertRaises((ValueError, OSError), multiprocessing.connection.Connection, -1) # # Functions used to create test cases from the base ones in this module # -def get_attributes(Source, names): - d = {} - for name in names: - obj = getattr(Source, name) - if type(obj) == type(get_attributes): - obj = staticmethod(obj) - d[name] = obj - return d - def create_test_cases(Mixin, type): result = {} glob = globals() @@ -2975,10 +2940,10 @@ assert set(base.ALLOWED_TYPES) <= ALL_TYPES, set(base.ALLOWED_TYPES) if type in base.ALLOWED_TYPES: newname = 'With' + Type + name[1:] - class Temp(base, unittest.TestCase, Mixin): + class Temp(base, Mixin, unittest.TestCase): pass result[newname] = Temp - Temp.__name__ = newname + Temp.__name__ = Temp.__qualname__ = newname Temp.__module__ = Mixin.__module__ return result @@ -2989,12 +2954,24 @@ class ProcessesMixin(object): TYPE = 'processes' Process = multiprocessing.Process - locals().update(get_attributes(multiprocessing, ( - 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', - 'Condition', 'Event', 'Barrier', 'Value', 'Array', 'RawValue', - 'RawArray', 'current_process', 'active_children', 'Pipe', - 'connection', 'JoinableQueue', 'Pool' - ))) + connection = multiprocessing.connection + current_process = staticmethod(multiprocessing.current_process) + active_children = staticmethod(multiprocessing.active_children) + Pool = staticmethod(multiprocessing.Pool) + Pipe = staticmethod(multiprocessing.Pipe) + Queue = staticmethod(multiprocessing.Queue) + JoinableQueue = staticmethod(multiprocessing.JoinableQueue) + Lock = staticmethod(multiprocessing.Lock) + RLock = staticmethod(multiprocessing.RLock) + Semaphore = staticmethod(multiprocessing.Semaphore) + BoundedSemaphore = staticmethod(multiprocessing.BoundedSemaphore) + Condition = staticmethod(multiprocessing.Condition) + Event = staticmethod(multiprocessing.Event) + Barrier = staticmethod(multiprocessing.Barrier) + Value = staticmethod(multiprocessing.Value) + Array = staticmethod(multiprocessing.Array) + RawValue = staticmethod(multiprocessing.RawValue) + RawArray = staticmethod(multiprocessing.RawArray) testcases_processes = create_test_cases(ProcessesMixin, type='processes') globals().update(testcases_processes) @@ -3003,12 +2980,48 @@ class ManagerMixin(object): TYPE = 'manager' Process = multiprocessing.Process - manager = object.__new__(multiprocessing.managers.SyncManager) - locals().update(get_attributes(manager, ( - 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', - 'Condition', 'Event', 'Barrier', 'Value', 'Array', 'list', 'dict', - 'Namespace', 'JoinableQueue', 'Pool' - ))) + Queue = property(operator.attrgetter('manager.Queue')) + JoinableQueue = property(operator.attrgetter('manager.JoinableQueue')) + Lock = property(operator.attrgetter('manager.Lock')) + RLock = property(operator.attrgetter('manager.RLock')) + Semaphore = property(operator.attrgetter('manager.Semaphore')) + BoundedSemaphore = property(operator.attrgetter('manager.BoundedSemaphore')) + Condition = property(operator.attrgetter('manager.Condition')) + Event = property(operator.attrgetter('manager.Event')) + Barrier = property(operator.attrgetter('manager.Barrier')) + Value = property(operator.attrgetter('manager.Value')) + Array = property(operator.attrgetter('manager.Array')) + list = property(operator.attrgetter('manager.list')) + dict = property(operator.attrgetter('manager.dict')) + Namespace = property(operator.attrgetter('manager.Namespace')) + + @classmethod + def Pool(cls, *args, **kwds): + return cls.manager.Pool(*args, **kwds) + + @classmethod + def setUpClass(cls): + cls.manager = multiprocessing.Manager() + + @classmethod + def tearDownClass(cls): + # only the manager process should be returned by active_children() + # but this can take a bit on slow machines, so wait a few seconds + # if there are other children too (see #17395) + t = 0.01 + while len(multiprocessing.active_children()) > 1 and t < 5: + time.sleep(t) + t *= 2 + gc.collect() # do garbage collection + if cls.manager._number_of_objects() != 0: + # This is not really an error since some tests do not + # ensure that all processes which hold a reference to a + # managed object have been joined. + print('Shared objects which still exist at manager shutdown:') + print(cls.manager._debug_info()) + cls.manager.shutdown() + cls.manager.join() + cls.manager = None testcases_manager = create_test_cases(ManagerMixin, type='manager') globals().update(testcases_manager) @@ -3017,16 +3030,27 @@ class ThreadsMixin(object): TYPE = 'threads' Process = multiprocessing.dummy.Process - locals().update(get_attributes(multiprocessing.dummy, ( - 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', - 'Condition', 'Event', 'Barrier', 'Value', 'Array', 'current_process', - 'active_children', 'Pipe', 'connection', 'dict', 'list', - 'Namespace', 'JoinableQueue', 'Pool' - ))) + connection = multiprocessing.dummy.connection + current_process = staticmethod(multiprocessing.dummy.current_process) + active_children = staticmethod(multiprocessing.dummy.active_children) + Pool = staticmethod(multiprocessing.Pool) + Pipe = staticmethod(multiprocessing.dummy.Pipe) + Queue = staticmethod(multiprocessing.dummy.Queue) + JoinableQueue = staticmethod(multiprocessing.dummy.JoinableQueue) + Lock = staticmethod(multiprocessing.dummy.Lock) + RLock = staticmethod(multiprocessing.dummy.RLock) + Semaphore = staticmethod(multiprocessing.dummy.Semaphore) + BoundedSemaphore = staticmethod(multiprocessing.dummy.BoundedSemaphore) + Condition = staticmethod(multiprocessing.dummy.Condition) + Event = staticmethod(multiprocessing.dummy.Event) + Barrier = staticmethod(multiprocessing.dummy.Barrier) + Value = staticmethod(multiprocessing.dummy.Value) + Array = staticmethod(multiprocessing.dummy.Array) testcases_threads = create_test_cases(ThreadsMixin, type='threads') globals().update(testcases_threads) + class OtherTest(unittest.TestCase): # TODO: add more tests for deliver/answer challenge. def test_deliver_challenge_auth_failure(self): @@ -3532,16 +3556,7 @@ # # -testcases_other = [OtherTest, TestInvalidHandle, TestInitializers, - TestStdinBadfiledescriptor, TestWait, TestInvalidFamily, - TestFlags, TestTimeouts, TestNoForkBomb, - TestForkAwareThreadLock, TestIgnoreEINTR] - -# -# -# - -def test_main(run=None): +def setUpModule(): if sys.platform.startswith("linux"): try: lock = multiprocessing.RLock() @@ -3550,43 +3565,9 @@ check_enough_semaphores() - if run is None: - from test.support import run_unittest as run - util.get_temp_dir() # creates temp directory for use by all processes multiprocessing.get_logger().setLevel(LOG_LEVEL) - ProcessesMixin.pool = multiprocessing.Pool(4) - ThreadsMixin.pool = multiprocessing.dummy.Pool(4) - ManagerMixin.manager.__init__() - ManagerMixin.manager.start() - ManagerMixin.pool = ManagerMixin.manager.Pool(4) - - testcases = ( - sorted(testcases_processes.values(), key=lambda tc:tc.__name__) + - sorted(testcases_threads.values(), key=lambda tc:tc.__name__) + - sorted(testcases_manager.values(), key=lambda tc:tc.__name__) + - testcases_other - ) - - loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase - suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases) - try: - run(suite) - finally: - ThreadsMixin.pool.terminate() - ProcessesMixin.pool.terminate() - ManagerMixin.pool.terminate() - ManagerMixin.pool.join() - ManagerMixin.manager.shutdown() - ManagerMixin.manager.join() - ThreadsMixin.pool.join() - ProcessesMixin.pool.join() - del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool - -def main(): - test_main(unittest.TextTestRunner(verbosity=2).run) - if __name__ == '__main__': - main() + unittest.main()