The following patch adds optional new behaviour to multiprocessing.Pool:
you can specify the maximum number of tasks a worker process in the pool
completes before it exits and is automatically replaced with a fresh
worker process.  This allows resources the worker process is holding onto
to be freed and returned to the system, including open files and memory
that has accumulated due to:

  * leaks in the code of tasks passed to the Pool
  * leaks or reference cycles in third-party modules used by tasks
  * excess high-water memory use from an earlier large task

This functionality is modelled on similar features in Apache, mod_wsgi,
and other packages.

This work is contributed to the Python community by FATdrop Ltd.
(http://www.fatdrop.co.uk/).

Charles Cazabon

diff -urN Python-2.6.2.orig/Lib/multiprocessing/__init__.py Python-2.6.2/Lib/multiprocessing/__init__.py
--- Python-2.6.2.orig/Lib/multiprocessing/__init__.py 2009-03-31 09:01:45.000000000 -0600
+++ Python-2.6.2/Lib/multiprocessing/__init__.py 2009-09-21 20:19:24.000000000 -0600
@@ -219,12 +219,12 @@
     from multiprocessing.queues import JoinableQueue
     return JoinableQueue(maxsize)
 
-def Pool(processes=None, initializer=None, initargs=()):
+def Pool(processes=None, initializer=None, initargs=(), maxtasksperchild=None):
     '''
     Returns a process pool object
     '''
     from multiprocessing.pool import Pool
-    return Pool(processes, initializer, initargs)
+    return Pool(processes, initializer, initargs, maxtasksperchild)
 
 def RawValue(typecode_or_type, *args):
     '''
diff -urN Python-2.6.2.orig/Lib/multiprocessing/pool.py Python-2.6.2/Lib/multiprocessing/pool.py
--- Python-2.6.2.orig/Lib/multiprocessing/pool.py 2008-12-05 02:51:30.000000000 -0600
+++ Python-2.6.2/Lib/multiprocessing/pool.py 2009-09-21 20:19:24.000000000 -0600
@@ -42,7 +42,8 @@
 # Code run by worker processes
 #
 
-def worker(inqueue, outqueue, initializer=None, initargs=()):
+def worker(inqueue, outqueue, initializer=None, initargs=(), maxjobs=None):
+    assert maxjobs is None or (type(maxjobs) == int and maxjobs > 0)
     put = outqueue.put
     get = inqueue.get
     if hasattr(inqueue, '_writer'):
@@ -52,7 +53,8 @@
     if initializer is not None:
         initializer(*initargs)
 
-    while 1:
+    completed = 0
+    while maxjobs is None or (maxjobs and completed < maxjobs):
         try:
             task = get()
         except (EOFError, IOError):
@@ -69,6 +71,7 @@
         except Exception, e:
             result = (False, e)
         put((job, i, result))
+        completed += 1
 
 #
 # Class representing a process pool
@@ -80,11 +83,15 @@
     '''
     Process = Process
 
-    def __init__(self, processes=None, initializer=None, initargs=()):
+    def __init__(self, processes=None, initializer=None, initargs=(),
+                 maxtasksperchild=None):
         self._setup_queues()
         self._taskqueue = Queue.Queue()
         self._cache = {}
         self._state = RUN
+        self._maxtasksperchild = maxtasksperchild
+        self._initializer = initializer
+        self._initargs = initargs
 
         if processes is None:
             try:
@@ -92,16 +99,9 @@
             except NotImplementedError:
                 processes = 1
 
+        self._processes = processes
         self._pool = []
-        for i in range(processes):
-            w = self.Process(
-                target=worker,
-                args=(self._inqueue, self._outqueue, initializer, initargs)
-                )
-            self._pool.append(w)
-            w.name = w.name.replace('Process', 'PoolWorker')
-            w.daemon = True
-            w.start()
+        self._repopulate_pool()
 
         self._task_handler = threading.Thread(
             target=Pool._handle_tasks,
@@ -126,6 +126,42 @@
             exitpriority=15
             )
 
+    def _join_exited_workers(self):
+        """Cleanup after any worker processes which have exited due to reaching
+        their specified lifetime.  Returns True if any workers were cleaned up.
+ """ + cleaned = False + for i in reversed(range(len(self._pool))): + worker = self._pool[i] + if not worker.is_alive(): + # worker exited + debug('cleaning up worker %d' % i) + worker.join() + cleaned = True + del self._pool[i] + return cleaned + + def _repopulate_pool(self): + """Bring the number of pool processes up to the specified number, + for use after reaping workers which have exited. + """ + for i in range(self._processes - len(self._pool)): + w = self.Process( + target=worker, + args=(self._inqueue, self._outqueue, self._initializer, + self._initargs, self._maxtasksperchild) + ) + self._pool.append(w) + w.name = w.name.replace('Process', 'PoolWorker') + w.daemon = True + w.start() + + def _maintain_pool(self): + """Clean up any exited workers and start replacements for them. + """ + if self._join_exited_workers(): + self._repopulate_pool() + def _setup_queues(self): from .queues import SimpleQueue self._inqueue = SimpleQueue() @@ -138,6 +174,7 @@ Equivalent of `apply()` builtin ''' assert self._state == RUN + self._maintain_pool() return self.apply_async(func, args, kwds).get() def map(self, func, iterable, chunksize=None): @@ -145,6 +182,7 @@ Equivalent of `map()` builtin ''' assert self._state == RUN + self._maintain_pool() return self.map_async(func, iterable, chunksize).get() def imap(self, func, iterable, chunksize=1): @@ -152,6 +190,7 @@ Equivalent of `itertools.imap()` -- can be MUCH slower than `Pool.map()` ''' assert self._state == RUN + self._maintain_pool() if chunksize == 1: result = IMapIterator(self._cache) self._taskqueue.put((((result._job, i, func, (x,), {}) @@ -170,6 +209,7 @@ Like `imap()` method but ordering of results is arbitrary ''' assert self._state == RUN + self._maintain_pool() if chunksize == 1: result = IMapUnorderedIterator(self._cache) self._taskqueue.put((((result._job, i, func, (x,), {}) @@ -188,6 +228,7 @@ Asynchronous equivalent of `apply()` builtin ''' assert self._state == RUN + self._maintain_pool() result = ApplyResult(self._cache, callback) self._taskqueue.put(([(result._job, None, func, args, kwds)], None)) return result @@ -197,6 +238,7 @@ Asynchronous equivalent of `map()` builtin ''' assert self._state == RUN + self._maintain_pool() if not hasattr(iterable, '__len__'): iterable = list(iterable)