Author jryan
Recipients jryan
Date 2020-10-26.09:17:37
Message-id <1603703858.08.0.528057325979.issue42154@roundup.psfhosted.org>
Content
I am building an application made up of several separate processes, each of which is a Python program. They are all started by the supervisord utility and execute within a venv running Python 3.8.5 (default, Aug 13 2020, 15:42:06) [GCC 7.5.0] on linux, under Ubuntu 18.04.

I am using a multiprocessing BaseManager to implement a repository of queues. Each process asks for a queue by name then uses put/get on that queue.

The application needs to be resilient, so it must be possible to restart the repository process and have the various client processes reconnect to the queues hosted by it.

The problem I am getting is that the first call to `get_queue()` after restarting the BaseManager server process does not return a queue.

The sequence below shows some testing by hand. (My test environment runs Ubuntu in VirtualBox, hosted on Windows 8.1.)

Here I started the server in a different terminal, then started Python as below (both interpreters in the same venv).

This works as expected, with the first call to `get_queue()` returning a queue.
```
(hydra_env) john@u1804-VirtualBox:~/sw/code/hydra$ python
Python 3.8.5 (default, Aug 13 2020, 15:42:06) 
[GCC 7.5.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> from multiprocessing.managers import BaseManager
>>> class QueueManager(BaseManager): pass
... 
>>> QueueManager.register('get_queue')
>>> mgr = QueueManager(address=('localhost', 50000), authkey=b'abracadabra' )
>>> mgr.connect()
>>> q = mgr.get_queue('name', 'src'); print(str(q))
<multiprocessing.queues.Queue object at 0x7f98403c1820>
>>> q = mgr.get_queue('name', 'src'); print(str(q))
<multiprocessing.queues.Queue object at 0x7f98403c1820>
```

Stop and restart the server to see the problem. The first call to `get_queue()` seems to succeed, but in fact it has failed, as shown by the output of `print(str(q))`. The second call to `get_queue()` succeeds.
```
>>> mgr.connect()
>>> q = mgr.get_queue('name', 'src'); print(str(q))
<AutoProxy[get_queue] object, typeid 'get_queue' at 0x7f280afdc160; '__str__()' failed>
>>> q = mgr.get_queue('name', 'src'); print(str(q))
<multiprocessing.queues.Queue object at 0x7ff5f1b46820>
```

The server logs show that it served a queue on all 4 calls:
```
^C(hydra_env) john@u1804-VirtualBox:~/sw/code/hydra$ python ../../trials/test_mgr.py 
starting
serving <multiprocessing.queues.Queue object at 0x7f98403c1820>
serving <multiprocessing.queues.Queue object at 0x7f98403c1820>
^C(hydra_env) john@u1804-VirtualBox:~/sw/code/hydra$ python ../../trials/test_mgr.py 
starting
serving <multiprocessing.queues.Queue object at 0x7ff5f1b46820>
serving <multiprocessing.queues.Queue object at 0x7ff5f1b46820>
```

I get the same behaviour if I re-instantiate the local manager object:

```
>>> mgr = QueueManager(address=('localhost', 50000), authkey=b'abracadabra' )
>>> mgr.connect()
>>> q = mgr.get_queue('name', 'src'); print(str(q))
<AutoProxy[get_queue] object, typeid 'get_queue' at 0x7f280afdc2b0; '__str__()' failed>
>>> q = mgr.get_queue('name', 'src'); print(str(q))
<multiprocessing.queues.Queue object at 0x7ff8dabd7820>
>>>
```

I even get the same behaviour if I just call `get_queue()` after restarting the server (i.e. without explicitly reconnecting).

I would have expected the first call to `get_queue()` to return a valid queue since neither it nor the call to `connect()` raised any kind of error.
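As a possible client-side workaround (not a fix for the underlying behaviour), the proxy could be probed right after it is fetched and fetched again if the probe fails. The sketch below is only an illustration: `fetch_usable_proxy`, `fetch` and `probe` are hypothetical names, and it assumes that a proxy whose `__str__()` fails will also raise on another cheap remote call, which I have not verified.

```python
import time
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


def fetch_usable_proxy(
    fetch: Callable[[], T],
    probe: Callable[[T], object],
    attempts: int = 3,
    delay: float = 0.1,
) -> T:
    """Call fetch() until probe(proxy) succeeds, up to `attempts` times.

    Hypothetical helper: `fetch` would wrap mgr.get_queue(...) and `probe`
    would make any cheap call on the returned proxy.
    """
    last_error: Optional[BaseException] = None
    for attempt in range(attempts):
        try:
            proxy = fetch()
            probe(proxy)  # assumed to raise if the proxy is unusable
            return proxy
        except Exception as exc:
            last_error = exc
            if attempt < attempts - 1:
                time.sleep(delay)  # brief pause before asking again
    raise RuntimeError(f"no usable proxy after {attempts} attempts") from last_error
```

With the manager from the session above, this might be called as `fetch_usable_proxy(lambda: mgr.get_queue('name', 'src'), lambda p: p.empty())`. It only hides the symptom; the first `get_queue()` call still fails.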

It seems to me that there is some kind of state held that is the underlying cause of the issue. I did some investigating in  but I was not able to work out what was happening.

I found that it was possible to get into a state where a valid queue was never returned by `get_queue()` if an error had been raised by `get_nowait()` first.

Stop the server:

```
>>> q.get_nowait()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "<string>", line 2, in get_nowait
  File "/home/john/.pyenv/versions/3.8.5/lib/python3.8/multiprocessing/managers.py", line 835, in _callmethod
    kind, result = conn.recv()
  File "/home/john/.pyenv/versions/3.8.5/lib/python3.8/multiprocessing/connection.py", line 250, in recv
    buf = self._recv_bytes()
  File "/home/john/.pyenv/versions/3.8.5/lib/python3.8/multiprocessing/connection.py", line 414, in _recv_bytes
    buf = self._recv(4)
  File "/home/john/.pyenv/versions/3.8.5/lib/python3.8/multiprocessing/connection.py", line 383, in _recv
    raise EOFError
EOFError
```

Restart the server, but do not call `get_queue()` before using the old queue proxy:

```
>>> q.get_nowait()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "<string>", line 2, in get_nowait
  File "/home/john/.pyenv/versions/3.8.5/lib/python3.8/multiprocessing/managers.py", line 834, in _callmethod
    conn.send((self._id, methodname, args, kwds))
  File "/home/john/.pyenv/versions/3.8.5/lib/python3.8/multiprocessing/connection.py", line 206, in send
    self._send_bytes(_ForkingPickler.dumps(obj))
  File "/home/john/.pyenv/versions/3.8.5/lib/python3.8/multiprocessing/connection.py", line 411, in _send_bytes
    self._send(header + buf)
  File "/home/john/.pyenv/versions/3.8.5/lib/python3.8/multiprocessing/connection.py", line 368, in _send
    n = write(self._handle, buf)
BrokenPipeError: [Errno 32] Broken pipe
>>> q = mgr.get_queue('name', 'src'); print(str(q))
<AutoProxy[get_queue] object, typeid 'get_queue' at 0x7f1e21f63070; '__str__()' failed>
>>> q = mgr.get_queue('name', 'src'); print(str(q))
<AutoProxy[get_queue] object, typeid 'get_queue' at 0x7f1e21f63130; '__str__()' failed>
>>> q = mgr.get_queue('name', 'src'); print(str(q))
<AutoProxy[get_queue] object, typeid 'get_queue' at 0x7f1e21f63220; '__str__()' failed>
>>> q = mgr.get_queue('name', 'src'); print(str(q))
<AutoProxy[get_queue] object, typeid 'get_queue' at 0x7f1e21f631f0; '__str__()' failed>
```

This continued while I was testing, but `get_queue()` returned a queue some time later, so it was perhaps stuck on a timeout.
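For the clients, one way to cope with this might be to wrap queue calls so that the errors seen in the tracebacks above (`EOFError`, `BrokenPipeError`) cause the proxy to be dropped and fetched again, retrying until a deadline. This is a rough sketch under assumptions: `ReconnectingQueue` and its parameters are hypothetical, and a bad proxy may raise other exception types that would need adding to the tuple.

```python
import time
from typing import Any, Callable

# Errors from the tracebacks above; BrokenPipeError is a ConnectionError subclass.
CONNECTION_ERRORS = (EOFError, ConnectionError)


class ReconnectingQueue:
    """Hypothetical wrapper that re-fetches a queue proxy after connection errors."""

    def __init__(self, fetch: Callable[[], Any],
                 retry_for: float = 5.0, pause: float = 0.2) -> None:
        self._fetch = fetch          # e.g. lambda: mgr.get_queue('name', 'src')
        self._retry_for = retry_for  # seconds to keep retrying a single call
        self._pause = pause          # seconds to sleep between retries
        self._proxy = None

    def call(self, method_name: str, *args: Any, **kwargs: Any) -> Any:
        deadline = time.monotonic() + self._retry_for
        while True:
            try:
                if self._proxy is None:
                    self._proxy = self._fetch()
                return getattr(self._proxy, method_name)(*args, **kwargs)
            except CONNECTION_ERRORS:
                self._proxy = None  # drop the stale proxy and fetch a new one
                if time.monotonic() >= deadline:
                    raise
                time.sleep(self._pause)
```

A client would then use `rq.call('get_nowait')` instead of `q.get_nowait()`. Other exceptions, such as `queue.Empty`, pass through unchanged. Retrying a `put` this way could deliver an item twice if the first attempt actually reached the server, so it is safer for reads than for writes.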


Python 3.9.0 - I did a limited amount of testing on Python 3.9.0 and the results appeared to be the same.


The code for the test server (`test_mgr.py`) is here:

```
from multiprocessing.managers import BaseManager
from multiprocessing import Queue

class QueueManager(BaseManager):

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.__queues = {} # dict of instances keyed on qname

    def get_queue(self, qname: str, src: str) -> Queue:
        if qname not in self.__queues:
            self.__queues[qname] = Queue()
        the_q = self.__queues[qname]
        print(f'serving {the_q}')
        return the_q


def main() -> None:
    """main for a process serving queues forever"""
    mgr = QueueManager(address=('localhost', 50000), authkey=b'abracadabra' )
    QueueManager.register('get_queue', callable=mgr.get_queue)
    server_object = mgr.get_server()
    print('starting')
    server_object.serve_forever()


if __name__ == '__main__':
    main()

```