
Author neologix
Recipients flox, jnoller, neologix, vstinner
Date 2011-04-10.12:41:05
Message-id <1302439269.07.0.64097738993.issue8428@psf.upfronthosting.co.za>
In-reply-to
Content
I think those lockups are due to a race in the Pool shutdown code.
In Lib/multiprocessing/pool.py:

    def close(self):
        debug('closing pool')
        if self._state == RUN:
            self._state = CLOSE
            self._worker_handler._state = CLOSE
            self._taskqueue.put(None) 

We set the current state to CLOSE and put None on the taskqueue, so that the task_handler thread detects that we want to shut down and sends a None sentinel to the inqueue for each worker process.
When a worker process receives this sentinel, it exits, and when Pool's join method is called, each process is joined successfully.
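In outline (a simplified sketch, not the verbatim pool.py code), the sentinel path looks like this:

    # Task handler, simplified: once it sees the None put by close(), it
    # broadcasts one sentinel per worker *currently* in the pool.
    def _handle_tasks_sketch(taskqueue, put, pool):
        for task in iter(taskqueue.get, None):
            put(task)
        for _ in pool:
            put(None)

    # Worker loop, simplified: a None task means "exit".
    def _worker_sketch(get):
        while True:
            task = get()
            if task is None:
                return          # the process exits; Pool.join() can then reap it
            # ... execute the task ...
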
Now, there's a problem, because of the worker_handler thread.
This thread constantly starts new worker processes whenever existing ones have exited after completing their work:

    def _handle_workers(pool):
        while pool._worker_handler._state == RUN and pool._state == RUN:
            pool._maintain_pool()
            time.sleep(0.1)
        debug('worker handler exiting')

where 

    def _maintain_pool(self):
        """Clean up any exited workers and start replacements for them.
        """
        if self._join_exited_workers():
            self._repopulate_pool()
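
For context, _repopulate_pool does roughly this (paraphrased and simplified, not the exact pool.py code):

    def _repopulate_pool_sketch(self):
        # Start one new worker process for every slot left empty by an
        # exited worker, bringing the pool back to its configured size.
        for _ in range(self._processes - len(self._pool)):
            w = self.Process(target=worker,      # the pool's worker main loop
                             args=(self._inqueue, self._outqueue))
            w.daemon = True
            self._pool.append(w)
            w.start()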

Imagine the following happens:

worker_handler checks that the pool is still running (state == RUN), but before calling _maintain_pool it is preempted (the GIL is released), and Pool's close() method is called:
the state is set to CLOSE, None is put on the taskqueue, and the worker processes exit.
Then, Pool's join is called:

    def join(self):
        debug('joining pool')
        assert self._state in (CLOSE, TERMINATE)
        self._worker_handler.join()
        self._task_handler.join()
        self._result_handler.join()
        for p in self._pool:
            p.join()


This blocks until worker_handler exits. That thread sooner or later resumes and calls _maintain_pool.
_maintain_pool calls _repopulate_pool, which creates new worker processes.
Then, worker_handler checks the current state, sees CLOSE, and exits.
Then, Pool's join blocks here:
        for p in self._pool:
            p.join()

since the newly created processes never receive the sentinels (already consumed by the previous worker processes)...
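
To summarize the interleaving (T0 = main thread, T1 = worker_handler, T2 = task_handler):

    # T1: checks pool._state == RUN -> True, then gets preempted
    # T0: pool.close()  -> state = CLOSE, None put on taskqueue
    # T2: sees the None, sends one sentinel per *current* worker
    #     the current workers receive the sentinels and exit
    # T0: pool.join()   -> joins worker_handler first, so it waits on T1
    # T1: resumes, _maintain_pool() -> _repopulate_pool() starts new workers,
    #     then re-checks the state, sees CLOSE, and exits
    # T0: join() proceeds to "for p in self._pool: p.join()" and blocks:
    #     the new workers never get a sentinel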

This race can be reproduced almost every time just by adding a one-second sleep to _handle_workers:

    def _handle_workers(pool):
        while pool._worker_handler._state == RUN and pool._state == RUN:
+            time.sleep(1)
            pool._maintain_pool()
            time.sleep(0.1)
        debug('worker handler exiting')

Then something as simple as this will block:

import multiprocessing

p = multiprocessing.Pool(3)
p.close()
p.join()
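
For what it's worth, the debug() traces shown in the snippets above ('closing pool', 'worker handler exiting', ...) can be surfaced with multiprocessing's stderr logger, which makes it easier to see where the shutdown stalls:

import logging
import multiprocessing

# Show multiprocessing's internal debug() messages on stderr before
# running the reproducer above.
logger = multiprocessing.log_to_stderr()
logger.setLevel(logging.DEBUG)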

I still have to think of a clean way to solve this.