Author dsoprea
Recipients dsoprea
Date 2013-11-21.01:52:18
Message-id <1384998739.89.0.441869255184.issue19675@psf.upfronthosting.co.za>
If you ask Pool for more processes than the OS can provide, Pool raises an OSError and dies, but it does not clean up any of the processes it has already forked.

Here is an interpreter session in which I allocate a large but satisfiable number of processes, just to show what is possible on my current system:

>>> from multiprocessing import Pool
>>> p = Pool(500)
>>> p.close()
>>> p.join()

Now here is a request that fails. Even after it fails, I can't allocate even a single worker:

>>> p = Pool(700)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/__init__.py", line 232, in Pool
    return Pool(processes, initializer, initargs, maxtasksperchild)
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/pool.py", line 159, in __init__
    self._repopulate_pool()
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/pool.py", line 222, in _repopulate_pool
    w.start()
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/process.py", line 130, in start
    self._popen = Popen(self)
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/forking.py", line 121, in __init__
    self.pid = os.fork()
OSError: [Errno 35] Resource temporarily unavailable

>>> p = Pool(1)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/__init__.py", line 232, in Pool
    return Pool(processes, initializer, initargs, maxtasksperchild)
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/pool.py", line 159, in __init__
    self._repopulate_pool()
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/pool.py", line 222, in _repopulate_pool
    w.start()
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/process.py", line 130, in start
    self._popen = Popen(self)
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/forking.py", line 121, in __init__
    self.pid = os.fork()
OSError: [Errno 35] Resource temporarily unavailable

The only way to clean this up is to close the parent (the interpreter).

I'm submitting a patch for 2.7.6 that catches exceptions raised during worker creation and cleans up the already-started workers before re-raising. The affected method is _repopulate_pool(), which appears to be the same in 2.7.6, 3.3.3, and probably every other recent version of Python.
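The same start-everything-or-undo idea can be sketched outside of Pool. This is a minimal illustration, not the patch itself: `start_all_or_none` and `FakeWorker` are hypothetical names, and `FakeWorker` only mimics the `start()`/`is_alive()`/`terminate()`/`join()` methods of `multiprocessing.Process` so that a fork failure can be simulated without actually exhausting the process limit. Real `multiprocessing.Process` objects could be passed in the same way.

```python
def start_all_or_none(workers):
    """Start every worker. If any start() raises, terminate and join the
    workers that were already started, then re-raise the original error."""
    started = []
    try:
        for w in workers:
            w.start()
            started.append(w)  # only record workers whose start() succeeded
    except BaseException:
        # Undo the partial startup so no orphaned children outlive the failure.
        for w in started:
            if w.is_alive():
                w.terminate()
            w.join()
        raise
    return started


class FakeWorker:
    """Hypothetical stand-in for multiprocessing.Process (for illustration only)."""

    def __init__(self, fail=False):
        self.fail = fail
        self.alive = False
        self.terminated = False

    def start(self):
        if self.fail:
            # Simulate os.fork() failing with EAGAIN, as in the report above.
            raise OSError(35, "Resource temporarily unavailable")
        self.alive = True

    def is_alive(self):
        return self.alive

    def terminate(self):
        self.terminated = True
        self.alive = False

    def join(self, timeout=None):
        pass


if __name__ == "__main__":
    workers = [FakeWorker(), FakeWorker(), FakeWorker(fail=True)]
    try:
        start_all_or_none(workers)
    except OSError as exc:
        print("startup failed:", exc)
    print([w.terminated for w in workers])  # [True, True, False]
```

Unlike the patch, which walks all of self._pool (including the worker whose start() failed, since it is appended before start() is called) and skips dead entries via is_alive(), this sketch only tracks workers whose start() returned, so the failed one is never touched.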

This is the old version:

        for i in range(self._processes - len(self._pool)):
            w = self.Process(target=worker,
                             args=(self._inqueue, self._outqueue,
                                   self._initializer,
                                   self._initargs, self._maxtasksperchild)
                            )
            self._pool.append(w)
            w.name = w.name.replace('Process', 'PoolWorker')
            w.daemon = True
            w.start()
            debug('added worker')

This is the new version:

        try:
            for i in range(self._processes - len(self._pool)):
                w = self.Process(target=worker,
                                 args=(self._inqueue, self._outqueue,
                                       self._initializer,
                                       self._initargs, self._maxtasksperchild)
                                )
                self._pool.append(w)
                w.name = w.name.replace('Process', 'PoolWorker')
                w.daemon = True
                w.start()
                debug('added worker')
        except:
            debug("Process creation error. Cleaning up (%d) workers." % len(self._pool))

            for process in self._pool:
                if not process.is_alive():
                    continue

                process.terminate()
                process.join()

            debug("Worker clean-up complete. Re-raising error.")
            raise

This is what happens now: I can request a number of processes that is too high and then immediately request one that is also high but within limits, and there is no problem, because all resources from the failed attempt have been freed:

>>> from multiprocessing import Pool
>>> p = Pool(700)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/__init__.py", line 232, in Pool
    return Pool(processes, initializer, initargs, maxtasksperchild)
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/pool.py", line 159, in __init__
    self._repopulate_pool()
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/pool.py", line 224, in _repopulate_pool
    w.start()
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/process.py", line 130, in start
    self._popen = Popen(self)
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/forking.py", line 121, in __init__
    self.pid = os.fork()
OSError: [Errno 35] Resource temporarily unavailable
>>> p = Pool(500)
>>> p.close()
>>> p.join()
History
Date                 User     Action  Args
2013-11-21 01:52:19  dsoprea  set     recipients: + dsoprea
2013-11-21 01:52:19  dsoprea  set     messageid: <1384998739.89.0.441869255184.issue19675@psf.upfronthosting.co.za>
2013-11-21 01:52:19  dsoprea  link    issue19675 messages
2013-11-21 01:52:19  dsoprea  create