Rietveld Code Review Tool
Help | Bug tracker | Discussion group | Source code | Sign in
(15)

Side by Side Diff: Lib/concurrent/futures/process.py

Issue 10639: reindent.py converts newlines to platform default
Patch Set: Created 8 years, 8 months ago
Left:
Right:
Use n/p to move between diff chunks; N/P to move between comments. Please Sign in to add in-line comments.
Jump to:
View unified diff | Download patch
« no previous file with comments | « Lib/cgitb.py ('k') | Lib/contextlib.py » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 # Copyright 2009 Brian Quinlan. All Rights Reserved. 1 # Copyright 2009 Brian Quinlan. All Rights Reserved.
2 # Licensed to PSF under a Contributor Agreement. 2 # Licensed to PSF under a Contributor Agreement.
3 3
4 """Implements ProcessPoolExecutor. 4 """Implements ProcessPoolExecutor.
5 5
6 The following diagram and text describe the data-flow through the system: 6 The following diagram and text describe the data-flow through the system:
7 7
8 |======================= In-process =====================|== Out-of-process ==| 8 |======================= In-process =====================|== Out-of-process ==|
9 9
10 +----------+ +----------+ +--------+ +-----------+ +---------+ 10 +----------+ +----------+ +--------+ +-----------+ +---------+
(...skipping 32 matching lines...) Expand 10 before | Expand all | Expand 10 after
43 _ResultItems in "Request Q" 43 _ResultItems in "Request Q"
44 """ 44 """
45 45
46 __author__ = 'Brian Quinlan (brian@sweetapp.com)' 46 __author__ = 'Brian Quinlan (brian@sweetapp.com)'
47 47
48 import atexit 48 import atexit
49 import os 49 import os
50 from concurrent.futures import _base 50 from concurrent.futures import _base
51 import queue 51 import queue
52 import multiprocessing 52 import multiprocessing
53 from multiprocessing.queues import SimpleQueue, SentinelReady, Full 53 from multiprocessing.queues import SimpleQueue, SentinelReady
54 import threading 54 import threading
55 import weakref 55 import weakref
56 56
57 # Workers are created as daemon threads and processes. This is done to allow the 57 # Workers are created as daemon threads and processes. This is done to allow the
58 # interpreter to exit when there are still idle processes in a 58 # interpreter to exit when there are still idle processes in a
59 # ProcessPoolExecutor's process pool (i.e. shutdown() was not called). However, 59 # ProcessPoolExecutor's process pool (i.e. shutdown() was not called). However,
60 # allowing workers to die with the interpreter has two undesirable properties: 60 # allowing workers to die with the interpreter has two undesirable properties:
61 # - The workers would still be running during interpreter shutdown, 61 # - The workers would still be running during interpreter shutdown,
62 # meaning that they would fail in unpredictable ways. 62 # meaning that they would fail in unpredictable ways.
63 # - The workers could be killed while evaluating a work item, which could 63 # - The workers could be killed while evaluating a work item, which could
(...skipping 124 matching lines...) Expand 10 before | Expand all | Expand 10 after
188 process: A list of the multiprocessing.Process instances used as 188 process: A list of the multiprocessing.Process instances used as
189 workers. 189 workers.
190 pending_work_items: A dict mapping work ids to _WorkItems e.g. 190 pending_work_items: A dict mapping work ids to _WorkItems e.g.
191 {5: <_WorkItem...>, 6: <_WorkItem...>, ...} 191 {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
192 work_ids_queue: A queue.Queue of work ids e.g. Queue([5, 6, ...]). 192 work_ids_queue: A queue.Queue of work ids e.g. Queue([5, 6, ...]).
193 call_queue: A multiprocessing.Queue that will be filled with _CallItems 193 call_queue: A multiprocessing.Queue that will be filled with _CallItems
194 derived from _WorkItems for processing by the process workers. 194 derived from _WorkItems for processing by the process workers.
195 result_queue: A multiprocessing.Queue of _ResultItems generated by the 195 result_queue: A multiprocessing.Queue of _ResultItems generated by the
196 process workers. 196 process workers.
197 """ 197 """
198 executor = None
199
200 def shutting_down():
201 return _shutdown or executor is None or executor._shutdown_thread
202 198
203 def shutdown_worker(): 199 def shutdown_worker():
204 # This is an upper bound 200 # This is an upper bound
205 nb_children_alive = sum(p.is_alive() for p in processes.values()) 201 nb_children_alive = sum(p.is_alive() for p in processes.values())
206 for i in range(0, nb_children_alive): 202 for i in range(0, nb_children_alive):
207 call_queue.put_nowait(None) 203 call_queue.put(None)
208 # Release the queue's resources as soon as possible.
209 call_queue.close()
210 # If .join() is not called on the created processes then 204 # If .join() is not called on the created processes then
211 # some multiprocessing.Queue methods may deadlock on Mac OS X. 205 # some multiprocessing.Queue methods may deadlock on Mac OS
206 # X.
212 for p in processes.values(): 207 for p in processes.values():
213 p.join() 208 p.join()
214 209
215 while True: 210 while True:
216 _add_call_item_to_queue(pending_work_items, 211 _add_call_item_to_queue(pending_work_items,
217 work_ids_queue, 212 work_ids_queue,
218 call_queue) 213 call_queue)
219 214
220 sentinels = [p.sentinel for p in processes.values()] 215 sentinels = [p.sentinel for p in processes.values()]
221 assert sentinels 216 assert sentinels
222 try: 217 try:
223 result_item = result_queue.get(sentinels=sentinels) 218 result_item = result_queue.get(sentinels=sentinels)
224 except SentinelReady as e: 219 except SentinelReady as e:
225 # Mark the process pool broken so that submits fail right now. 220 # Mark the process pool broken so that submits fail right now.
226 executor = executor_reference() 221 executor = executor_reference()
227 if executor is not None: 222 if executor is not None:
228 executor._broken = True 223 executor._broken = True
229 executor._shutdown_thread = True 224 executor._shutdown_thread = True
230 executor = None 225 del executor
231 # All futures in flight must be marked failed 226 # All futures in flight must be marked failed
232 for work_id, work_item in pending_work_items.items(): 227 for work_id, work_item in pending_work_items.items():
233 work_item.future.set_exception( 228 work_item.future.set_exception(
234 BrokenProcessPool( 229 BrokenProcessPool(
235 "A process in the process pool was " 230 "A process in the process pool was "
236 "terminated abruptly while the future was " 231 "terminated abruptly while the future was "
237 "running or pending." 232 "running or pending."
238 )) 233 ))
239 pending_work_items.clear() 234 pending_work_items.clear()
240 # Terminate remaining workers forcibly: the queues or their 235 # Terminate remaining workers forcibly: the queues or their
241 # locks may be in a dirty state and block forever. 236 # locks may be in a dirty state and block forever.
242 for p in processes.values(): 237 for p in processes.values():
243 p.terminate() 238 p.terminate()
244 shutdown_worker() 239 for p in processes.values():
240 p.join()
245 return 241 return
246 if isinstance(result_item, int): 242 if isinstance(result_item, int):
247 # Clean shutdown of a worker using its PID 243 # Clean shutdown of a worker using its PID
248 # (avoids marking the executor broken) 244 # (avoids marking the executor broken)
249 assert shutting_down() 245 del processes[result_item]
250 p = processes.pop(result_item)
251 p.join()
252 if not processes:
253 shutdown_worker()
254 return
255 elif result_item is not None: 246 elif result_item is not None:
256 work_item = pending_work_items.pop(result_item.work_id, None) 247 work_item = pending_work_items.pop(result_item.work_id, None)
257 # work_item can be None if another process terminated (see above) 248 # work_item can be None if another process terminated (see above)
258 if work_item is not None: 249 if work_item is not None:
259 if result_item.exception: 250 if result_item.exception:
260 work_item.future.set_exception(result_item.exception) 251 work_item.future.set_exception(result_item.exception)
261 else: 252 else:
262 work_item.future.set_result(result_item.result) 253 work_item.future.set_result(result_item.result)
263 # Check whether we should start shutting down. 254 # Check whether we should start shutting down.
264 executor = executor_reference() 255 executor = executor_reference()
265 # No more work items can be added if: 256 # No more work items can be added if:
266 # - The interpreter is shutting down OR 257 # - The interpreter is shutting down OR
267 # - The executor that owns this worker has been collected OR 258 # - The executor that owns this worker has been collected OR
268 # - The executor that owns this worker has been shutdown. 259 # - The executor that owns this worker has been shutdown.
269 if shutting_down(): 260 if _shutdown or executor is None or executor._shutdown_thread:
270 try: 261 # Since no new work items can be added, it is safe to shutdown
271 # Since no new work items can be added, it is safe to shutdown 262 # this thread if there are no pending work items.
272 # this thread if there are no pending work items. 263 if not pending_work_items:
273 if not pending_work_items: 264 shutdown_worker()
274 shutdown_worker() 265 return
275 return 266 else:
276 else: 267 # Start shutting down by telling a process it can exit.
277 # Start shutting down by telling a process it can exit. 268 call_queue.put(None)
278 call_queue.put_nowait(None) 269 del executor
279 except Full:
280 # This is not a problem: we will eventually be woken up (in
281 # result_queue.get()) and be able to send a sentinel again.
282 pass
283 executor = None
284 270
285 _system_limits_checked = False 271 _system_limits_checked = False
286 _system_limited = None 272 _system_limited = None
287 def _check_system_limits(): 273 def _check_system_limits():
288 global _system_limits_checked, _system_limited 274 global _system_limits_checked, _system_limited
289 if _system_limits_checked: 275 if _system_limits_checked:
290 if _system_limited: 276 if _system_limited:
291 raise NotImplementedError(_system_limited) 277 raise NotImplementedError(_system_limited)
292 _system_limits_checked = True 278 _system_limits_checked = True
293 try: 279 try:
(...skipping 35 matching lines...) Expand 10 before | Expand all | Expand 10 after
329 if max_workers is None: 315 if max_workers is None:
330 self._max_workers = multiprocessing.cpu_count() 316 self._max_workers = multiprocessing.cpu_count()
331 else: 317 else:
332 self._max_workers = max_workers 318 self._max_workers = max_workers
333 319
334 # Make the call queue slightly larger than the number of processes to 320 # Make the call queue slightly larger than the number of processes to
335 # prevent the worker processes from idling. But don't make it too big 321 # prevent the worker processes from idling. But don't make it too big
336 # because futures in the call queue cannot be cancelled. 322 # because futures in the call queue cannot be cancelled.
337 self._call_queue = multiprocessing.Queue(self._max_workers + 323 self._call_queue = multiprocessing.Queue(self._max_workers +
338 EXTRA_QUEUED_CALLS) 324 EXTRA_QUEUED_CALLS)
339 # Killed worker processes can produce spurious "broken pipe"
340 # tracebacks in the queue's own worker thread. But we detect killed
341 # processes anyway, so silence the tracebacks.
342 self._call_queue._ignore_epipe = True
343 self._result_queue = SimpleQueue() 325 self._result_queue = SimpleQueue()
344 self._work_ids = queue.Queue() 326 self._work_ids = queue.Queue()
345 self._queue_management_thread = None 327 self._queue_management_thread = None
346 # Map of pids to processes 328 # Map of pids to processes
347 self._processes = {} 329 self._processes = {}
348 330
349 # Shutdown is a two-step process. 331 # Shutdown is a two-step process.
350 self._shutdown_thread = False 332 self._shutdown_thread = False
351 self._shutdown_lock = threading.Lock() 333 self._shutdown_lock = threading.Lock()
352 self._broken = False 334 self._broken = False
(...skipping 60 matching lines...) Expand 10 before | Expand all | Expand 10 after
413 self._queue_management_thread.join() 395 self._queue_management_thread.join()
414 # To reduce the risk of opening too many files, remove references to 396 # To reduce the risk of opening too many files, remove references to
415 # objects that use file descriptors. 397 # objects that use file descriptors.
416 self._queue_management_thread = None 398 self._queue_management_thread = None
417 self._call_queue = None 399 self._call_queue = None
418 self._result_queue = None 400 self._result_queue = None
419 self._processes = None 401 self._processes = None
420 shutdown.__doc__ = _base.Executor.shutdown.__doc__ 402 shutdown.__doc__ = _base.Executor.shutdown.__doc__
421 403
422 atexit.register(_python_exit) 404 atexit.register(_python_exit)
OLDNEW
« no previous file with comments | « Lib/cgitb.py ('k') | Lib/contextlib.py » ('j') | no next file with comments »

RSS Feeds Recent Issues | This issue
This is Rietveld 894c83f36cb7+