Classification
Title: multiprocessing Pool keeps objects (tasks, args, results) alive too long
Type: resource usage    Stage: needs patch
Components: Library (Lib)    Versions: Python 3.7, Python 3.6, Python 3.5, Python 2.7

Process
Status: open    Resolution:
Dependencies:    Superseder:
Assigned To:    Nosy List: davin, pitrou, sbt
Priority: normal    Keywords:

Created on 2017-03-20 18:14 by pitrou, last changed 2017-03-20 18:43 by pitrou.

Pull Requests
PR 743: open (pitrou, 2017-03-20 18:43)
Messages (2)
msg289894 - (view) Author: Antoine Pitrou (pitrou) * (Python committer) Date: 2017-03-20 18:14
The various workers in multiprocessing.Pool keep a reference to the last encountered task or task result.  This means some data may be kept alive even after the caller is done with it, until some other task happens to clobber the relevant variables.

Specifically, Pool._handle_tasks(), Pool._handle_results() and the toplevel worker() function fail to clear references at the end of each loop.
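The fix pattern in each case is the same: clear the loop-local variables before blocking on the next queue read, so the previous task is not pinned in memory while the thread waits.  A minimal sketch (handler() and its task tuple layout are illustrative, not the actual pool internals):

```python
import queue

def handler(inqueue):
    """Sketch of the fix: drop loop locals at the end of each iteration
    so the last task does not stay alive while the thread blocks in get()."""
    while True:
        task = inqueue.get()
        if task is None:                     # sentinel: shut down the handler
            break
        job, func, args = task
        func(*args)                          # do the work for this task
        task = job = func = args = None      # clear references before next get()
```

Without the final assignment, `task`, `func` and `args` would keep the last work item reachable for as long as the handler sits idle in `get()`.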

Originally reported at https://github.com/dask/distributed/issues/956
msg289895 - (view) Author: Antoine Pitrou (pitrou) * (Python committer) Date: 2017-03-20 18:17
Quick patch below.  I'll make a PR once I have time to :-)

diff --git a/Lib/multiprocessing/pool.py b/Lib/multiprocessing/pool.py
index ffdf426..945afa2 100644
--- a/Lib/multiprocessing/pool.py
+++ b/Lib/multiprocessing/pool.py
@@ -128,6 +128,8 @@ def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None,
             util.debug("Possible encoding error while sending result: %s" % (
                 wrapped))
             put((job, i, (False, wrapped)))
+
+        task = job = result = func = args = kwds = None
         completed += 1
     util.debug('worker exiting after %d tasks' % completed)
 
@@ -402,6 +404,8 @@ class Pool(object):
                 if set_length:
                     util.debug('doing set_length()')
                     set_length(i+1)
+            finally:
+                task = taskseq = job = None
         else:
             util.debug('task handler got sentinel')
 
@@ -445,6 +449,7 @@ class Pool(object):
                 cache[job]._set(i, obj)
             except KeyError:
                 pass
+            task = job = obj = None
 
         while cache and thread._state != TERMINATE:
             try:
@@ -461,6 +466,7 @@ class Pool(object):
                 cache[job]._set(i, obj)
             except KeyError:
                 pass
+            task = job = obj = None
 
         if hasattr(outqueue, '_reader'):
             util.debug('ensuring that outqueue is not full')
History
Date User Action Args
2017-03-20 18:43:52 pitrou set pull_requests: + pull_request656
2017-03-20 18:17:28 pitrou set messages: + msg289895
2017-03-20 18:14:14 pitrou create