
Fix a BrokenPipeError when a multiprocessing.Queue is garbage collected #91185

Closed
maggyero mannequin opened this issue Mar 15, 2022 · 12 comments
Labels
3.9 (only security fixes), 3.10 (only security fixes), 3.11 (only security fixes), stdlib (Python modules in the Lib dir), type-crash (A hard crash of the interpreter, possibly with a core dump)

Comments


maggyero mannequin commented Mar 15, 2022

BPO 47029
Nosy @maggyero
PRs
  • bpo-47029: Fix BrokenPipeError in multiprocessing.Queue at garbage collection and explicit close #31913
    Note: these values reflect the state of the issue at the time it was migrated and might not reflect the current state.


    GitHub fields:

    assignee = None
    closed_at = None
    created_at = <Date 2022-03-15.18:28:33.219>
    labels = ['3.10', 'library', '3.9', 'type-crash', '3.11']
    title = 'Fix a BrokenPipeError when a multiprocessing.Queue is garbage collected'
    updated_at = <Date 2022-03-16.16:27:25.124>
    user = 'https://github.com/maggyero'

    bugs.python.org fields:

    activity = <Date 2022-03-16.16:27:25.124>
    actor = 'maggyero'
    assignee = 'none'
    closed = False
    closed_date = None
    closer = None
    components = ['Library (Lib)']
    creation = <Date 2022-03-15.18:28:33.219>
    creator = 'maggyero'
    dependencies = []
    files = []
    hgrepos = []
    issue_num = 47029
    keywords = ['patch']
    message_count = 3.0
    messages = ['415272', '415273', '415351']
    nosy_count = 1.0
    nosy_names = ['maggyero']
    pr_nums = ['31913']
    priority = 'normal'
    resolution = None
    stage = 'patch review'
    status = 'open'
    superseder = None
    type = 'crash'
    url = 'https://bugs.python.org/issue47029'
    versions = ['Python 3.9', 'Python 3.10', 'Python 3.11']


    maggyero mannequin commented Mar 15, 2022

    A BrokenPipeError exception is raised when the queue thread of a multiprocessing.Queue is still sending enqueued items to the write end of the queue pipe after the read end has been closed. The read end is closed automatically when it is garbage collected together with the queue, while the write end is not garbage collected because it is also referenced by the queue thread:

    import multiprocessing
    
    def main():
        q = multiprocessing.Queue()
        q.put(0)
    
    if __name__ == '__main__':
        main()
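
    A workaround sketch, not part of the original report: draining the queue before it goes out of scope forces the queue thread to flush its buffer through the pipe, so there is nothing left to send by the time the read end is closed at garbage collection:

    import multiprocessing

    def main():
        q = multiprocessing.Queue()
        q.put(0)
        # get() only returns once the queue thread has pushed the item
        # through the pipe, so the internal buffer is empty when the queue
        # (and its read end) is garbage collected.
        q.get()

    if __name__ == '__main__':
        main()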
    

    maggyero mannequin added the 3.9, 3.10, 3.11, stdlib and type-crash labels on Mar 15, 2022

    maggyero mannequin commented Mar 15, 2022

    I have attached the following patch: pass a reference to the reader end of the queue pipe to the queue thread so that the reader end is not garbage collected and closed before the queue thread has sent all the buffered data to the writer end.
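
    A minimal, self-contained sketch of the idea (illustrative only; `feed` and `start_feeder` are made-up names, not the actual patch): the thread that drains the buffer is handed a reference to the read end, so the read end cannot be garbage collected and closed while buffered items are still being sent:

    import threading
    from collections import deque
    from multiprocessing import Pipe

    def feed(buffer, writer, reader):
        # Holding `reader` as an argument keeps the read end referenced for
        # the lifetime of this thread, so the sends below cannot hit a pipe
        # whose read end has already been closed by garbage collection.
        while buffer:
            writer.send(buffer.popleft())
        writer.close()

    def start_feeder():
        reader, writer = Pipe(duplex=False)
        thread = threading.Thread(target=feed,
                                  args=(deque([0, 1, 2]), writer, reader))
        thread.start()
        # After returning, only the feeder thread still references `reader`;
        # without passing it, it could be collected here and later sends
        # might fail with BrokenPipeError.
        return thread

    start_feeder().join()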


    maggyero mannequin commented Mar 16, 2022

    I forgot to include the output of the above program:

    Traceback (most recent call last):
      File "/usr/local/Cellar/python@3.9/3.9.10/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/queues.py", line 251, in _feed
        send_bytes(obj)
      File "/usr/local/Cellar/python@3.9/3.9.10/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/connection.py", line 205, in send_bytes 
        self._send_bytes(m[offset:offset + size])
      File "/usr/local/Cellar/python@3.9/3.9.10/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/connection.py", line 416, in _send_bytes 
        self._send(header + buf)
      File "/usr/local/Cellar/python@3.9/3.9.10/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/connection.py", line 373, in _send
        n = write(self._handle, buf)
    BrokenPipeError: [Errno 32] Broken pipe
    

    ezio-melotti transferred this issue from another repository on Apr 10, 2022

    malsyned commented Apr 13, 2022

    I am seeing this same issue and it's causing my unit tests to print a bunch of spurious BrokenPipeError chatter. It's subject to race conditions, and the sample code in #91185 (comment) only manifests the issue a fraction of the time on my development machine. If I want to reliably reproduce the BrokenPipeError, this main() does it:

    def main():
        q = multiprocessing.Queue()
        q.put(0)
        #time.sleep(0.001) # Issue goes away when this line is uncommented
        q.close()
        q.join_thread()

    If I uncomment the 1 ms delay, the BrokenPipeError goes away.
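
    Because the failure is racy, one illustrative way to gauge how often it fires (not from the original comment, and relying on the default queue feeder error handler printing the traceback to sys.stderr) is to capture stderr around repeated runs of the reproducer:

    import io
    import multiprocessing
    from contextlib import redirect_stderr

    def attempt():
        q = multiprocessing.Queue()
        q.put(0)
        q.close()
        q.join_thread()

    failures = 0
    for _ in range(50):
        stream = io.StringIO()
        # The feeder thread reports BrokenPipeError via traceback.print_exc(),
        # which writes to whatever sys.stderr is at that moment.
        with redirect_stderr(stream):
            attempt()
        failures += 'BrokenPipeError' in stream.getvalue()
    print(f'BrokenPipeError in {failures} of 50 runs')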

    @malsyned

    The PR from @maggyero didn't fix this issue for me with my modified main().

    I believe the root cause of this bug is that Queue.close() closes self._reader, but that may be premature if the current process is the only one holding a reference to that reader and there are still objects in self._buffer that the _feed thread hasn't flushed out to self._writer.

    I wouldn't consider this a real fix since the reader is surely being closed explicitly for a reason (and because this is ugly code), but as a proof of concept, this change eliminates the BrokenPipeError reliably for me:

    diff --git a/Lib/multiprocessing/queues.py b/Lib/multiprocessing/queues.py
    index ff3747ec49..59cda8af52 100644
    --- a/Lib/multiprocessing/queues.py
    +++ b/Lib/multiprocessing/queues.py
    @@ -140,7 +140,7 @@ def put_nowait(self, obj):
         def close(self):
             self._closed = True
             try:
    -            self._reader.close()
    +            pass
             finally:
                 close = self._close
                 if close:
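
    The premature close can also be observed directly (illustrative only; `_buffer` is a private attribute) by checking whether anything is still buffered when close() is reached:

    import multiprocessing

    def main():
        q = multiprocessing.Queue()
        q.put(0)
        # `_buffer` is the private collections.deque feeding the queue thread;
        # it usually still holds the item at this point, which is what makes
        # closing self._reader here premature.
        print('items still buffered at close():', len(q._buffer))
        q.close()
        q.join_thread()

    if __name__ == '__main__':
        main()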


    malsyned commented Apr 13, 2022

    Some further evidence: putting this code before q.close() also stops my main() from reproducing this issue:

        while q._buffer:
            pass

    [edit: mostly stops. There's an additional race condition that happens very rarely as well]


    malsyned commented Apr 14, 2022

    Is this a dupe of #80025?


    geryogam commented Apr 26, 2022

    > I tried this pull request and it didn't fix the problem for me. Might you be getting tricked by how racy this issue is into thinking that an unrelated change is resolving it?

    Thanks for the code sample @malsyned. I have updated the PR and it fixes your issue too. The problem in your case was that Queue.close explicitly closed the read end self._reader of the queue pipe from the main thread before notifying the queue thread to stop flushing the buffer self._buffer to the write end self._writer of the queue pipe. So I solved this by closing the read end self._reader of the queue pipe from the queue thread instead.

    From the Linux manual page of pipe:

    If all file descriptors referring to the write end of a pipe have been closed, then an attempt to read(2) from the pipe will see end-of-file (read(2) will return 0). If all file descriptors referring to the read end of a pipe have been closed, then a write(2) will cause a SIGPIPE signal to be generated for the calling process. If the calling process is ignoring this signal, then write(2) fails with the error EPIPE. An application that uses pipe(2) and fork(2) should use suitable close(2) calls to close unnecessary duplicate file descriptors; this ensures that end-of-file and SIGPIPE/EPIPE are delivered when appropriate.
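
    That EPIPE case is easy to reproduce with a bare pipe (an illustrative aside; CPython ignores SIGPIPE at startup, so the failed write surfaces as BrokenPipeError):

    import os

    r, w = os.pipe()
    os.close(r)  # close the only file descriptor referring to the read end
    try:
        os.write(w, b'x')  # write(2) now fails with EPIPE
    except BrokenPipeError as exc:
        print(exc)  # [Errno 32] Broken pipe
    finally:
        os.close(w)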

    @geryogam

    Hi @JelleZijlstra. Could you close this issue now that it has been fixed in PR #31913? (I cannot do it from my account.)

    @geryogam

    @JelleZijlstra Thanks!

    @gatopeich

    I am hitting a similar issue in Python 3.8 when using mp.Queue from asyncio's run_in_executor.
    Is there a workaround or fix, or do I have to upgrade Python?

    @geryogam

    @gatopeich Could you open a separate issue with a minimal working example to reproduce the bug?
