# based on Brian Quinlan's patch: https://sourceforge.net/tracker/?func=detail&atid=305470&aid=531629&group_id=5470 # this release is Copyright 2004 IMVU, Inc # written by Eric Ries (eric@imvu.com) # released under the Python License: http://www.python.org/2.3.2/license.html from __future__ import generators import sys, re, string, time, operator import xmlrpclib from types import * import threading from log.StdLogger import logger class _MultiCallMethod: # some lesser magic to store calls made to a MultiCall object # for batch execution def __init__(self, call_list, name): self.__call_list = call_list self.__name = name def __getattr__(self, name): return _MultiCallMethod(self.__call_list, "%s.%s" % (self.__name, name)) def __call__(self, *args): self.__call_list.append((self.__name, args)) def MultiCallIterator(results): """Iterates over the results of a multicall. Exceptions are thrown in response to xmlrpc faults.""" for i in results: if type(i) == type({}): raise xmlrpclib.Fault(i['faultCode'], i['faultString']) elif type(i) == type([]): yield i[0] else: raise ValueError,\ "unexpected type in multicall result" class MultiCall: """server -> a object used to boxcar method calls server should be a ServerProxy object. Methods can be added to the MultiCall using normal method call syntax e.g.: multicall = MultiCall(server_proxy) multicall.add(2,3) multicall.get_address("Guido") To execute the multicall, call the MultiCall object e.g.: add_result, address = multicall() """ def __init__(self, server, useIterator=False): self.__server = server self.__call_list = [] self.useIterator_ = useIterator def __repr__(self): return "" % id(self) __str__ = __repr__ def __getattr__(self, name): return _MultiCallMethod(self.__call_list, name) def __call__(self): marshalled_list = [] for name, args in self.__call_list: marshalled_list.append({'methodName' : name, 'params' : args}) if self.useIterator_: return MultiCallIterator(self.__server.system.multicall(marshalled_list)) else: return self.__server.system.multicall(marshalled_list) class _ThreadedMultiCallMethod: def __init__(self, multiCall, name): self.multiCall_ = multiCall self.name_ = name self.lock_ = threading.Lock() def __getattr__(self, name): return _ThreadedMultiCallMethod(multiCall=self.multiCall_, name="%s.%s" % (self.name_, name)) def __call__(self, *args): info = {"name":self.name_, "args":args, "lock":self.lock_} self.multiCall_.addCall(info) if self.multiCall_.dontBlock_: return None self.lock_.acquire() try: t, e = info["error"] if e: raise e else: raise t(e) except KeyError: return info["result"] class ThreadedMultiCall: def __init__(self, server, interval=1.0, maxCalls=25): self.server_ = server self.currentMulticall_ = None self.calls_ = [] self.lock_ = threading.Lock() self.batchIntervalSeconds_ = interval self.dontBlock_ = False self.callback_ = None self.maximumCallCount_ = maxCalls self.threadCount_ = 0 def beginNonBlockingUpdates(self, callback): self.dontBlock_ = True self.callback_ = callback def endNonBlockingUpdates(self): assert self.callback_ assert self.dontBlock_ assert self.currentMulticall_ is None assert self.calls_ calls = self.calls_ self.setupNewMulticall(useThread=False) for info in calls: try: result = info["result"] except KeyError: continue else: self.callback_(result) self.dontBlock_ = False self.callback_ = None def setupNewMulticall(self, useThread=True): #print "setupNewMulticall" assert self.currentMulticall_ is None if useThread: assert not self.lock_.acquire(0) self.currentMulticall_ = MultiCall(server=self.server_) if useThread: newthread = threading.Thread(target = self.__multiCallWorker, name = 'multiCall %s' % self.threadCount_, args = []) self.threadCount_ += 1 newthread.start() else: self.__multiCallWorker() if useThread: assert self.currentMulticall_ is not None #print "setupNewMulticall done" def __multiCallWorker(self): try: return self.__multiCallWorkerInternal() except: logger.exception("error in __multiCallWorkerInternal") sys.exc_clear() def __multiCallWorkerInternal(self): #print "acquire1" self.lock_.acquire() self.lock_.release() logger.info("__multiCallWorkerInternal sleep %s" % threading.currentThread()) while 1: lenCalls = len(self.calls_) time.sleep(self.batchIntervalSeconds_) if len(self.calls_) >= self.maximumCallCount_: break if lenCalls == len(self.calls_): break #print "acquire2" self.lock_.acquire() #print "acquire2 done" try: calls = self.calls_ mc = self.currentMulticall_ self.currentMulticall_ = None self.calls_ = [] finally: self.lock_.release() logger.info("ThreadedMultiCall batching up %s calls (thread %s)" % (len(calls), threading.currentThread())) if len(calls) == 1: info = calls[0] method = getattr(self.server_, info["name"]) try: info["result"] = apply(method, info["args"]) except: t, e = sys.exc_info()[:2] info["error"] = (t,e) info["lock"].release() return for info in calls: method = getattr(mc, info["name"]) apply(method, info["args"]) #print "Get iter" try: resultArray = mc() except: t, e = sys.exc_info()[:2] for info in calls: info["error"] = (t,e) info["lock"].release() return i = 0 for info in calls: #print "next iter" result = resultArray[i] if type(result) == type({}): info["error"] = (xmlrpclib.Fault, xmlrpclib.Fault(result['faultCode'], result['faultString'])) elif type(result) == type([]): info["result"] = result[0] else: info["error"] = (ValueError, ValueError("unexpected type in multicall result")) info["lock"].release() i += 1 def addCall(self, info): #print "addCall" self.lock_.acquire() try: info["lock"].acquire() if self.dontBlock_: pass elif self.currentMulticall_ is None: self.setupNewMulticall() #print "setupNewMulticall returned, back on main thread %s" % threading.currentThread() self.calls_.append(info) finally: self.lock_.release() #print "addCall done" def __getattr__(self, name): return _ThreadedMultiCallMethod(self, name)