import random import time import os from multiprocessing import Process, Queue def worker(input, output): t = time.time() # Let my master know I'm alive. output.put(('START', os.getpid())) for item in iter(input.get, 'STOP'): # quick pause to allow other stuff to happen a bit randomly t = 0.1 * random.random() time.sleep(t) func, args = item output.put(func(*args)) # Let my master know I'm finished. output.put(('STOP', time.time()-t, os.getpid())) def mul(a,b): return a * b, os.getpid() def fac(n): sign = 1 if n >= 0 else -1 return (n, sign * _fac(abs(n)), os.getpid()) def _fac(n): if n == 0 or n == 1: return 1 return n * _fac(n-1) def test(): N = 4 task_queue = Queue() done_queue = Queue() # Here is the work we want the processes to do. tasks = [] for i in range(N*5): ## tasks.append((mul, (random.random(), random.random()))) tasks.append((fac, (random.randrange(0, 15),))) # Put N 'STOP' tokens on the queue to cause the kids to terminate. tasks.extend(('STOP',) * N) for t in tasks: task_queue.put(t) # Fire off the subprocesses and let them work. pids = set() procs = {} for i in range(N): p = Process( target=worker, args=(task_queue, done_queue)) p.start() procs[p.pid] = p # Read the expected output. reporting_pids = set() # These ones sent us a START token. nstarts = nstops = nresults = 0 while True: result = done_queue.get() if result[0] == 'START': nstarts += 1 reporting_pids.add(result[1]) elif result[0] == 'STOP': nstops += 1 else: nresults += 1 print nstarts, nstops, nresults, result if nstops == nstarts: break if nresults < N * 5: print "!!!! whoa dude! didn't see the correct number of results!" # Compare the processes created with the number who told us they # started and terminate any who never reported. for zombie_pid in set(procs) - reporting_pids: print "!!!! killing pid", zombie_pid procs[zombie_pid].terminate() if __name__ == '__main__': test()