Rietveld Code Review Tool
Help | Bug tracker | Discussion group | Source code | Sign in
(92672)

Unified Diff: Lib/queue.py

Issue 14976: queue.Queue() is not reentrant, so signals and GC can cause deadlocks
Patch Set: Created 1 year, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Please Sign in to add in-line comments.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « no previous file | no next file » | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
index 572425e844..b7e05dadf4 100644
--- a/Lib/queue.py
+++ b/Lib/queue.py
@@ -23,6 +23,7 @@ class Queue:
If maxsize is <= 0, the queue size is infinite.
'''
+ _has_atomic_put = True
def __init__(self, maxsize=0):
self.maxsize = maxsize
@@ -32,7 +33,7 @@ class Queue:
# that acquire mutex must release it before returning. mutex
# is shared between the three conditions, so acquiring and
# releasing the conditions also acquires and releases mutex.
- self.mutex = threading.Lock()
+ self.mutex = threading.RLock()
# Notify not_empty whenever an item is added to the queue; a
# thread waiting to get is notified then.
@@ -66,7 +67,8 @@ class Queue:
if unfinished <= 0:
if unfinished < 0:
raise ValueError('task_done() called too many times')
- self.all_tasks_done.notify_all()
+ if not self._qsize():
+ self.all_tasks_done.notify_all()
self.unfinished_tasks = unfinished
def join(self):
@@ -79,7 +81,7 @@ class Queue:
When the count of unfinished tasks drops to zero, join() unblocks.
'''
with self.all_tasks_done:
- while self.unfinished_tasks:
+ while self.unfinished_tasks or self._qsize():
self.all_tasks_done.wait()
def qsize(self):
@@ -123,8 +125,10 @@ class Queue:
is immediately available, else raise the Full exception ('timeout'
is ignored in that case).
'''
- with self.not_full:
- if self.maxsize > 0:
+ if self.maxsize <= 0:
+ self._put_unbounded(item)
+ else:
+ with self.not_full:
if not block:
if self._qsize() >= self.maxsize:
raise Full
@@ -140,9 +144,21 @@ class Queue:
if remaining <= 0.0:
raise Full
self.not_full.wait(remaining)
+
+ self._put(item)
+ self.not_empty.notify()
+
+ def _put_unbounded(self, item):
+ reentrant_call = self.mutex._is_owned()
+ if reentrant_call:
+ if not self._has_atomic_put:
+ raise RuntimeError("reentrant put() call on %s"
+ % (self.__class__.__name__,))
self._put(item)
- self.unfinished_tasks += 1
- self.not_empty.notify()
+ else:
+ with self.mutex:
+ self._put(item)
+ self.not_empty.notify()
def get(self, block=True, timeout=None):
'''Remove and return an item from the queue.
@@ -172,6 +188,7 @@ class Queue:
raise Empty
self.not_empty.wait(remaining)
item = self._get()
+ self.unfinished_tasks += 1
self.not_full.notify()
return item
@@ -216,6 +233,7 @@ class PriorityQueue(Queue):
Entries are typically tuples of the form: (priority number, data).
'''
+ _has_atomic_put = False
def _init(self, maxsize):
self.queue = []
« no previous file with comments | « no previous file | no next file » | no next file with comments »

RSS Feeds Recent Issues | This issue
This is Rietveld 894c83f36cb7+