diff -r 04ff1cc40d62 Doc/library/multiprocessing.rst --- a/Doc/library/multiprocessing.rst Fri Oct 04 11:38:59 2013 -0400 +++ b/Doc/library/multiprocessing.rst Sun Oct 06 23:13:08 2013 +0300 @@ -1794,7 +1794,9 @@ *processes* is the number of worker processes to use. If *processes* is ``None`` then the number returned by :func:`os.cpu_count` is used. If *initializer* is not ``None`` then each worker process will call - ``initializer(*initargs)`` when it starts. + ``initializer(*initargs)`` when it starts. Values returned by the initializer + will be passed to the function executed within the worker as first positional + argument. Note that the methods of the pool object should only be called by the process which created the pool. diff -r 04ff1cc40d62 Lib/multiprocessing/pool.py --- a/Lib/multiprocessing/pool.py Fri Oct 04 11:38:59 2013 -0400 +++ b/Lib/multiprocessing/pool.py Sun Oct 06 23:13:08 2013 +0300 @@ -92,6 +92,7 @@ def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None): assert maxtasks is None or (type(maxtasks) == int and maxtasks > 0) + initresults = None put = outqueue.put get = inqueue.get if hasattr(inqueue, '_writer'): @@ -99,7 +100,7 @@ outqueue._reader.close() if initializer is not None: - initializer(*initargs) + initresults = initializer(*initargs) completed = 0 while maxtasks is None or (maxtasks and completed < maxtasks): @@ -114,6 +115,9 @@ break job, i, func, args, kwds = task + if initresults is not None: + args = list(args) + args.insert(0, initresults) try: result = (True, func(*args, **kwds)) except Exception as e: diff -r 04ff1cc40d62 Lib/test/_test_multiprocessing.py --- a/Lib/test/_test_multiprocessing.py Fri Oct 04 11:38:59 2013 -0400 +++ b/Lib/test/_test_multiprocessing.py Sun Oct 06 23:13:08 2013 +0300 @@ -1879,6 +1879,7 @@ for (j, res) in enumerate(results): self.assertEqual(res.get(), sqr(j)) + # # Test of creating a customized manager class # @@ -3005,9 +3006,20 @@ # Test Manager.start()/Pool.__init__() initializer feature - see issue 5585 # + def initializer(ns): ns.test += 1 + +def workers_initializer(value): + return value + + +def workers_function(initializer_retval, argument): + initializer_retval.append(argument) + return initializer_retval + + class TestInitializers(unittest.TestCase): def setUp(self): self.mgr = multiprocessing.Manager() @@ -3033,6 +3045,20 @@ p.join() self.assertEqual(self.ns.test, 1) + def test_pool_initializer_returned_values(self): + results = [] + expected = [] + p = multiprocessing.Pool(1, workers_initializer, ([], )) + for i in range(6): + results.append(p.apply_async(workers_function, (i, ))) + p.close() + p.join() + # check the results + for (j, res) in enumerate(results): + expected.append(j) + self.assertEqual(res.get(), expected) + + # # Issue 5155, 5313, 5331: Test process in processes # Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior