#!/opt/IBpython/3.1.1/bin/python3
#!/opt/IB/python/2.6.3/bin/python
# NOTE(review): subProc() calls Process.kill(), which needs Python 3.7+, so the
# interpreter named in the shebang above looks too old -- confirm.
from multiprocessing import Process, Pipe, Queue
import threading
import time
import copy


class failureObject():
    """Failure flag shared by worker threads; updates are serialized by a lock.

    Worker threads call setFailure(); the owner inspects the outcome with
    hasFailed() or status() afterwards.
    """

    def __init__(self, ignoreFailure=False):
        """Create a record in the "not failed" state.

        :param ignoreFailure: when true, setFailure() does nothing, so the
            object always reports success.
        """
        self.__failure = False
        self.__failureString = ""
        self.__lock = threading.Lock()
        self.__ignoreFailure = ignoreFailure

    def setFailure(self, failString):
        """Mark the record as failed with *failString* (the last caller wins)."""
        if self.__ignoreFailure:
            return
        # ``with`` guarantees the lock is released even if an assignment raises.
        with self.__lock:
            self.__failure = True
            self.__failureString = failString

    def hasFailed(self):
        """Return True once setFailure() has recorded a failure."""
        return self.__failure

    def status(self):
        """Return True if nothing failed, otherwise the recorded failure string."""
        # Read flag and message together so a concurrent setFailure() can't be
        # observed half-done (flag set, message still empty).
        with self.__lock:
            if not self.__failure:
                return True
            return self.__failureString


def popJobs(myqueue):
    """Run checkAlive() concurrently for each item of one batch taken from *myqueue*.

    Blocks until a list is available on the queue, starts one thread per
    item (named after the item), reaps threads as they finish, then prints
    the aggregated status: True, or the failure string.
    """
    qlst = myqueue.get()
    qthreads = []
    fail = failureObject()
    for x in qlst:
        t = threading.Thread(target=checkAlive, args=(fail, x), name=x)
        print("Starting", x)
        t.start()
        qthreads.append(t)
    while qthreads:
        # Iterate a snapshot because finished threads are removed from the list.
        for t in copy.copy(qthreads):
            if not t.is_alive():
                print("Finishing:", t.name)
                t.join()
                qthreads.remove(t)
        if qthreads:
            # Only back off while work is still outstanding.
            time.sleep(0.5)
    print(fail.status())


def main():
    """Loop forever: queue a fixed batch, process it in a child process, wait 5 s."""
    # TODO config file etc etc
    # NOTE(review): the original design notes describe a semaphore ("Sema")
    # limiting how many grandchild processes may run at once; no such
    # semaphore exists yet -- every batch item currently gets its own
    # subprocess. Rationale from those notes, kept for reference: only the
    # children would take the semaphore (if the master did, we would risk
    # deadlock, with each child holding one and all waiting for another child
    # to start); it is the grandchildren that do the real work, so having many
    # direct children is cheap, as their CPU usage is minimal.
    q = Queue()
    while True:
        q.put(["data1", "data2"])
        t = Process(target=popJobs, args=(q,))
        t.start()
        t.join()
        time.sleep(5)


# To give the program some added safety from a sick module, the actual
# check/change of the password happens in a subprocess with a sensible
# timeout. The subprocess is killed if it takes too long.
def subProc(targetfunc, args, timeout=60):
    """Run *targetfunc* in a child process and return what it sent back.

    *targetfunc* is called in a new process with the sending end of a one-way
    Pipe as its only argument. *args* is only used for logging; it is NOT
    passed to *targetfunc*.

    :param timeout: seconds to wait for the child before it is terminated
        (and killed if it survives termination for 3 s).
    :returns: the last object the child sent (even if falsy); the string
        "Module Timeout" if the child was still running at the deadline; the
        string "Internal failure" if the child exited without sending anything.
    """
    # Local import: connection.wait() wakes as soon as data arrives OR the
    # child exits, instead of always sleeping out a fixed poll interval.
    from multiprocessing.connection import wait

    childRecv, childSend = Pipe(False)
    newJob = Process(target=targetfunc, args=(childSend,))
    newJob.daemon = False
    newJob.start()
    print("Started subproc: PID: %d : args: %s" % (newJob.pid, str(args)))
    # monotonic() is unaffected by wall-clock adjustments during the wait.
    deadline = time.monotonic() + timeout
    childResult = None
    received = False  # a flag, so a falsy result is not mistaken for "nothing"
    try:
        while True:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break
            print("poll")
            # Cap each wait at 1 s so progress is still logged periodically.
            ready = wait([childRecv, newJob.sentinel], min(remaining, 1))
            if childRecv in ready:
                try:
                    childResult = childRecv.recv()
                except EOFError:
                    print("EOF")
                    break
                received = True
                print("Child has sent:" + str(childResult))
                continue
            if newJob.sentinel in ready:
                # A ready sentinel means the child is exiting; let it finish so
                # is_alive() below reports the final state.
                newJob.join(remaining)
            alive = newJob.is_alive()
            print("isalive: " + str(alive))
            if not alive:
                print("Reaping PID: " + str(newJob.pid))
                newJob.join()
                break
        # Ok, something went wrong: it's still alive and we've passed the timeout.
        if newJob.is_alive():
            print("terminating")
            newJob.terminate()
            newJob.join(3)
            if newJob.is_alive():
                # die die die!!
                print("Sending sigkill to PID: " + str(newJob.pid))
                newJob.kill()
                newJob.join()  # reap it so no zombie is left behind
            print("Child had to be killed due to a timeout")
            return "Module Timeout"
        newJob.join(3)
        # The child may have sent data just before exiting, after our last
        # wait() timed out; collect anything still buffered in the pipe.
        while childRecv.poll():
            try:
                childResult = childRecv.recv()
            except EOFError:
                break
            received = True
            print("Child has sent:" + str(childResult))
    finally:
        # Release both pipe descriptors on every path.
        childRecv.close()
        childSend.close()
    if received:
        return childResult
    print("Child gave back no information")
    return "Internal failure"


def checkAlive(fail, data):
    """Run the liveness probe for *data* in a subprocess and record any failure on *fail*.

    Anything other than an explicit True from the child counts as a failure.
    That includes the "Module Timeout" and "Internal failure" strings subProc()
    returns when the child hangs or dies silently.
    """
    res = subProc(returnTrue, data)
    # subProc's error results are non-empty (truthy) strings, so test for True
    # explicitly rather than for truthiness.
    if res is not True:
        fail.setFailure("checkalive Failure")


def returnTrue(myPipe):
    """Child-side probe: send True through *myPipe*."""
    myPipe.send(True)


if __name__ == "__main__":
    main()