Rietveld Code Review Tool
Help | Bug tracker | Discussion group | Source code | Sign in
(7)

Unified Diff: Lib/concurrent/futures/process.py

Issue 10639: reindent.py converts newlines to platform default
Patch Set: Created 8 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Please Sign in to add in-line comments.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « Lib/cgitb.py ('k') | Lib/contextlib.py » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
--- a/Lib/concurrent/futures/process.py Tue Jul 26 09:37:46 2011 +0300
+++ b/Lib/concurrent/futures/process.py Mon Jul 25 09:47:18 2011 -0400
@@ -50,7 +50,7 @@
from concurrent.futures import _base
import queue
import multiprocessing
-from multiprocessing.queues import SimpleQueue, SentinelReady, Full
+from multiprocessing.queues import SimpleQueue, SentinelReady
import threading
import weakref
@@ -195,20 +195,15 @@
result_queue: A multiprocessing.Queue of _ResultItems generated by the
process workers.
"""
- executor = None
-
- def shutting_down():
- return _shutdown or executor is None or executor._shutdown_thread
def shutdown_worker():
# This is an upper bound
nb_children_alive = sum(p.is_alive() for p in processes.values())
for i in range(0, nb_children_alive):
- call_queue.put_nowait(None)
- # Release the queue's resources as soon as possible.
- call_queue.close()
+ call_queue.put(None)
# If .join() is not called on the created processes then
- # some multiprocessing.Queue methods may deadlock on Mac OS X.
+ # some multiprocessing.Queue methods may deadlock on Mac OS
+ # X.
for p in processes.values():
p.join()
@@ -227,7 +222,7 @@
if executor is not None:
executor._broken = True
executor._shutdown_thread = True
- executor = None
+ del executor
# All futures in flight must be marked failed
for work_id, work_item in pending_work_items.items():
work_item.future.set_exception(
@@ -241,17 +236,13 @@
# locks may be in a dirty state and block forever.
for p in processes.values():
p.terminate()
- shutdown_worker()
+ for p in processes.values():
+ p.join()
return
if isinstance(result_item, int):
# Clean shutdown of a worker using its PID
# (avoids marking the executor broken)
- assert shutting_down()
- p = processes.pop(result_item)
- p.join()
- if not processes:
- shutdown_worker()
- return
+ del processes[result_item]
elif result_item is not None:
work_item = pending_work_items.pop(result_item.work_id, None)
# work_item can be None if another process terminated (see above)
@@ -266,21 +257,16 @@
# - The interpreter is shutting down OR
# - The executor that owns this worker has been collected OR
# - The executor that owns this worker has been shutdown.
- if shutting_down():
- try:
- # Since no new work items can be added, it is safe to shutdown
- # this thread if there are no pending work items.
- if not pending_work_items:
- shutdown_worker()
- return
- else:
- # Start shutting down by telling a process it can exit.
- call_queue.put_nowait(None)
- except Full:
- # This is not a problem: we will eventually be woken up (in
- # result_queue.get()) and be able to send a sentinel again.
- pass
- executor = None
+ if _shutdown or executor is None or executor._shutdown_thread:
+ # Since no new work items can be added, it is safe to shutdown
+ # this thread if there are no pending work items.
+ if not pending_work_items:
+ shutdown_worker()
+ return
+ else:
+ # Start shutting down by telling a process it can exit.
+ call_queue.put(None)
+ del executor
_system_limits_checked = False
_system_limited = None
@@ -336,10 +322,6 @@
# because futures in the call queue cannot be cancelled.
self._call_queue = multiprocessing.Queue(self._max_workers +
EXTRA_QUEUED_CALLS)
- # Killed worker processes can produce spurious "broken pipe"
- # tracebacks in the queue's own worker thread. But we detect killed
- # processes anyway, so silence the tracebacks.
- self._call_queue._ignore_epipe = True
self._result_queue = SimpleQueue()
self._work_ids = queue.Queue()
self._queue_management_thread = None
« no previous file with comments | « Lib/cgitb.py ('k') | Lib/contextlib.py » ('j') | no next file with comments »

RSS Feeds Recent Issues | This issue
This is Rietveld 894c83f36cb7+