# HG changeset patch # Parent e6c9c4c613c5362b818c6cc25cefc98d7b96f24f diff --git a/Lib/test/test_multiprocessing.py b/Lib/test/mp_common.py rename from Lib/test/test_multiprocessing.py rename to Lib/test/mp_common.py --- a/Lib/test/test_multiprocessing.py +++ b/Lib/test/mp_common.py @@ -1,7 +1,14 @@ -#!/usr/bin/env python3 - # -# Unit tests for the multiprocessing package +# Unit tests for the multiprocessing package which are +# run using different configurations +# +# The base classes defined here are used by +# +# test_multiprocessing_fork +# test_multiprocessing_nofork +# test_multiprocessing_manager_fork +# test_multiprocessing_manager_nofork +# test_multiprocessing_threads # import unittest @@ -2150,23 +2157,6 @@ # logger.warn('foo') # assert self.__handled -# -# Test to verify handle verification, see issue 3321 -# - -class TestInvalidHandle(unittest.TestCase): - - @unittest.skipIf(WIN32, "skipped on Windows") - def test_invalid_handles(self): - conn = multiprocessing.connection.Connection(44977608) - try: - self.assertRaises((ValueError, IOError), conn.poll) - finally: - # Hack private attribute _handle to avoid printing an error - # in conn.__del__ - conn._handle = None - self.assertRaises((ValueError, IOError), - multiprocessing.connection.Connection, -1) # # Functions used to create test cases from the base ones in this module @@ -2181,16 +2171,15 @@ d[name] = obj return d -def create_test_cases(Mixin, type): +def create_test_cases(Mixin): result = {} glob = globals() - Type = type.capitalize() for name in list(glob.keys()): if name.startswith('_Test'): base = glob[name] - if type in base.ALLOWED_TYPES: - newname = 'With' + Type + name[1:] + if Mixin.TYPE in base.ALLOWED_TYPES: + newname = name[1:] class Temp(base, unittest.TestCase, Mixin): pass result[newname] = Temp @@ -2199,184 +2188,10 @@ return result # -# Create test cases # - -class ProcessesMixin(object): - TYPE = 'processes' - Process = multiprocessing.Process - locals().update(get_attributes(multiprocessing, ( - 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', - 'Condition', 'Event', 'Value', 'Array', 'RawValue', - 'RawArray', 'current_process', 'active_children', 'Pipe', - 'connection', 'JoinableQueue' - ))) - -testcases_processes = create_test_cases(ProcessesMixin, type='processes') -globals().update(testcases_processes) - - -class ManagerMixin(object): - TYPE = 'manager' - Process = multiprocessing.Process - manager = object.__new__(multiprocessing.managers.SyncManager) - locals().update(get_attributes(manager, ( - 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', - 'Condition', 'Event', 'Value', 'Array', 'list', 'dict', - 'Namespace', 'JoinableQueue' - ))) - -testcases_manager = create_test_cases(ManagerMixin, type='manager') -globals().update(testcases_manager) - - -class ThreadsMixin(object): - TYPE = 'threads' - Process = multiprocessing.dummy.Process - locals().update(get_attributes(multiprocessing.dummy, ( - 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', - 'Condition', 'Event', 'Value', 'Array', 'current_process', - 'active_children', 'Pipe', 'connection', 'dict', 'list', - 'Namespace', 'JoinableQueue' - ))) - -testcases_threads = create_test_cases(ThreadsMixin, type='threads') -globals().update(testcases_threads) - -class OtherTest(unittest.TestCase): - # TODO: add more tests for deliver/answer challenge. - def test_deliver_challenge_auth_failure(self): - class _FakeConnection(object): - def recv_bytes(self, size): - return b'something bogus' - def send_bytes(self, data): - pass - self.assertRaises(multiprocessing.AuthenticationError, - multiprocessing.connection.deliver_challenge, - _FakeConnection(), b'abc') - - def test_answer_challenge_auth_failure(self): - class _FakeConnection(object): - def __init__(self): - self.count = 0 - def recv_bytes(self, size): - self.count += 1 - if self.count == 1: - return multiprocessing.connection.CHALLENGE - elif self.count == 2: - return b'something bogus' - return b'' - def send_bytes(self, data): - pass - self.assertRaises(multiprocessing.AuthenticationError, - multiprocessing.connection.answer_challenge, - _FakeConnection(), b'abc') - # -# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585 -# - -def initializer(ns): - ns.test += 1 - -class TestInitializers(unittest.TestCase): - def setUp(self): - self.mgr = multiprocessing.Manager() - self.ns = self.mgr.Namespace() - self.ns.test = 0 - - def tearDown(self): - self.mgr.shutdown() - - def test_manager_initializer(self): - m = multiprocessing.managers.SyncManager() - self.assertRaises(TypeError, m.start, 1) - m.start(initializer, (self.ns,)) - self.assertEqual(self.ns.test, 1) - m.shutdown() - - def test_pool_initializer(self): - self.assertRaises(TypeError, multiprocessing.Pool, initializer=1) - p = multiprocessing.Pool(1, initializer, (self.ns,)) - p.close() - p.join() - self.assertEqual(self.ns.test, 1) - -# -# Issue 5155, 5313, 5331: Test process in processes -# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior -# - -def _ThisSubProcess(q): - try: - item = q.get(block=False) - except pyqueue.Empty: - pass - -def _TestProcess(q): - queue = multiprocessing.Queue() - subProc = multiprocessing.Process(target=_ThisSubProcess, args=(queue,)) - subProc.daemon = True - subProc.start() - subProc.join() - -def _afunc(x): - return x*x - -def pool_in_process(): - pool = multiprocessing.Pool(processes=4) - x = pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7]) - -class _file_like(object): - def __init__(self, delegate): - self._delegate = delegate - self._pid = None - - @property - def cache(self): - pid = os.getpid() - # There are no race conditions since fork keeps only the running thread - if pid != self._pid: - self._pid = pid - self._cache = [] - return self._cache - - def write(self, data): - self.cache.append(data) - - def flush(self): - self._delegate.write(''.join(self.cache)) - self._cache = [] - -class TestStdinBadfiledescriptor(unittest.TestCase): - - def test_queue_in_process(self): - queue = multiprocessing.Queue() - proc = multiprocessing.Process(target=_TestProcess, args=(queue,)) - proc.start() - proc.join() - - def test_pool_in_process(self): - p = multiprocessing.Process(target=pool_in_process) - p.start() - p.join() - - def test_flushing(self): - sio = io.StringIO() - flike = _file_like(sio) - flike.write('foo') - proc = multiprocessing.Process(target=lambda: flike.flush()) - flike.flush() - assert sio.getvalue() == 'foo' - -testcases_other = [OtherTest, TestInvalidHandle, TestInitializers, - TestStdinBadfiledescriptor] - -# -# -# - -def test_main(run=None): + +def _main(prepare, cleanup, name): if sys.platform.startswith("linux"): try: lock = multiprocessing.RLock() @@ -2384,40 +2199,10 @@ raise unittest.SkipTest("OSError raises on RLock creation, see issue 3111!") check_enough_semaphores() - - if run is None: - from test.support import run_unittest as run - util.get_temp_dir() # creates temp directory for use by all processes - multiprocessing.get_logger().setLevel(LOG_LEVEL) - - ProcessesMixin.pool = multiprocessing.Pool(4) - ThreadsMixin.pool = multiprocessing.dummy.Pool(4) - ManagerMixin.manager.__init__() - ManagerMixin.manager.start() - ManagerMixin.pool = ManagerMixin.manager.Pool(4) - - testcases = ( - sorted(testcases_processes.values(), key=lambda tc:tc.__name__) + - sorted(testcases_threads.values(), key=lambda tc:tc.__name__) + - sorted(testcases_manager.values(), key=lambda tc:tc.__name__) + - testcases_other - ) - - loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase - suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases) - run(suite) - - ThreadsMixin.pool.terminate() - ProcessesMixin.pool.terminate() - ManagerMixin.pool.terminate() - ManagerMixin.manager.shutdown() - - del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool - -def main(): - test_main(unittest.TextTestRunner(verbosity=2).run) - -if __name__ == '__main__': - main() + old_state = prepare() + try: + test.support.run_unittest(name) + finally: + cleanup(old_state) diff --git a/Lib/test/test_multiprocessing_fork.py b/Lib/test/test_multiprocessing_fork.py new file mode 100644 --- /dev/null +++ b/Lib/test/test_multiprocessing_fork.py @@ -0,0 +1,39 @@ +#!/usr/bin/env python3 + +# Test of multiprocessing using forked processes + +import sys +import multiprocessing +from test import mp_common + +class Mixin(object): + TYPE = 'processes' + Process = multiprocessing.Process + locals().update(mp_common.get_attributes(multiprocessing, ( + 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', + 'Condition', 'Event', 'Value', 'Array', 'RawValue', + 'RawArray', 'current_process', 'active_children', 'Pipe', + 'connection', 'JoinableQueue' + ))) + +testcases_dict = mp_common.create_test_cases(Mixin) +globals().update(testcases_dict) + +def prepare(): + old_is_forking = multiprocessing.forking_is_enabled() + multiprocessing.forking_enable(True) + Mixin.pool = multiprocessing.Pool(4) + return old_is_forking + +def cleanup(old_state): + Mixin.pool.terminate() + del Mixin.pool + multiprocessing.forking_enable(old_state) + +def test_main(): + if sys.platform == "win32": + raise unittest.SkipTest("Windows does not have fork") + mp_common._main(prepare, cleanup, __name__) + +if __name__ == '__main__': + test_main() diff --git a/Lib/test/test_multiprocessing_manager_fork.py b/Lib/test/test_multiprocessing_manager_fork.py new file mode 100644 --- /dev/null +++ b/Lib/test/test_multiprocessing_manager_fork.py @@ -0,0 +1,42 @@ +#!/usr/bin/env python3 + +# Test of multiprocessing using forked processes and a manager process + +import sys +import multiprocessing +from test import mp_common + +class Mixin(object): + TYPE = 'manager' + Process = multiprocessing.Process + manager = object.__new__(multiprocessing.managers.SyncManager) + locals().update(mp_common.get_attributes(manager, ( + 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', + 'Condition', 'Event', 'Value', 'Array', 'list', 'dict', + 'Namespace', 'JoinableQueue' + ))) + +testcases_dict = mp_common.create_test_cases(Mixin) +globals().update(testcases_dict) + +def prepare(): + old_is_forking = multiprocessing.forking_is_enabled() + multiprocessing.forking_enable(True) + Mixin.manager.__init__() + Mixin.manager.start() + Mixin.pool = Mixin.manager.Pool(4) + return old_is_forking + +def cleanup(old_state): + Mixin.pool.terminate() + Mixin.manager.shutdown() + del Mixin.pool + multiprocessing.forking_enable(old_state) + +def test_main(): + if sys.platform == "win32": + raise unittest.SkipTest("Windows does not have fork") + mp_common._main(prepare, cleanup, __name__) + +if __name__ == '__main__': + test_main() diff --git a/Lib/test/test_multiprocessing_manager_nofork.py b/Lib/test/test_multiprocessing_manager_nofork.py new file mode 100644 --- /dev/null +++ b/Lib/test/test_multiprocessing_manager_nofork.py @@ -0,0 +1,39 @@ +#!/usr/bin/env python3 + +# Test of multiprocessing using non-forked processes and a manager process + +import multiprocessing +from test import mp_common + +class Mixin(object): + TYPE = 'manager' + Process = multiprocessing.Process + manager = object.__new__(multiprocessing.managers.SyncManager) + locals().update(mp_common.get_attributes(manager, ( + 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', + 'Condition', 'Event', 'Value', 'Array', 'list', 'dict', + 'Namespace', 'JoinableQueue' + ))) + +testcases_dict = mp_common.create_test_cases(Mixin) +globals().update(testcases_dict) + +def prepare(): + old_is_forking = multiprocessing.forking_is_enabled() + multiprocessing.forking_enable(False) + Mixin.manager.__init__() + Mixin.manager.start() + Mixin.pool = Mixin.manager.Pool(4) + return old_is_forking + +def cleanup(old_state): + Mixin.pool.terminate() + Mixin.manager.shutdown() + del Mixin.pool + multiprocessing.forking_enable(old_state) + +def test_main(): + mp_common._main(prepare, cleanup, __name__) + +if __name__ == '__main__': + test_main() diff --git a/Lib/test/test_multiprocessing_misc.py b/Lib/test/test_multiprocessing_misc.py new file mode 100644 --- /dev/null +++ b/Lib/test/test_multiprocessing_misc.py @@ -0,0 +1,170 @@ +#!/usr/bin/env python3 + +# Tests for multiprocessing which do not need to be run +# with multiple configurations + +import test.support +import unittest + +# Skip tests if _multiprocessing wasn't built. +_multiprocessing = test.support.import_module('_multiprocessing') +# Skip tests if sem_open implementation is broken. +test.support.import_module('multiprocessing.synchronize') + +import sys +import io +import os +import queue as pyqueue +import multiprocessing + +WIN32 = sys.platform == 'win32' + +# +# Test to verify handle verification, see issue 3321 +# + +class TestInvalidHandle(unittest.TestCase): + + @unittest.skipIf(WIN32, "skipped on Windows") + def test_invalid_handles(self): + conn = multiprocessing.connection.Connection(44977608) + try: + self.assertRaises((ValueError, IOError), conn.poll) + finally: + # Hack private attribute _handle to avoid printing an error + # in conn.__del__ + conn._handle = None + self.assertRaises((ValueError, IOError), + multiprocessing.connection.Connection, -1) + +class OtherTest(unittest.TestCase): + # TODO: add more tests for deliver/answer challenge. + def test_deliver_challenge_auth_failure(self): + class _FakeConnection(object): + def recv_bytes(self, size): + return b'something bogus' + def send_bytes(self, data): + pass + self.assertRaises(multiprocessing.AuthenticationError, + multiprocessing.connection.deliver_challenge, + _FakeConnection(), b'abc') + + def test_answer_challenge_auth_failure(self): + class _FakeConnection(object): + def __init__(self): + self.count = 0 + def recv_bytes(self, size): + self.count += 1 + if self.count == 1: + return multiprocessing.connection.CHALLENGE + elif self.count == 2: + return b'something bogus' + return b'' + def send_bytes(self, data): + pass + self.assertRaises(multiprocessing.AuthenticationError, + multiprocessing.connection.answer_challenge, + _FakeConnection(), b'abc') + +# +# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585 +# + +def initializer(ns): + ns.test += 1 + +class TestInitializers(unittest.TestCase): + def setUp(self): + self.mgr = multiprocessing.Manager() + self.ns = self.mgr.Namespace() + self.ns.test = 0 + + def tearDown(self): + self.mgr.shutdown() + + def test_manager_initializer(self): + m = multiprocessing.managers.SyncManager() + self.assertRaises(TypeError, m.start, 1) + m.start(initializer, (self.ns,)) + self.assertEqual(self.ns.test, 1) + m.shutdown() + + def test_pool_initializer(self): + self.assertRaises(TypeError, multiprocessing.Pool, initializer=1) + p = multiprocessing.Pool(1, initializer, (self.ns,)) + p.close() + p.join() + self.assertEqual(self.ns.test, 1) + +# +# Issue 5155, 5313, 5331: Test process in processes +# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior +# + +def _ThisSubProcess(q): + try: + item = q.get(block=False) + except pyqueue.Empty: + pass + +def _TestProcess(q): + queue = multiprocessing.Queue() + subProc = multiprocessing.Process(target=_ThisSubProcess, args=(queue,)) + subProc.daemon = True + subProc.start() + subProc.join() + +def _afunc(x): + return x*x + +def pool_in_process(): + pool = multiprocessing.Pool(processes=4) + x = pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7]) + +class _file_like(object): + def __init__(self, delegate): + self._delegate = delegate + self._pid = None + + @property + def cache(self): + pid = os.getpid() + # There are no race conditions since fork keeps only the running thread + if pid != self._pid: + self._pid = pid + self._cache = [] + return self._cache + + def write(self, data): + self.cache.append(data) + + def flush(self): + self._delegate.write(''.join(self.cache)) + self._cache = [] + +class TestStdinBadfiledescriptor(unittest.TestCase): + + def test_queue_in_process(self): + queue = multiprocessing.Queue() + proc = multiprocessing.Process(target=_TestProcess, args=(queue,)) + proc.start() + proc.join() + + def test_pool_in_process(self): + p = multiprocessing.Process(target=pool_in_process) + p.start() + p.join() + + def test_flushing(self): + sio = io.StringIO() + flike = _file_like(sio) + flike.write('foo') + proc = multiprocessing.Process(target=lambda: flike.flush()) + flike.flush() + assert sio.getvalue() == 'foo' + +def test_main(): + test.support.run_unittest(__name__) + +if __name__ == '__main__': + test_main() diff --git a/Lib/test/test_multiprocessing_nofork.py b/Lib/test/test_multiprocessing_nofork.py new file mode 100644 --- /dev/null +++ b/Lib/test/test_multiprocessing_nofork.py @@ -0,0 +1,36 @@ +#!/usr/bin/env python3 + +# Test of multiprocessing using non-forked processes + +import multiprocessing +from test import mp_common + +class Mixin(object): + TYPE = 'processes' + Process = multiprocessing.Process + locals().update(mp_common.get_attributes(multiprocessing, ( + 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', + 'Condition', 'Event', 'Value', 'Array', 'RawValue', + 'RawArray', 'current_process', 'active_children', 'Pipe', + 'connection', 'JoinableQueue' + ))) + +testcases_dict = mp_common.create_test_cases(Mixin) +globals().update(testcases_dict) + +def prepare(): + old_is_forking = multiprocessing.forking_is_enabled() + multiprocessing.forking_enable(False) + Mixin.pool = multiprocessing.Pool(4) + return old_is_forking + +def cleanup(old_state): + Mixin.pool.terminate() + del Mixin.pool + multiprocessing.forking_enable(old_state) + +def test_main(): + mp_common._main(prepare, cleanup, __name__) + +if __name__ == '__main__': + test_main() diff --git a/Lib/test/test_multiprocessing_threads.py b/Lib/test/test_multiprocessing_threads.py new file mode 100644 --- /dev/null +++ b/Lib/test/test_multiprocessing_threads.py @@ -0,0 +1,33 @@ +#!/usr/bin/env python3 + +# Test of multiprocessing using threads instead of processes + +import multiprocessing.dummy +from test import mp_common + +class Mixin(object): + TYPE = 'threads' + Process = multiprocessing.dummy.Process + locals().update(mp_common.get_attributes(multiprocessing.dummy, ( + 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', + 'Condition', 'Event', 'Value', 'Array', 'current_process', + 'active_children', 'Pipe', 'connection', 'dict', 'list', + 'Namespace', 'JoinableQueue' + ))) + +testcases_dict = mp_common.create_test_cases(Mixin) +globals().update(testcases_dict) + +def prepare(): + Mixin.pool = multiprocessing.dummy.Pool(4) + return None + +def cleanup(old_state): + Mixin.pool.terminate() + del Mixin.pool + +def test_main(): + mp_common._main(prepare, cleanup, __name__) + +if __name__ == '__main__': + test_main()