Index: mp_distributing.py =================================================================== --- mp_distributing.py (revision 74264) +++ mp_distributing.py (working copy) @@ -48,19 +48,36 @@ info = _logger.info debug = _logger.debug -# -# Get number of cpus -# - +# ====================== +# = Get number of CPUs = +# ====================== try: slot_count = cpu_count() except NotImplemented: slot_count = 1 -# -# Manager type which spawns subprocesses -# +# ====================== +# = Distributed Server = +# ====================== +class DistributedServer(managers.Server): + # Overide decref to change call to util.debug + def decref(self, c, ident): + self.mutex.acquire() + try: + assert self.id_to_refcount[ident] >= 1 + self.id_to_refcount[ident] -= 1 + if self.id_to_refcount[ident] == 0: + del self.id_to_obj[ident], self.id_to_refcount[ident] + # we're not using integers, so we need %r instead of %d or + # we throw exceptions when logging. + util.debug('disposing of obj with id %r', ident) + finally: + self.mutex.release() + +# =========== +# = Manager = +# =========== class HostManager(managers.SyncManager): ''' Manager type used for spawning processes on a (presumably) foreign host @@ -79,13 +96,17 @@ if name is None: temp = self._name.split('Host-')[-1] + '/Process-%s' name = temp % ':'.join(map(str, p.get_identity())) - p.set_name(name) + p.name = name return p @classmethod def from_address(cls, address, authkey): manager = cls(address, authkey) - managers.transact(address, authkey, 'dummy') + conn = connection.Client(address, authkey=authkey) + try: + managers.dispatch(conn, None, 'dummy') + finally: + conn.close() manager._state.value = managers.State.STARTED manager._name = 'Host-%s:%s' % manager.address manager.shutdown = util.Finalize( @@ -97,15 +118,18 @@ @staticmethod def _finalize_host(address, authkey, name): - managers.transact(address, authkey, 'shutdown') + conn = connection.Client(address, authkey=authkey) + try: + return managers.dispatch(conn, None, 'shutdown') + finally: + conn.close() def __repr__(self): return '' % self._name -# -# Process subclass representing a process on (possibly) a remote machine -# - +# ================== +# = Remote Process = +# ================== class RemoteProcess(Process): ''' Represents a process started on a remote host @@ -126,10 +150,9 @@ HostManager.register('_RemoteProcess', RemoteProcess) -# -# A Pool class that uses a cluster -# - +# ======== +# = Pool = +# ======== class DistributedPool(pool.Pool): def __init__(self, cluster, processes=None, initializer=None, initargs=()): @@ -148,13 +171,12 @@ def _help_stuff_finish(inqueue, task_handler, size): inqueue.set_contents([None] * size) -# -# Manager type which starts host managers on other machines -# - +# =================== +# = Cluster Manager = +# =================== def LocalProcess(**kwds): p = Process(**kwds) - p.set_name('localhost/' + p.name) + p.name = 'localhost/' + p.name return p class Cluster(managers.SyncManager): @@ -190,17 +212,14 @@ i, address, cpus = conn.recv() conn.close() other_host = self._hostlist[i] - other_host.manager = HostManager.from_address(address, - self._authkey) + other_host.manager = HostManager.from_address(address, self._authkey) other_host.slots = other_host.slots or cpus other_host.Process = other_host.manager.Process else: host.slots = host.slots or slot_count host.Process = LocalProcess - self._slotlist = [ - Slot(host) for host in self._hostlist for i in range(host.slots) - ] + self._slotlist = [ Slot(host) for host in self._hostlist for i in range(host.slots) ] self._slot_iterator = itertools.cycle(self._slotlist) self._base_shutdown = self.shutdown del self.shutdown @@ -213,11 +232,9 @@ def Process(self, group=None, target=None, name=None, args=(), kwargs={}): slot = self._slot_iterator.next() - return slot.Process( - group=group, target=target, name=name, args=args, kwargs=kwargs - ) + return slot.Process(group=group, target=target, name=name, args=args, kwargs=kwargs) - def Pool(self, processes=None, initializer=None, initargs=()): + def Pool( self, processes=None, initializer=None, initargs=() ): return DistributedPool(self, processes, initializer, initargs) def __getitem__(self, i): @@ -229,10 +246,10 @@ def __iter__(self): return iter(self._slotlist) -# -# Queue subclass used by distributed pool -# +# ===================== +# = Distributed Queue = +# ===================== class SettableQueue(Queue.Queue): def empty(self): return not self.queue @@ -251,19 +268,17 @@ Cluster.register('_SettableQueue', SettableQueue) -# -# Class representing a notional cpu in the cluster -# - +# ======== +# = Slot = +# ======== class Slot(object): def __init__(self, host): self.host = host self.Process = host.Process -# -# Host -# - +# ======== +# = Host = +# ======== class Host(object): ''' Represents a host to use as a node in a cluster. @@ -285,6 +300,7 @@ def _start_manager(self, index, authkey, address, files): if self.hostname != 'localhost': tempdir = copy_to_remote_temporary_directory(self.hostname, files) + debug('startup files copied to %s:%s', self.hostname, tempdir) p = subprocess.Popen( ['ssh', self.hostname, 'python', '-c', @@ -300,23 +316,19 @@ pickle.dump(data, p.stdin, pickle.HIGHEST_PROTOCOL) p.stdin.close() -# -# Copy files to remote directory, returning name of directory -# - -unzip_code = '''" -import tempfile, os, sys, tarfile +# ======================================================================= +# = Copy files to remote directory and return the name of the directory = +# ======================================================================= +unzip_code = '''"import tempfile, os, sys, tarfile tempdir = tempfile.mkdtemp(prefix='distrib-') os.chdir(tempdir) tf = tarfile.open(fileobj=sys.stdin, mode='r|gz') -for ti in tf: - tf.extract(ti) -print tempdir -"''' +tf.extractall() +print tempdir"''' def copy_to_remote_temporary_directory(host, files): p = subprocess.Popen( - ['ssh', host, 'python', '-c', unzip_code], + ['ssh', host, 'python', '-c', unzip_code.replace("\n", ';')], stdout=subprocess.PIPE, stdin=subprocess.PIPE ) tf = tarfile.open(fileobj=p.stdin, mode='w|gz') @@ -326,10 +338,9 @@ p.stdin.close() return p.stdout.read().rstrip() -# -# Code which runs a host manager -# - +# ====================================== +# = Code that runs on the remote node. = +# ====================================== def main(): # get data from parent over stdin data = pickle.load(sys.stdin) @@ -339,8 +350,10 @@ _logger.setLevel(data['dist_log_level']) forking.prepare(data) - # create server for a `HostManager` object - server = managers.Server(HostManager._registry, ('', 0), data['authkey']) + # create DistributedServer for a `HostManager` object + import platform + hostname = platform.node() + server = DistributedServer(HostManager._registry, (hostname, 0), data['authkey'], "pickle") current_process()._server = server # report server address and number of cpus back to parent @@ -349,7 +362,7 @@ conn.close() # set name etc - current_process().set_name('Host-%s:%s' % server.address) + current_process().name = 'Host-%s:%s' % server.address util._run_after_forkers() # register a cleanup function @@ -362,3 +375,4 @@ # start host manager debug('remote host manager starting in %s', data['dir']) server.serve_forever() + \ No newline at end of file