diff --git a/Lib/queue.py b/Lib/queue.py index 572425e844..b7e05dadf4 100644 --- a/Lib/queue.py +++ b/Lib/queue.py @@ -23,6 +23,7 @@ class Queue: If maxsize is <= 0, the queue size is infinite. ''' + _has_atomic_put = True def __init__(self, maxsize=0): self.maxsize = maxsize @@ -32,7 +33,7 @@ class Queue: # that acquire mutex must release it before returning. mutex # is shared between the three conditions, so acquiring and # releasing the conditions also acquires and releases mutex. - self.mutex = threading.Lock() + self.mutex = threading.RLock() # Notify not_empty whenever an item is added to the queue; a # thread waiting to get is notified then. @@ -66,7 +67,8 @@ class Queue: if unfinished <= 0: if unfinished < 0: raise ValueError('task_done() called too many times') - self.all_tasks_done.notify_all() + if not self._qsize(): + self.all_tasks_done.notify_all() self.unfinished_tasks = unfinished def join(self): @@ -79,7 +81,7 @@ class Queue: When the count of unfinished tasks drops to zero, join() unblocks. ''' with self.all_tasks_done: - while self.unfinished_tasks: + while self.unfinished_tasks or self._qsize(): self.all_tasks_done.wait() def qsize(self): @@ -123,8 +125,10 @@ class Queue: is immediately available, else raise the Full exception ('timeout' is ignored in that case). ''' - with self.not_full: - if self.maxsize > 0: + if self.maxsize <= 0: + self._put_unbounded(item) + else: + with self.not_full: if not block: if self._qsize() >= self.maxsize: raise Full @@ -140,9 +144,21 @@ class Queue: if remaining <= 0.0: raise Full self.not_full.wait(remaining) + + self._put(item) + self.not_empty.notify() + + def _put_unbounded(self, item): + reentrant_call = self.mutex._is_owned() + if reentrant_call: + if not self._has_atomic_put: + raise RuntimeError("reentrant put() call on %s" + % (self.__class__.__name__,)) self._put(item) - self.unfinished_tasks += 1 - self.not_empty.notify() + else: + with self.mutex: + self._put(item) + self.not_empty.notify() def get(self, block=True, timeout=None): '''Remove and return an item from the queue. @@ -172,6 +188,7 @@ class Queue: raise Empty self.not_empty.wait(remaining) item = self._get() + self.unfinished_tasks += 1 self.not_full.notify() return item @@ -216,6 +233,7 @@ class PriorityQueue(Queue): Entries are typically tuples of the form: (priority number, data). ''' + _has_atomic_put = False def _init(self, maxsize): self.queue = []