# HG changeset patch # Parent 08d4c2fe51ead6637f85868cd19400abb86da7ca diff -r 08d4c2fe51ea Lib/multiprocessing/reduction.py --- a/Lib/multiprocessing/reduction.py Tue Apr 24 22:56:57 2012 +0200 +++ b/Lib/multiprocessing/reduction.py Wed Apr 25 14:02:19 2012 +0100 @@ -40,6 +40,7 @@ import socket import threading import struct +import signal from multiprocessing import current_process from multiprocessing.util import register_after_fork, debug, sub_debug @@ -209,6 +210,7 @@ self._lock = threading.Lock() self._listener = None self._address = None + self._thread = None register_after_fork(self, ResourceSharer._afterfork) def register(self, send, close): @@ -227,6 +229,25 @@ c.send((key, os.getpid())) return c + def stop(self): + from .connection import Client + with self._lock: + if self._address is not None: + c = Client(self._address, authkey=current_process().authkey) + c.send(None) + c.close() + self._thread.join(5) + if self._thread.is_alive(): + print('Warning: ResourceSharer thread did not stop ' + + 'when asked', file=sys.stderr) + self._listener.close() + self._thread = None + self._address = None + self._listener = None + for key, (send, close) in self._cache.items(): + close() + self._cache.clear() + def _afterfork(self): for key, (send, close) in self._cache.items(): close() @@ -239,6 +260,7 @@ self._listener.close() self._listener = None self._address = None + self._thread = None def _start(self): from .connection import Listener @@ -249,12 +271,18 @@ t = threading.Thread(target=self._serve) t.daemon = True t.start() + self._thread = t def _serve(self): + if hasattr(signal, 'pthread_sigmask'): + signal.pthread_sigmask(signal.SIG_BLOCK, range(1, signal.NSIG)) while 1: try: conn = self._listener.accept() - key, destination_pid = conn.recv() + msg = conn.recv() + if msg is None: + break + key, destination_pid = msg send, close = self._cache.pop(key) send(conn, destination_pid) close() diff -r 08d4c2fe51ea Lib/test/test_multiprocessing.py --- a/Lib/test/test_multiprocessing.py Tue Apr 24 22:56:57 2012 +0200 +++ b/Lib/test/test_multiprocessing.py Wed Apr 25 14:02:19 2012 +0100 @@ -1966,6 +1966,11 @@ ALLOWED_TYPES = ('processes',) @classmethod + def tearDownClass(cls): + from multiprocessing.reduction import resource_sharer + resource_sharer.stop() + + @classmethod def _listener(cls, conn, families): for fam in families: l = cls.connection.Listener(family=fam)