The following patch adds new optional behaviour to multiprocessing.Pool: you can specify the maximum number of tasks each worker process in the pool completes before it exits and is automatically replaced with a fresh worker process. This allows resources that a worker is holding onto to be freed and returned to the system; such resource buildup can come from, for example:

* leaks in code in the tasks passed to the Pool
* leaks or cycles in third-party modules used by tasks
* excess high-water memory from a large task passed to the pool earlier

This functionality is modelled on similar features in Apache, mod_wsgi, and other packages. A short usage sketch appears after the patch. This work was performed by Charles Cazabon, on behalf of FATdrop Ltd.: http://www.fatdrop.co.uk/

diff -urN Python-2.6.2.orig/Doc/library/multiprocessing.rst Python-2.6.2/Doc/library/multiprocessing.rst
--- Python-2.6.2.orig/Doc/library/multiprocessing.rst	2009-04-05 15:26:31.000000000 -0600
+++ Python-2.6.2/Doc/library/multiprocessing.rst	2010-01-18 14:26:27.000000000 -0600
@@ -1510,7 +1510,7 @@
 One can create a pool of processes which will carry out tasks submitted to it
 with the :class:`Pool` class.
 
-.. class:: multiprocessing.Pool([processes[, initializer[, initargs]]])
+.. class:: multiprocessing.Pool([processes[, initializer[, initargs[, maxtasksperchild]]]])
 
    A process pool object which controls a pool of worker processes to which jobs
    can be submitted.  It supports asynchronous results with timeouts and
@@ -1519,7 +1519,11 @@
    *processes* is the number of worker processes to use.  If *processes* is
    ``None`` then the number returned by :func:`cpu_count` is used.  If
    *initializer* is not ``None`` then each worker process will call
-   ``initializer(*initargs)`` when it starts.
+   ``initializer(*initargs)`` when it starts.  *maxtasksperchild* is the number
+   of tasks a worker process can complete before it will exit and be replaced
+   with a fresh worker process, to enable unused resources to be freed.  The
+   default *maxtasksperchild* is ``None``, which means worker processes will
+   live as long as the pool.
 
    .. method:: apply(func[, args[, kwds]])
 
diff -urN Python-2.6.2.orig/Lib/multiprocessing/__init__.py Python-2.6.2/Lib/multiprocessing/__init__.py
--- Python-2.6.2.orig/Lib/multiprocessing/__init__.py	2009-03-31 09:01:45.000000000 -0600
+++ Python-2.6.2/Lib/multiprocessing/__init__.py	2010-01-18 14:27:13.000000000 -0600
@@ -219,12 +219,12 @@
     from multiprocessing.queues import JoinableQueue
     return JoinableQueue(maxsize)
 
-def Pool(processes=None, initializer=None, initargs=()):
+def Pool(processes=None, initializer=None, initargs=(), maxtasksperchild=None):
     '''
     Returns a process pool object
     '''
     from multiprocessing.pool import Pool
-    return Pool(processes, initializer, initargs)
+    return Pool(processes, initializer, initargs, maxtasksperchild)
 
 def RawValue(typecode_or_type, *args):
     '''
diff -urN Python-2.6.2.orig/Lib/multiprocessing/pool.py Python-2.6.2/Lib/multiprocessing/pool.py
--- Python-2.6.2.orig/Lib/multiprocessing/pool.py	2008-12-05 02:51:30.000000000 -0600
+++ Python-2.6.2/Lib/multiprocessing/pool.py	2010-01-18 14:27:13.000000000 -0600
@@ -29,6 +29,7 @@
 CLOSE = 1
 TERMINATE = 2
 
+
 #
 # Miscellaneous
 #
@@ -42,7 +43,8 @@
 # Code run by worker processes
 #
 
-def worker(inqueue, outqueue, initializer=None, initargs=()):
+def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None):
+    assert maxtasks is None or (type(maxtasks) == int and maxtasks > 0)
     put = outqueue.put
     get = inqueue.get
     if hasattr(inqueue, '_writer'):
@@ -52,7 +54,8 @@
     if initializer is not None:
         initializer(*initargs)
 
-    while 1:
+    completed = 0
+    while maxtasks is None or (maxtasks and completed < maxtasks):
         try:
             task = get()
         except (EOFError, IOError):
@@ -69,6 +72,9 @@
         except Exception, e:
             result = (False, e)
         put((job, i, result))
+        completed += 1
+    debug('worker exiting after %d tasks' % completed)
+
 
 #
 # Class representing a process pool
#
@@ -80,11 +86,15 @@
     '''
     Process = Process
 
-    def __init__(self, processes=None, initializer=None, initargs=()):
+    def __init__(self, processes=None, initializer=None, initargs=(),
+                 maxtasksperchild=None):
         self._setup_queues()
         self._taskqueue = Queue.Queue()
         self._cache = {}
         self._state = RUN
+        self._maxtasksperchild = maxtasksperchild
+        self._initializer = initializer
+        self._initargs = initargs
 
         if processes is None:
             try:
@@ -92,16 +102,17 @@
             except NotImplementedError:
                 processes = 1
 
+        self._processes = processes
         self._pool = []
-        for i in range(processes):
-            w = self.Process(
-                target=worker,
-                args=(self._inqueue, self._outqueue, initializer, initargs)
-                )
-            self._pool.append(w)
-            w.name = w.name.replace('Process', 'PoolWorker')
-            w.daemon = True
-            w.start()
+        self._repopulate_pool()
+
+        self._worker_handler = threading.Thread(
+            target=Pool._handle_workers,
+            args=(self, )
+            )
+        self._worker_handler.daemon = True
+        self._worker_handler._state = RUN
+        self._worker_handler.start()
 
         self._task_handler = threading.Thread(
             target=Pool._handle_tasks,
@@ -122,10 +133,48 @@
         self._terminate = Finalize(
             self, self._terminate_pool,
             args=(self._taskqueue, self._inqueue, self._outqueue, self._pool,
-                  self._task_handler, self._result_handler, self._cache),
+                  self._worker_handler, self._task_handler,
+                  self._result_handler, self._cache),
             exitpriority=15
             )
 
+    def _join_exited_workers(self):
+        """Cleanup after any worker processes which have exited due to reaching
+        their specified lifetime.  Returns True if any workers were cleaned up.
+        """
+        cleaned = False
+        for i in reversed(range(len(self._pool))):
+            worker = self._pool[i]
+            if worker.exitcode is not None:
+                # worker exited
+                debug('cleaning up worker %d' % i)
+                worker.join()
+                cleaned = True
+                del self._pool[i]
+        return cleaned
+
+    def _repopulate_pool(self):
+        """Bring the number of pool processes up to the specified number,
+        for use after reaping workers which have exited.
+        """
+        for i in range(self._processes - len(self._pool)):
+            w = self.Process(
+                target=worker,
+                args=(self._inqueue, self._outqueue, self._initializer,
+                      self._initargs, self._maxtasksperchild)
+                )
+            self._pool.append(w)
+            w.name = w.name.replace('Process', 'PoolWorker')
+            w.daemon = True
+            w.start()
+            debug('added worker')
+
+    def _maintain_pool(self):
+        """Clean up any exited workers and start replacements for them.
+        """
+        if self._join_exited_workers():
+            self._repopulate_pool()
+
     def _setup_queues(self):
         from .queues import SimpleQueue
         self._inqueue = SimpleQueue()
@@ -212,6 +261,13 @@
         return result
 
     @staticmethod
+    def _handle_workers(pool):
+        while pool._worker_handler._state == RUN and pool._state == RUN:
+            pool._maintain_pool()
+            time.sleep(0.1)
+        debug('worker handler exiting')
+
+    @staticmethod
     def _handle_tasks(taskqueue, put, outqueue, pool):
         thread = threading.current_thread()
 
@@ -326,16 +382,19 @@
         debug('closing pool')
         if self._state == RUN:
             self._state = CLOSE
+            self._worker_handler._state = CLOSE
             self._taskqueue.put(None)
 
     def terminate(self):
         debug('terminating pool')
         self._state = TERMINATE
+        self._worker_handler._state = TERMINATE
         self._terminate()
 
     def join(self):
         debug('joining pool')
         assert self._state in (CLOSE, TERMINATE)
+        self._worker_handler.join()
         self._task_handler.join()
         self._result_handler.join()
         for p in self._pool:
@@ -352,10 +411,11 @@
 
     @classmethod
     def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool,
-                        task_handler, result_handler, cache):
+                        worker_handler, task_handler, result_handler, cache):
         # this is guaranteed to only be called once
         debug('finalizing pool')
 
+        worker_handler._state = TERMINATE
         task_handler._state = TERMINATE
         taskqueue.put(None)                 # sentinel
 
@@ -367,10 +427,15 @@
         result_handler._state = TERMINATE
         outqueue.put(None)                  # sentinel
 
+        # Terminate workers which haven't already finished.
         if pool and hasattr(pool[0], 'terminate'):
             debug('terminating workers')
             for p in pool:
-                p.terminate()
+                if p.exitcode is None:
+                    p.terminate()
+
+        debug('joining worker handler')
+        worker_handler.join(1e100)
 
         debug('joining task handler')
         task_handler.join(1e100)
@@ -380,8 +445,11 @@
 
         if pool and hasattr(pool[0], 'terminate'):
             debug('joining pool workers')
-            for p in pool:
-                p.join()
+            for w in pool:
+                if w.exitcode is None:
+                    # worker has not yet exited
+                    debug('cleaning up worker %d' % w.pid)
+                    w.join()
 
 #
 # Class whose instances are returned by `Pool.apply_async()`
diff -urN Python-2.6.2.orig/Lib/test/test_multiprocessing.py Python-2.6.2/Lib/test/test_multiprocessing.py
--- Python-2.6.2.orig/Lib/test/test_multiprocessing.py	2009-03-30 17:38:36.000000000 -0600
+++ Python-2.6.2/Lib/test/test_multiprocessing.py	2010-01-18 14:26:27.000000000 -0600
@@ -1045,6 +1045,33 @@
         join = TimingWrapper(self.pool.join)
         join()
         self.assertTrue(join.elapsed < 0.2)
+
+
+class _TestPoolWorkerLifetime(BaseTestCase):
+
+    ALLOWED_TYPES = ('processes', )
+
+    def test_pool_worker_lifetime(self):
+        p = multiprocessing.Pool(3, maxtasksperchild=10)
+        self.assertEqual(3, len(p._pool))
+        origworkerpids = [w.pid for w in p._pool]
+        # Run many tasks so each worker gets replaced (hopefully)
+        results = []
+        for i in range(100):
+            results.append(p.apply_async(sqr, (i, )))
+        # Fetch the results and verify we got the right answers,
+        # also ensuring all the tasks have completed.
+        for (j, res) in enumerate(results):
+            self.assertEqual(res.get(), sqr(j))
+        # Refill the pool
+        p._repopulate_pool()
+        # Finally, check that the worker pids have changed
+        finalworkerpids = [w.pid for w in p._pool]
+        self.assertNotEqual(sorted(origworkerpids), sorted(finalworkerpids))
+        p.close()
+        p.join()
+
+
 #
 # Test that manager has expected number of shared objects left
 #
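
For clarity, here is a short usage sketch (not part of the patch) showing how the new argument behaves once the patch is applied. The work function and the pool sizes are made up purely for illustration:

    # Illustrative example only -- not part of the patch above.
    import os
    import multiprocessing

    def work(x):
        # Return the worker's pid alongside the result, so that worker
        # turnover is visible in the output.
        return (os.getpid(), x * x)

    if __name__ == '__main__':
        # Each worker exits after completing two tasks and is replaced
        # automatically by the new worker handler thread.
        pool = multiprocessing.Pool(processes=2, maxtasksperchild=2)
        # chunksize 1 so that each item counts as one task towards the
        # limit; with a larger chunksize, a whole chunk is a single task.
        results = pool.map(work, range(10), 1)
        pool.close()
        pool.join()
        pids = set(pid for (pid, _) in results)
        # With 10 tasks, 2 workers and maxtasksperchild=2, roughly five
        # distinct worker pids should show up.
        print 'distinct workers used:', len(pids)

Note that work must be defined at module level so it can be pickled for the child processes, and that replacement workers are started by the _handle_workers thread, which polls _maintain_pool() roughly every 0.1 seconds rather than reacting to worker exit immediately.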