import time import logging from multiprocessing.util import log_to_stderr import multiprocessing log_to_stderr(logging.DEBUG) FT_INDEX = 0 HEADER = ("12345", "23456", "234567") LENHEADER = None TRAILER = ("1234T", "2345T", "3456T") LENTRAILER = None BODY = '1111' * 1000000 LENBODY = len(BODY) LENHEADER = len(HEADER[FT_INDEX]) LENTRAILER = len(TRAILER[FT_INDEX]) LENREQ = LENHEADER + LENBODY + LENTRAILER def Initialize(INDEX): "Initialize generation of content of return from worker process" global FT_INDEX, LENREQ, LENHEADER, LENTRAILER FT_INDEX = INDEX LENHEADER = len(HEADER[FT_INDEX]) LENTRAILER = len(TRAILER[FT_INDEX]) LENREQ = LENHEADER + LENBODY + LENTRAILER def ValidateData(data): "Validate contents of return from worker process" str1 = data[0:LENHEADER] data1 = str1 == HEADER[FT_INDEX] str2 = data[LENREQ-LENTRAILER: LENREQ] data2 = str2 == TRAILER[FT_INDEX] str3 = data[LENHEADER:LENHEADER+LENBODY] data3 = str3 == BODY return data1 and data2 and data3 def f(t): "The function to be executed in the worker process" time.sleep(t - time.time()) return HEADER[FT_INDEX] + BODY + TRAILER[FT_INDEX] if __name__ == '__main__': "Testing code only executable when invoked as main" Initialize(1) with multiprocessing.Pool(1) as p: req = None t = time.time() + 5 print("If no stdout message after this message we hung in apply_async") # Invoking worker process req = p.apply_async(f, (t + .03,)) # See if the worker process has completed if not req.ready(): print("Expected early termination of apply_async. Request output" " not available") # Back from invocation of worker process try: # Getting result from worker process and incidentally synchronizing # correctly on pipe underlying queue in case that amount of result # data exceeds capacity of pipe. data = req.get(timeout=10) print("Worker process terminated OK without exceptions or timeout") except Exception as ex: print("Worker process terminated with exception {}".format(ex)) if not (len(data)) == LENREQ: print("Invalid data length from worker process is {}," " ought to be {}".format(len(data), LENREQ)) if not ValidateData(data): print("apply_async completed with invalid data from" " worker process") else: print("apply_async completed with valid result from" " worker process") Initialize(2) # We are now in main after execution of worker process print("If no stdout message after this message we hung in main after" " standalone invocation of f()") data3 = f(t + 10) if len(data3) != LENREQ: print ("Length of data from standalone invocation of f() is incorrect," " should be {} is {}".format(LENREQ, len(data3))) if ValidateData(data3): print("We are exiting without error or lock from main after" " standalone invocation of f()") else: print("We did not get correct data from f() in main after" " standalone invocation of f()")