"""XML-RPC client/server demo that round-trips tuples, bytes and dict keys.

Stock ``xmlrpc`` turns tuples into lists, wraps bytes in ``Binary`` and only
accepts string dictionary keys.  The classes below extend the wire format
with a non-standard ``<tuple>`` element and with struct members whose key is
an ordinary ``<value>``, so both ends must use these classes.

NOTE(review): stock ``ServerProxy`` / ``SimpleXMLRPCServer`` offer no public
hook for a custom marshaller, so the wiring below relies on name-mangled
private attributes of the CPython implementation (``_Marshaller__dump``,
``_ServerProxy__request`` and friends).
"""
import threading
from xmlrpc.client import (
    Binary,
    ExpatParser,
    Fault,
    Marshaller,
    ServerProxy,
    Unmarshaller,
)
from xmlrpc.server import SimpleXMLRPCServer


class MyMarshaller(Marshaller):
    """Marshaller that keeps tuples, raw bytes and non-string dict keys."""

    # Private copy of the stock table.  Marshaller looks handlers up through
    # ``self.dispatch``, so entries replaced here affect this class only.
    dispatch = dict(Marshaller.dispatch)

    def _add_memo(self, value):
        """Record *value* as in progress; raise TypeError if it recurses."""
        key = id(value)
        if key in self.memo:
            raise TypeError("cannot marshal recursive containers")
        self.memo[key] = None

    def _del_memo(self, value):
        """Forget *value* once it has been written completely."""
        del self.memo[id(value)]

    def _dump(self, value, write):
        """Marshal one (possibly nested) value through the dispatch table."""
        # The stock per-value entry point is the name-mangled ``__dump``.
        self._Marshaller__dump(value, write)

    def dump_bytes(self, value, write):
        """Write bytes as a ``<base64>`` element."""
        self.dump_instance(Binary(value), write)
    dispatch[bytes] = dump_bytes

    def dump_tuple(self, value, write):
        """Write a tuple as the non-standard ``<tuple>`` element."""
        self._add_memo(value)
        try:
            write("<value><tuple>\n")
            for item in value:
                self._dump(item, write)
            write("</tuple></value>\n")
        finally:
            self._del_memo(value)
    dispatch[tuple] = dump_tuple

    def dump_struct(self, value, write):
        """Write a dict as ``<struct>``; each member is a key/value pair.

        Keys are marshalled as ordinary values instead of ``<name>`` text so
        that ints, floats, tuples, ... survive as dictionary keys.
        """
        self._add_memo(value)
        try:
            write("<value><struct>\n")
            for key, item in value.items():
                write("<member>\n")
                self._dump(key, write)
                self._dump(item, write)
                write("</member>\n")
            write("</struct></value>\n")
        finally:
            self._del_memo(value)
    dispatch[dict] = dump_struct


class MyUnmarshaller(Unmarshaller):
    """Unmarshaller counterpart of :class:`MyMarshaller`."""

    # Private copy, for the same reason as in MyMarshaller.
    dispatch = dict(Unmarshaller.dispatch)

    def start(self, tag, attrs):
        """Open an element; ``<tuple>`` gets a stack mark like ``<array>``."""
        if tag == "tuple":
            self._marks.append(len(self._stack))
            self._data = []
            self._value = False
        else:
            Unmarshaller.start(self, tag, attrs)

    def end_base64(self, data):
        """Decode ``<base64>`` content straight to ``bytes``."""
        value = Binary()
        value.decode(data.encode("ascii"))
        self._stack.append(value.data)
        self._value = 0
    dispatch["base64"] = end_base64

    def end_tuple(self, data):
        """Collapse everything pushed since the mark into one tuple."""
        mark = self._marks.pop()
        self._stack[mark:] = [tuple(self._stack[mark:])]
        self._value = 0
    dispatch["tuple"] = end_tuple


def _getparser():
    """Return a ``(parser, unmarshaller)`` pair built on MyUnmarshaller."""
    target = MyUnmarshaller()
    return ExpatParser(target), target


def _loads(data):
    """Parse an XML-RPC packet; return ``(params, methodname)``.

    Raises Fault if the packet is a fault response.
    """
    parser, target = _getparser()
    parser.feed(data)
    parser.close()
    return target.close(), target.getmethodname()


def _dumps(params, methodname=None, methodresponse=False,
           encoding="utf-8", allow_none=True):
    """Build an XML-RPC packet with MyMarshaller.

    *params* is a tuple of arguments or a Fault instance.  With *methodname*
    a ``<methodCall>`` is produced; with *methodresponse* (or a Fault) a
    ``<methodResponse>``; otherwise only the bare ``<params>`` fragment.
    """
    payload = MyMarshaller(encoding, allow_none).dumps(params)
    if encoding != "utf-8":
        header = "<?xml version='1.0' encoding='%s'?>\n" % encoding
    else:
        header = "<?xml version='1.0'?>\n"
    if methodname:
        return "".join([
            header,
            "<methodCall>\n<methodName>", methodname, "</methodName>\n",
            payload,
            "</methodCall>\n",
        ])
    if methodresponse or isinstance(params, Fault):
        return "".join([
            header, "<methodResponse>\n", payload, "</methodResponse>\n",
        ])
    return payload


class MyServerProxy(ServerProxy):
    """ServerProxy that talks the extended wire format (``allow_none`` on)."""

    def __init__(self, *args, **kwds):
        kwds['allow_none'] = True
        ServerProxy.__init__(self, *args, **kwds)
        # Responses are parsed by the transport; swap its parser factory so
        # that replies are decoded with MyUnmarshaller.  This also replaces
        # the factory of a transport supplied by the caller.
        self._ServerProxy__transport.getparser = _getparser

    def _ServerProxy__request(self, methodname, params):
        """Send one call; overrides the private request hook of ServerProxy."""
        encoding = self._ServerProxy__encoding
        body = _dumps(
            params, methodname=methodname, encoding=encoding, allow_none=True,
        ).encode(encoding, 'xmlcharrefreplace')
        response = self._ServerProxy__transport.request(
            self._ServerProxy__host,
            self._ServerProxy__handler,
            body,
            verbose=self._ServerProxy__verbose,
        )
        # A single return value arrives as a one-element tuple.
        if len(response) == 1:
            response = response[0]
        return response


class MyXMLRPCServer(SimpleXMLRPCServer):
    """SimpleXMLRPCServer that talks the extended wire format."""

    def __init__(self, *args, **kwds):
        kwds['allow_none'] = True
        SimpleXMLRPCServer.__init__(self, *args, **kwds)

    def _marshaled_dispatch(self, data, dispatch_method=None, path=None):
        """Decode a request, run the method and encode the reply.

        Same contract as the stock implementation, but encoding and decoding
        go through MyMarshaller / MyUnmarshaller.  Errors raised by the
        called method are returned to the client as a Fault packet.
        """
        try:
            params, method = _loads(data)
            if dispatch_method is not None:
                result = dispatch_method(method, params)
            else:
                result = self._dispatch(method, params)
            response = _dumps(
                (result,), methodresponse=True,
                encoding=self.encoding, allow_none=self.allow_none,
            )
        except Fault as fault:
            response = _dumps(
                fault, encoding=self.encoding, allow_none=self.allow_none,
            )
        except Exception as exc:
            response = _dumps(
                Fault(1, "%s:%s" % (type(exc), exc)),
                encoding=self.encoding, allow_none=self.allow_none,
            )
        return response.encode(self.encoding, 'xmlcharrefreplace')


class TestInstance:
    """Methods exposed over RPC by the demo server."""

    # Named like a test class but is not one; keep test collectors away.
    __test__ = False

    def test_tuple(self, val):
        """Print the received value and return a tuple."""
        print("test_tuple() got: " + repr(val))
        return (1, 2)

    def test_bytes(self, val):
        """Print the received value and return a bytes object."""
        print("test_bytes() got: " + repr(val))
        return b"1234"

    def test_dict(self, val):
        """Print the received value and return a dict with int keys."""
        print("test_dict() got: " + repr(val))
        return {1: "a", 2: "b"}


class TestServer:
    """Runs a MyXMLRPCServer exposing TestInstance in a background thread."""

    # Named like a test class but is not one; keep test collectors away.
    __test__ = False

    def __init__(self, port=8000):
        self._server = MyXMLRPCServer(("localhost", port))
        self._server.register_instance(TestInstance())
        self._server_thread = threading.Thread(
            target=self._server.serve_forever)

    def start(self):
        """Start server"""
        self._server_thread.start()

    def stop(self):
        """Stop server, wait for it to finish and release its socket"""
        self._server.shutdown()
        self._server_thread.join()
        self._server.server_close()


if __name__ == '__main__':
    server = TestServer()
    server.start()
    try:
        client = MyServerProxy('http://localhost:8000')
        print(client.test_tuple((3, 4)))
        print(client.test_bytes(b"5678"))
        print(client.test_dict({4.5: "bcd"}))
    finally:
        server.stop()