#Example with a basic custom object used as a counter shared between remote #processes Two methods are provided good() protects the critical section #whereas bad() does not. It shows the managers does not enforce the #thread safeness of the shared objects and methods. #(c) 2014 Irvin Probst ENSTA Bretagne #All rights reserved. from threading import Lock from multiprocessing import Pool from multiprocessing.managers import SyncManager import time class SharedData(): """Basic custom class to share some data between remote processes""" def __init__(self): self.__cpt=0 self.__mutex=Lock() def good(self): with self.__mutex: self.__increment() def bad(self): self.__increment() def __increment(self): tmp=self.__cpt #help the counter to fail miserably without lock time.sleep(1e-3) tmp+=1 self.__cpt=tmp def get_value(self): return self.__cpt def make_server(port, authkey): shared_data=SharedData() manager = SyncManager(address=('', port), authkey=authkey) manager.register("get_proxy",callable=lambda : shared_data) manager.start() return manager def make_client(ip, port, authkey): manager = SyncManager(address=(ip, port), authkey=authkey) manager.register("get_proxy") manager.connect() return manager def count_ok(): client=make_client('',6666, b"foo") data_proxy=client.get_proxy() for _ in range(1000): data_proxy.good() def count_not_ok(): client=make_client('',6666, b"foo") data_proxy=client.get_proxy() for _ in range(1000): data_proxy.bad() if __name__ == '__main__': server=make_server(6666, b"foo") counter_proxy=make_client('',6666, b"foo").get_proxy() #start a first pool calling count_ok pool = Pool() for _ in range(10): pool.apply_async(count_ok) pool.close() pool.join() print("Excected 10000 got", counter_proxy.get_value()) #start a second pool calling count_not_ok pool = Pool() for _ in range(10): pool.apply_async(count_not_ok) pool.close() pool.join() print("Excected 20000 got", counter_proxy.get_value())