I am building an application that is made up of several separate processes, where each process is a python program. They are all started by the supervisord utility and execute within a venv running Python 3.8.5 (default, Aug 13 2020, 15:42:06) [GCC 7.5.0] on linux, under Ubuntu 18.04.
I am using a multiprocessing BaseManager to implement a repository of queues. Each process asks for a queue by name then uses put/get on that queue.
The application needs to be resilient, so it must be possible to restart the repository process and have the various client processes re-connect to the queues hosted by it.
The problem I am getting is that the first call to `get_queue()` after restarting the BaseManager server process does not return a queue.
The sequence below shows some testing by hand. (My test environment runs Ubuntu in a virtualbox hosted on Windows 8.1)
Here I started the server in a different terminal then started python as below (both pythons in the same venv).
This works as expected with the first call to get_queue returning a queue.
```
(hydra_env) john@u1804-VirtualBox:~/sw/code/hydra$ python
Python 3.8.5 (default, Aug 13 2020, 15:42:06)
[GCC 7.5.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> from multiprocessing.managers import BaseManager
>>> class QueueManager(BaseManager): pass
...
>>> QueueManager.register('get_queue')
>>> mgr = QueueManager(address=('localhost', 50000), authkey=b'abracadabra' )
>>> mgr.connect()
>>> q = mgr.get_queue('name', 'src'); print(str(q))
<multiprocessing.queues.Queue object at 0x7f98403c1820>
>>> q = mgr.get_queue('name', 'src'); print(str(q))
<multiprocessing.queues.Queue object at 0x7f98403c1820>
```
Stop and restart the server to see the problem. The first call to get_queue seems to succeed but in fact it has failed as shown by the print(str...). The second call to get_queue succeeds.
```
>>> mgr.connect()
>>> q = mgr.get_queue('name', 'src'); print(str(q))
<AutoProxy[get_queue] object, typeid 'get_queue' at 0x7f280afdc160; '__str__()' failed>
>>> q = mgr.get_queue('name', 'src'); print(str(q))
<multiprocessing.queues.Queue object at 0x7ff5f1b46820>
```
The server logs show it sent queues on all 4 calls
```
^C(hydra_env) john@u1804-VirtualBox:~/sw/code/hydra$ python ../../trials/test_mgr.py
starting
serving <multiprocessing.queues.Queue object at 0x7f98403c1820>
serving <multiprocessing.queues.Queue object at 0x7f98403c1820>
^C(hydra_env) john@u1804-VirtualBox:~/sw/code/hydra$ python ../../trials/test_mgr.py
starting
serving <multiprocessing.queues.Queue object at 0x7ff5f1b46820>
serving <multiprocessing.queues.Queue object at 0x7ff5f1b46820>
```
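Since neither call raises and the server logs a served queue each time, the only client-side symptom I can see is the `'__str__()' failed` text in `str(q)`. A minimal sketch of how a client could detect that and simply ask again is below. The names `proxy_looks_dead` and `get_live_queue` are hypothetical helpers of mine, not part of `multiprocessing`, and matching on the string is a workaround based only on the output shown above, not a documented API.

```python
from typing import Any, Callable

# Suffix observed in the transcript when str() on the returned proxy fails.
FAILED_MARKER = "'__str__()' failed>"


def proxy_looks_dead(obj: Any) -> bool:
    """Return True if str(obj) ends with the fallback marker seen above."""
    return str(obj).endswith(FAILED_MARKER)


def get_live_queue(get_queue: Callable[..., Any], *args: Any, attempts: int = 3) -> Any:
    """Call get_queue(*args) until the result does not look dead.

    Gives up with RuntimeError after `attempts` calls (hypothetical policy).
    """
    for _ in range(attempts):
        q = get_queue(*args)
        if not proxy_looks_dead(q):
            return q
    raise RuntimeError(f"no usable queue after {attempts} attempts")
```

With the client from the transcript this would be called as `q = get_live_queue(mgr.get_queue, 'name', 'src')`. It only papers over the symptom; it does not explain why the first call fails.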
I get the same behaviour if I re-instantiate the local manager object
```
>>> mgr = QueueManager(address=('localhost', 50000), authkey=b'abracadabra' )
>>> mgr.connect()
>>> q = mgr.get_queue('name', 'src'); print(str(q))
<AutoProxy[get_queue] object, typeid 'get_queue' at 0x7f280afdc2b0; '__str__()' failed>
>>> q = mgr.get_queue('name', 'src'); print(str(q))
<multiprocessing.queues.Queue object at 0x7ff8dabd7820>
>>>
```
I even get the same behaviour if I just call `get_queue()` after restarting the server (ie without explicitly reconnecting).
I would have expected the first call to `get_queue()` to return a valid queue since neither it nor the call to `connect()` raised any kind of error.
It seems to me that some kind of state is being held on the client side and that this is the underlying cause of the issue. I did some investigating, but I was not able to work out what was happening.
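One candidate for such state, as far as I can tell from reading CPython's `multiprocessing/managers.py`, is a per-address, per-thread connection that `BaseProxy` caches and reuses for method calls. The sketch below drops that cached connection so the next proxy call has to open a fresh one. This is an assumption, not a confirmed diagnosis: `BaseProxy._address_to_local` is a private attribute that may change between Python versions, and `drop_cached_connections` is a hypothetical helper name.

```python
from multiprocessing.managers import BaseProxy


def drop_cached_connections(address=None) -> int:
    """Forget this thread's cached proxy connections and return how many were dropped.

    ASSUMPTION: relies on the private BaseProxy._address_to_local mapping
    (address -> (thread-local holder, id set)), a CPython implementation detail.
    If `address` is None, every cached entry is cleared.
    """
    dropped = 0
    for key, entry in list(BaseProxy._address_to_local.items()):
        if address is not None and key != address:
            continue
        tls = entry[0]  # thread-local object that may hold a `connection`
        conn = getattr(tls, "connection", None)
        if conn is None:
            continue
        try:
            conn.close()
        except OSError:
            pass  # the socket may already be broken after a server restart
        del tls.connection
        dropped += 1
    return dropped
```

It would be called after the server has been restarted and before the next `mgr.get_queue(...)`. The cache may be keyed by the address the server reports, for example `('127.0.0.1', 50000)` rather than `('localhost', 50000)`, which is why calling it with no argument clears everything.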
I found that it was possible to get into a state where a valid queue was never returned by `get_queue()` if an error had been raised by `get_nowait()` first.
Stop the server
```
>>> q.get_nowait()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "<string>", line 2, in get_nowait
  File "/home/john/.pyenv/versions/3.8.5/lib/python3.8/multiprocessing/managers.py", line 835, in _callmethod
    kind, result = conn.recv()
  File "/home/john/.pyenv/versions/3.8.5/lib/python3.8/multiprocessing/connection.py", line 250, in recv
    buf = self._recv_bytes()
  File "/home/john/.pyenv/versions/3.8.5/lib/python3.8/multiprocessing/connection.py", line 414, in _recv_bytes
    buf = self._recv(4)
  File "/home/john/.pyenv/versions/3.8.5/lib/python3.8/multiprocessing/connection.py", line 383, in _recv
    raise EOFError
EOFError
```
Restart the server but do not call `get_queue()`
```
>>> q.get_nowait()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "<string>", line 2, in get_nowait
  File "/home/john/.pyenv/versions/3.8.5/lib/python3.8/multiprocessing/managers.py", line 834, in _callmethod
    conn.send((self._id, methodname, args, kwds))
  File "/home/john/.pyenv/versions/3.8.5/lib/python3.8/multiprocessing/connection.py", line 206, in send
    self._send_bytes(_ForkingPickler.dumps(obj))
  File "/home/john/.pyenv/versions/3.8.5/lib/python3.8/multiprocessing/connection.py", line 411, in _send_bytes
    self._send(header + buf)
  File "/home/john/.pyenv/versions/3.8.5/lib/python3.8/multiprocessing/connection.py", line 368, in _send
    n = write(self._handle, buf)
BrokenPipeError: [Errno 32] Broken pipe
>>> q = mgr.get_queue('name', 'src'); print(str(q))
<AutoProxy[get_queue] object, typeid 'get_queue' at 0x7f1e21f63070; '__str__()' failed>
>>> q = mgr.get_queue('name', 'src'); print(str(q))
<AutoProxy[get_queue] object, typeid 'get_queue' at 0x7f1e21f63130; '__str__()' failed>
>>> q = mgr.get_queue('name', 'src'); print(str(q))
<AutoProxy[get_queue] object, typeid 'get_queue' at 0x7f1e21f63220; '__str__()' failed>
>>> q = mgr.get_queue('name', 'src'); print(str(q))
<AutoProxy[get_queue] object, typeid 'get_queue' at 0x7f1e21f631f0; '__str__()' failed>
```
This continued for as long as I kept testing, but `get_queue()` did return a queue some time later, so it was perhaps stuck waiting on a timeout.
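For the resilience requirement, the tracebacks above suggest that a queue operation against a restarted server fails with `EOFError` or `BrokenPipeError`. A minimal sketch of a client-side wrapper that catches those, re-fetches the queue and tries again might look like the following. `call_with_refresh`, its parameters and the retry policy are hypothetical choices of mine, and given the behaviour described above it may still take more than one refresh before a usable queue comes back.

```python
import time
from typing import Any, Callable

# Errors seen in the tracebacks above. BrokenPipeError is a subclass of
# ConnectionError, as is ConnectionRefusedError (server still down).
TRANSIENT_ERRORS = (EOFError, ConnectionError)


def call_with_refresh(
    operation: Callable[[], Any],
    refresh: Callable[[], None],
    attempts: int = 3,
    delay: float = 0.5,
) -> Any:
    """Run operation(); after a connection-type error, call refresh() and retry.

    Any other exception (for example queue.Empty) propagates unchanged.
    """
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    last_error: BaseException = RuntimeError("unreachable")
    for attempt in range(attempts):
        try:
            if attempt > 0:
                refresh()  # e.g. reconnect and fetch the queue again
            return operation()
        except TRANSIENT_ERRORS as exc:
            last_error = exc
            if attempt < attempts - 1:
                time.sleep(delay)
    raise last_error
```

In a client, `refresh` would reassign the queue held by the caller (for instance `holder['q'] = mgr.get_queue('name', 'src')`) and `operation` would be something like `lambda: holder['q'].get_nowait()`, so that the retry uses the new proxy rather than the stale one.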
I also did a limited amount of testing on Python 3.9.0, and the results appeared to be the same.
The code for the test server is here
```
from multiprocessing.managers import BaseManager
from multiprocessing import Queue


class QueueManager(BaseManager):

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.__queues = {}  # dict of instances keyed on qname

    def get_queue(self, qname: str, src: str) -> Queue:
        if qname not in self.__queues:
            self.__queues[qname] = Queue()
        the_q = self.__queues[qname]
        print(f'serving {the_q}')
        return the_q


def main() -> None:
    """main for a process serving queues forever"""
    mgr = QueueManager(address=('localhost', 50000), authkey=b'abracadabra')
    QueueManager.register('get_queue', callable=mgr.get_queue)
    server_object = mgr.get_server()
    print('starting')
    server_object.serve_forever()


if __name__ == '__main__':
    main()
```