from xmlrpc.server import SimpleXMLRPCServer
from xmlrpc.client import ServerProxy, Binary, Marshaller, Unmarshaller, Fault
import threading
class MyMarshaller(Marshaller):
def __init__(self, *args, **kwds):
Marshaller.__init__(self, *args, **kwds)
__dispatch = {}
def dump_bytes(self, value, write):
self.dump_instance(Binary(value), write)
__dispatch[bytes] = dump_bytes
def dump_tuple(self, value, write):
self._add_memo(value)
write("\n")
for v in value:
self._dump(v, write)
write("\n")
self._del_memo(value)
__dispatch[tuple] = dump_tuple
def dump_struct(self, value, write):
self._add_memo(value)
write("\n")
for k, v in value.items():
write("\n")
self._dump(k, write)
self._dump(v, write)
write("\n")
write("\n")
self._del_memo(value)
__dispatch[dict] = dump_struct
def _dump(self, value, write):
if type(value) not in self.__dispatch:
return Marshaller._dump(self, value, write)
else:
self.__dispatch[type(value)](self, value, write)
class MyUnmarshaller(Unmarshaller):
def __init__(self, *args, **kwds):
Unmarshaller.__init__(self, *args, **kwds)
__dispatch = {}
def start(self, tag, attrs):
if tag == "tuple":
self._marks.append(len(self._stack))
self._data = []
self._value = False
else:
Unmarshaller.start(self, tag, attrs)
def end_base64(self, data):
value = Binary()
value.decode(data.encode("ascii"))
self._stack.append(value.data)
self._value = 0
__dispatch["base64"] = end_base64
def end_tuple(self, data):
mark = self._marks.pop()
# map tuples to Python tuples
self._stack[mark:] = [tuple(self._stack[mark:])]
self._value = 0
__dispatch["tuple"] = end_tuple
def end(self, tag):
if tag in self.__dispatch:
return self.__dispatch[tag](self, "".join(self._data))
else:
return Unmarshaller.end(self, tag)
def end_dispatch(self, tag, data):
if tag in self.__dispatch:
return self.__dispatch[tag](self, data)
else:
return Unmarshaller.end_dispatch(self, tag, data)
class MyServerProxy(ServerProxy):
def __init__(self, *args, **kwds):
kwds['marshaller'] = MyMarshaller
kwds['unmarshaller'] = MyUnmarshaller
kwds['allow_none'] = True
ServerProxy.__init__(self, *args, **kwds)
class MyXMLRPCServer(SimpleXMLRPCServer):
"""Wrapper for XML RPC server, used for DB"""
def __init__(self, *args, **kwds):
kwds['allow_none'] = True
kwds['unmarshaller'] = MyUnmarshaller
kwds['marshaller'] = MyMarshaller
SimpleXMLRPCServer.__init__(self, *args, **kwds)
class TestInstance:
def test_tuple(self, val):
print("test_tuple() got: " + repr(val))
return (1, 2)
def test_bytes(self, val):
print("test_bytes() got: " + repr(val))
return b"1234"
def test_dict(self, val):
print("test_dict() got: " + repr(val))
return {1: "a", 2: "b"}
class TestServer:
"""Class for brain DB server"""
def __init__(self, port=8000):
self._server = MyXMLRPCServer(("localhost", port))
self._server.register_instance(TestInstance())
self._server_thread = threading.Thread(target=self._server.serve_forever)
def start(self):
"""Start server"""
self._server_thread.start()
def stop(self):
"""Stop server and wait for it to finish"""
self._server.shutdown()
self._server_thread.join()
if __name__ == '__main__':
server = TestServer()
server.start()
try:
client = MyServerProxy('http://localhost:8000')
print(client.test_tuple((3, 4)))
print(client.test_bytes(b"5678"))
print(client.test_dict({4.5: "bcd"}))
finally:
server.stop()