Rietveld Code Review Tool
Help | Bug tracker | Discussion group | Source code | Sign in
(80797)

Side by Side Diff: Lib/queue.py

Issue 14976: queue.Queue() is not reentrant, so signals and GC can cause deadlocks
Patch Set: Created 2 years ago
Left:
Right:
Use n/p to move between diff chunks; N/P to move between comments. Please Sign in to add in-line comments.
Jump to:
View unified diff | Download patch
« no previous file with comments | « no previous file | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 '''A multi-producer, multi-consumer queue.''' 1 '''A multi-producer, multi-consumer queue.'''
2 2
3 try: 3 try:
4 import threading 4 import threading
5 except ImportError: 5 except ImportError:
6 import dummy_threading as threading 6 import dummy_threading as threading
7 from collections import deque 7 from collections import deque
8 from heapq import heappush, heappop 8 from heapq import heappush, heappop
9 from time import monotonic as time 9 from time import monotonic as time
10 10
11 __all__ = ['Empty', 'Full', 'Queue', 'PriorityQueue', 'LifoQueue'] 11 __all__ = ['Empty', 'Full', 'Queue', 'PriorityQueue', 'LifoQueue']
12 12
13 class Empty(Exception): 13 class Empty(Exception):
14 'Exception raised by Queue.get(block=0)/get_nowait().' 14 'Exception raised by Queue.get(block=0)/get_nowait().'
15 pass 15 pass
16 16
17 class Full(Exception): 17 class Full(Exception):
18 'Exception raised by Queue.put(block=0)/put_nowait().' 18 'Exception raised by Queue.put(block=0)/put_nowait().'
19 pass 19 pass
20 20
21 class Queue: 21 class Queue:
22 '''Create a queue object with a given maximum size. 22 '''Create a queue object with a given maximum size.
23 23
24 If maxsize is <= 0, the queue size is infinite. 24 If maxsize is <= 0, the queue size is infinite.
25 ''' 25 '''
26 _has_atomic_put = True
26 27
27 def __init__(self, maxsize=0): 28 def __init__(self, maxsize=0):
28 self.maxsize = maxsize 29 self.maxsize = maxsize
29 self._init(maxsize) 30 self._init(maxsize)
30 31
31 # mutex must be held whenever the queue is mutating. All methods 32 # mutex must be held whenever the queue is mutating. All methods
32 # that acquire mutex must release it before returning. mutex 33 # that acquire mutex must release it before returning. mutex
33 # is shared between the three conditions, so acquiring and 34 # is shared between the three conditions, so acquiring and
34 # releasing the conditions also acquires and releases mutex. 35 # releasing the conditions also acquires and releases mutex.
35 self.mutex = threading.Lock() 36 self.mutex = threading.RLock()
36 37
37 # Notify not_empty whenever an item is added to the queue; a 38 # Notify not_empty whenever an item is added to the queue; a
38 # thread waiting to get is notified then. 39 # thread waiting to get is notified then.
39 self.not_empty = threading.Condition(self.mutex) 40 self.not_empty = threading.Condition(self.mutex)
40 41
41 # Notify not_full whenever an item is removed from the queue; 42 # Notify not_full whenever an item is removed from the queue;
42 # a thread waiting to put is notified then. 43 # a thread waiting to put is notified then.
43 self.not_full = threading.Condition(self.mutex) 44 self.not_full = threading.Condition(self.mutex)
44 45
45 # Notify all_tasks_done whenever the number of unfinished tasks 46 # Notify all_tasks_done whenever the number of unfinished tasks
(...skipping 13 matching lines...) Expand all
59 for every item that had been put() into the queue). 60 for every item that had been put() into the queue).
60 61
61 Raises a ValueError if called more times than there were items 62 Raises a ValueError if called more times than there were items
62 placed in the queue. 63 placed in the queue.
63 ''' 64 '''
64 with self.all_tasks_done: 65 with self.all_tasks_done:
65 unfinished = self.unfinished_tasks - 1 66 unfinished = self.unfinished_tasks - 1
66 if unfinished <= 0: 67 if unfinished <= 0:
67 if unfinished < 0: 68 if unfinished < 0:
68 raise ValueError('task_done() called too many times') 69 raise ValueError('task_done() called too many times')
69 self.all_tasks_done.notify_all() 70 if not self._qsize():
71 self.all_tasks_done.notify_all()
70 self.unfinished_tasks = unfinished 72 self.unfinished_tasks = unfinished
71 73
72 def join(self): 74 def join(self):
73 '''Blocks until all items in the Queue have been gotten and processed. 75 '''Blocks until all items in the Queue have been gotten and processed.
74 76
75 The count of unfinished tasks goes up whenever an item is added to the 77 The count of unfinished tasks goes up whenever an item is added to the
76 queue. The count goes down whenever a consumer thread calls task_done() 78 queue. The count goes down whenever a consumer thread calls task_done()
77 to indicate the item was retrieved and all work on it is complete. 79 to indicate the item was retrieved and all work on it is complete.
78 80
79 When the count of unfinished tasks drops to zero, join() unblocks. 81 When the count of unfinished tasks drops to zero, join() unblocks.
80 ''' 82 '''
81 with self.all_tasks_done: 83 with self.all_tasks_done:
82 while self.unfinished_tasks: 84 while self.unfinished_tasks or self._qsize():
83 self.all_tasks_done.wait() 85 self.all_tasks_done.wait()
84 86
85 def qsize(self): 87 def qsize(self):
86 '''Return the approximate size of the queue (not reliable!).''' 88 '''Return the approximate size of the queue (not reliable!).'''
87 with self.mutex: 89 with self.mutex:
88 return self._qsize() 90 return self._qsize()
89 91
90 def empty(self): 92 def empty(self):
91 '''Return True if the queue is empty, False otherwise (not reliable!). 93 '''Return True if the queue is empty, False otherwise (not reliable!).
92 94
(...skipping 23 matching lines...) Expand all
116 '''Put an item into the queue. 118 '''Put an item into the queue.
117 119
118 If optional args 'block' is true and 'timeout' is None (the default), 120 If optional args 'block' is true and 'timeout' is None (the default),
119 block if necessary until a free slot is available. If 'timeout' is 121 block if necessary until a free slot is available. If 'timeout' is
120 a non-negative number, it blocks at most 'timeout' seconds and raises 122 a non-negative number, it blocks at most 'timeout' seconds and raises
121 the Full exception if no free slot was available within that time. 123 the Full exception if no free slot was available within that time.
122 Otherwise ('block' is false), put an item on the queue if a free slot 124 Otherwise ('block' is false), put an item on the queue if a free slot
123 is immediately available, else raise the Full exception ('timeout' 125 is immediately available, else raise the Full exception ('timeout'
124 is ignored in that case). 126 is ignored in that case).
125 ''' 127 '''
126 with self.not_full: 128 if self.maxsize <= 0:
127 if self.maxsize > 0: 129 self._put_unbounded(item)
130 else:
131 with self.not_full:
128 if not block: 132 if not block:
129 if self._qsize() >= self.maxsize: 133 if self._qsize() >= self.maxsize:
130 raise Full 134 raise Full
131 elif timeout is None: 135 elif timeout is None:
132 while self._qsize() >= self.maxsize: 136 while self._qsize() >= self.maxsize:
133 self.not_full.wait() 137 self.not_full.wait()
134 elif timeout < 0: 138 elif timeout < 0:
135 raise ValueError("'timeout' must be a non-negative number") 139 raise ValueError("'timeout' must be a non-negative number")
136 else: 140 else:
137 endtime = time() + timeout 141 endtime = time() + timeout
138 while self._qsize() >= self.maxsize: 142 while self._qsize() >= self.maxsize:
139 remaining = endtime - time() 143 remaining = endtime - time()
140 if remaining <= 0.0: 144 if remaining <= 0.0:
141 raise Full 145 raise Full
142 self.not_full.wait(remaining) 146 self.not_full.wait(remaining)
147
148 self._put(item)
149 self.not_empty.notify()
150
151 def _put_unbounded(self, item):
152 reentrant_call = self.mutex._is_owned()
153 if reentrant_call:
154 if not self._has_atomic_put:
155 raise RuntimeError("reentrant put() call on %s"
156 % (self.__class__.__name__,))
143 self._put(item) 157 self._put(item)
144 self.unfinished_tasks += 1 158 else:
145 self.not_empty.notify() 159 with self.mutex:
160 self._put(item)
161 self.not_empty.notify()
146 162
147 def get(self, block=True, timeout=None): 163 def get(self, block=True, timeout=None):
148 '''Remove and return an item from the queue. 164 '''Remove and return an item from the queue.
149 165
150 If optional args 'block' is true and 'timeout' is None (the default), 166 If optional args 'block' is true and 'timeout' is None (the default),
151 block if necessary until an item is available. If 'timeout' is 167 block if necessary until an item is available. If 'timeout' is
152 a non-negative number, it blocks at most 'timeout' seconds and raises 168 a non-negative number, it blocks at most 'timeout' seconds and raises
153 the Empty exception if no item was available within that time. 169 the Empty exception if no item was available within that time.
154 Otherwise ('block' is false), return an item if one is immediately 170 Otherwise ('block' is false), return an item if one is immediately
155 available, else raise the Empty exception ('timeout' is ignored 171 available, else raise the Empty exception ('timeout' is ignored
156 in that case). 172 in that case).
157 ''' 173 '''
158 with self.not_empty: 174 with self.not_empty:
159 if not block: 175 if not block:
160 if not self._qsize(): 176 if not self._qsize():
161 raise Empty 177 raise Empty
162 elif timeout is None: 178 elif timeout is None:
163 while not self._qsize(): 179 while not self._qsize():
164 self.not_empty.wait() 180 self.not_empty.wait()
165 elif timeout < 0: 181 elif timeout < 0:
166 raise ValueError("'timeout' must be a non-negative number") 182 raise ValueError("'timeout' must be a non-negative number")
167 else: 183 else:
168 endtime = time() + timeout 184 endtime = time() + timeout
169 while not self._qsize(): 185 while not self._qsize():
170 remaining = endtime - time() 186 remaining = endtime - time()
171 if remaining <= 0.0: 187 if remaining <= 0.0:
172 raise Empty 188 raise Empty
173 self.not_empty.wait(remaining) 189 self.not_empty.wait(remaining)
174 item = self._get() 190 item = self._get()
191 self.unfinished_tasks += 1
175 self.not_full.notify() 192 self.not_full.notify()
176 return item 193 return item
177 194
178 def put_nowait(self, item): 195 def put_nowait(self, item):
179 '''Put an item into the queue without blocking. 196 '''Put an item into the queue without blocking.
180 197
181 Only enqueue the item if a free slot is immediately available. 198 Only enqueue the item if a free slot is immediately available.
182 Otherwise raise the Full exception. 199 Otherwise raise the Full exception.
183 ''' 200 '''
184 return self.put(item, block=False) 201 return self.put(item, block=False)
(...skipping 24 matching lines...) Expand all
209 # Get an item from the queue 226 # Get an item from the queue
210 def _get(self): 227 def _get(self):
211 return self.queue.popleft() 228 return self.queue.popleft()
212 229
213 230
214 class PriorityQueue(Queue): 231 class PriorityQueue(Queue):
215 '''Variant of Queue that retrieves open entries in priority order (lowest fi rst). 232 '''Variant of Queue that retrieves open entries in priority order (lowest fi rst).
216 233
217 Entries are typically tuples of the form: (priority number, data). 234 Entries are typically tuples of the form: (priority number, data).
218 ''' 235 '''
236 _has_atomic_put = False
219 237
220 def _init(self, maxsize): 238 def _init(self, maxsize):
221 self.queue = [] 239 self.queue = []
222 240
223 def _qsize(self): 241 def _qsize(self):
224 return len(self.queue) 242 return len(self.queue)
225 243
226 def _put(self, item): 244 def _put(self, item):
227 heappush(self.queue, item) 245 heappush(self.queue, item)
228 246
229 def _get(self): 247 def _get(self):
230 return heappop(self.queue) 248 return heappop(self.queue)
231 249
232 250
233 class LifoQueue(Queue): 251 class LifoQueue(Queue):
234 '''Variant of Queue that retrieves most recently added entries first.''' 252 '''Variant of Queue that retrieves most recently added entries first.'''
235 253
236 def _init(self, maxsize): 254 def _init(self, maxsize):
237 self.queue = [] 255 self.queue = []
238 256
239 def _qsize(self): 257 def _qsize(self):
240 return len(self.queue) 258 return len(self.queue)
241 259
242 def _put(self, item): 260 def _put(self, item):
243 self.queue.append(item) 261 self.queue.append(item)
244 262
245 def _get(self): 263 def _get(self):
246 return self.queue.pop() 264 return self.queue.pop()
OLDNEW
« no previous file with comments | « no previous file | no next file » | no next file with comments »

RSS Feeds Recent Issues | This issue
This is Rietveld 894c83f36cb7+