import numpy as np
import os
from multiprocessing import shared_memory, Pool
from time import sleep


def worker1(shm_name, points):
    # Attach to the existing shared memory block by name.
    existing_shm = shared_memory.SharedMemory(name=shm_name)
    c = np.ndarray((points,), dtype=np.int64, buffer=existing_shm.buf)
    d = c.sum()
    return ("worker1", d)


def worker2(shm_name, points):
    existing_shm = shared_memory.SharedMemory(name=shm_name)
    c = np.ndarray((points,), dtype=np.int64, buffer=existing_shm.buf)
    d = c.mean()
    return ("worker2", d)


if __name__ == '__main__':
    # Initializing the process pool BEFORE the shared memory makes the
    # resource_tracker emit warning(s); this is my usual sequence.
    process_pool = Pool(processes=os.cpu_count())  # either use this line or the commented-out one below

    # Initialize the shared memory block and copy the source array into it.
    # dtype is given explicitly because the workers read int64; the platform
    # default can differ (e.g. int32 on Windows).
    a = np.array([x for x in range(10)], dtype=np.int64)
    shm = shared_memory.SharedMemory(create=True, size=a.nbytes)
    b = np.ndarray(a.shape, dtype=a.dtype, buffer=shm.buf)
    b[:] = a[:]

    # Initializing the process pool AFTER the shared memory produces no
    # resource_tracker warning, but my code cannot always be arranged this way.
    # process_pool = Pool(processes=os.cpu_count())  # either use this line or the one above

    results = []
    results.append(process_pool.apply_async(worker1, args=(shm.name, a.shape[0])))
    results.append(process_pool.apply_async(worker2, args=(shm.name, a.shape[0])))

    # Poll until both results have arrived.
    num_results = 2
    while num_results:
        for i in range(num_results):
            if results[i].ready():
                result = results[i].get()
                print(result)
                del results[i]
                num_results -= 1
                break
        else:
            sleep(0.001)  # no result ready yet; wait 1 ms before polling again

    process_pool.close()
    process_pool.join()

    # The following behavior contradicts the documentation, which states:
    # "Note: the last process relinquishing its hold on a shared memory block
    # may call unlink() and close() in either order."
    shm.close()  # throws a segmentation fault when used
    shm.unlink()
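
# ---------------------------------------------------------------------------
# A possible worker-side fix (a sketch under my assumptions, not a confirmed
# cure): the documentation says every process should close() its own handle,
# and the workers above never do. `worker_sum_closing` is a hypothetical
# replacement for worker1 that copies the result out of shared memory, drops
# the numpy view, and only then closes, so no buffer export into the block
# stays alive when the handle is released.
def worker_sum_closing(shm_name, points):
    # Attach to the existing block by name; this process gets its own handle.
    existing_shm = shared_memory.SharedMemory(name=shm_name)
    c = np.ndarray((points,), dtype=np.int64, buffer=existing_shm.buf)
    d = int(c.sum())      # int() copies the value out of the shared buffer
    del c                 # drop the view before closing the handle
    existing_shm.close()  # documented protocol: each process closes its handle
    return ("worker_sum_closing", d)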
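
# ---------------------------------------------------------------------------
# A possible explanation for the crash on shm.close() (again a sketch, not a
# verified diagnosis): the numpy view `b` still references shm.buf when the
# owner closes, so the mapping is torn down while a buffer export is live.
# `_teardown_demo` is a hypothetical single-process version of the teardown
# that deletes the last view first, then closes and unlinks without issue.
def _teardown_demo():
    a = np.arange(10, dtype=np.int64)
    shm = shared_memory.SharedMemory(create=True, size=a.nbytes)
    b = np.ndarray(a.shape, dtype=a.dtype, buffer=shm.buf)
    b[:] = a[:]
    print(b.sum())  # use the shared view while it is still alive
    del b           # release the only view into shm.buf
    shm.close()     # safe now: no live buffer exports into the mapping
    shm.unlink()    # the creating process destroys the block exactly once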