Rietveld Code Review Tool
Help | Bug tracker | Discussion group | Source code | Sign in
(17379)

Side by Side Diff: Lib/concurrent/futures/thread.py

Issue 27664: Allow specifying prefix for thread name in concurrent.futures.ThreadPoolExecutor
Patch Set: Created 3 years, 1 month ago
Left:
Right:
Use n/p to move between diff chunks; N/P to move between comments. Please Sign in to add in-line comments.
Jump to:
View unified diff | Download patch
« no previous file with comments | « no previous file | Lib/test/test_concurrent_futures.py » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 # Copyright 2009 Brian Quinlan. All Rights Reserved. 1 # Copyright 2009 Brian Quinlan. All Rights Reserved.
2 # Licensed to PSF under a Contributor Agreement. 2 # Licensed to PSF under a Contributor Agreement.
3 3
4 """Implements ThreadPoolExecutor.""" 4 """Implements ThreadPoolExecutor."""
5 5
6 __author__ = 'Brian Quinlan (brian@sweetapp.com)' 6 __author__ = 'Brian Quinlan (brian@sweetapp.com)'
7 7
8 import atexit 8 import atexit
9 from concurrent.futures import _base 9 from concurrent.futures import _base
10 import queue 10 import queue
(...skipping 63 matching lines...) Expand 10 before | Expand all | Expand 10 after
74 # - The executor that owns the worker has been shutdown. 74 # - The executor that owns the worker has been shutdown.
75 if _shutdown or executor is None or executor._shutdown: 75 if _shutdown or executor is None or executor._shutdown:
76 # Notice other workers 76 # Notice other workers
77 work_queue.put(None) 77 work_queue.put(None)
78 return 78 return
79 del executor 79 del executor
80 except BaseException: 80 except BaseException:
81 _base.LOGGER.critical('Exception in worker', exc_info=True) 81 _base.LOGGER.critical('Exception in worker', exc_info=True)
82 82
83 class ThreadPoolExecutor(_base.Executor): 83 class ThreadPoolExecutor(_base.Executor):
84 def __init__(self, max_workers=None): 84 def __init__(self, max_workers=None, thread_name_prefix=None):
85 """Initializes a new ThreadPoolExecutor instance. 85 """Initializes a new ThreadPoolExecutor instance.
86 86
87 Args: 87 Args:
88 max_workers: The maximum number of threads that can be used to 88 max_workers: The maximum number of threads that can be used to
89 execute the given calls. 89 execute the given calls.
90 """ 90 """
91 if max_workers is None: 91 if max_workers is None:
92 # Use this number because ThreadPoolExecutor is often 92 # Use this number because ThreadPoolExecutor is often
93 # used to overlap I/O instead of CPU work. 93 # used to overlap I/O instead of CPU work.
94 max_workers = (os.cpu_count() or 1) * 5 94 max_workers = (os.cpu_count() or 1) * 5
95 if max_workers <= 0: 95 if max_workers <= 0:
96 raise ValueError("max_workers must be greater than 0") 96 raise ValueError("max_workers must be greater than 0")
97 97
98 self._max_workers = max_workers 98 self._max_workers = max_workers
99 self._work_queue = queue.Queue() 99 self._work_queue = queue.Queue()
100 self._threads = set() 100 self._threads = set()
101 self._shutdown = False 101 self._shutdown = False
102 self._shutdown_lock = threading.Lock() 102 self._shutdown_lock = threading.Lock()
103 self._thread_name_prefix = thread_name_prefix
103 104
104 def submit(self, fn, *args, **kwargs): 105 def submit(self, fn, *args, **kwargs):
105 with self._shutdown_lock: 106 with self._shutdown_lock:
106 if self._shutdown: 107 if self._shutdown:
107 raise RuntimeError('cannot schedule new futures after shutdown') 108 raise RuntimeError('cannot schedule new futures after shutdown')
108 109
109 f = _base.Future() 110 f = _base.Future()
110 w = _WorkItem(f, fn, args, kwargs) 111 w = _WorkItem(f, fn, args, kwargs)
111 112
112 self._work_queue.put(w) 113 self._work_queue.put(w)
113 self._adjust_thread_count() 114 self._adjust_thread_count()
114 return f 115 return f
115 submit.__doc__ = _base.Executor.submit.__doc__ 116 submit.__doc__ = _base.Executor.submit.__doc__
116 117
117 def _adjust_thread_count(self): 118 def _adjust_thread_count(self):
118 # When the executor gets lost, the weakref callback will wake up 119 # When the executor gets lost, the weakref callback will wake up
119 # the worker threads. 120 # the worker threads.
120 def weakref_cb(_, q=self._work_queue): 121 def weakref_cb(_, q=self._work_queue):
121 q.put(None) 122 q.put(None)
122 # TODO(bquinlan): Should avoid creating new threads if there are more 123 # TODO(bquinlan): Should avoid creating new threads if there are more
123 # idle threads than items in the work queue. 124 # idle threads than items in the work queue.
124 if len(self._threads) < self._max_workers: 125 if len(self._threads) < self._max_workers:
125 t = threading.Thread(target=_worker, 126 if self._thread_name_prefix:
127 name = self._thread_name_prefix + '_%d' % len(self._threads)
128 else:
129 name = None
130 t = threading.Thread(name=name, target=_worker,
126 args=(weakref.ref(self, weakref_cb), 131 args=(weakref.ref(self, weakref_cb),
127 self._work_queue)) 132 self._work_queue))
128 t.daemon = True 133 t.daemon = True
129 t.start() 134 t.start()
130 self._threads.add(t) 135 self._threads.add(t)
131 _threads_queues[t] = self._work_queue 136 _threads_queues[t] = self._work_queue
132 137
133 def shutdown(self, wait=True): 138 def shutdown(self, wait=True):
134 with self._shutdown_lock: 139 with self._shutdown_lock:
135 self._shutdown = True 140 self._shutdown = True
136 self._work_queue.put(None) 141 self._work_queue.put(None)
137 if wait: 142 if wait:
138 for t in self._threads: 143 for t in self._threads:
139 t.join() 144 t.join()
140 shutdown.__doc__ = _base.Executor.shutdown.__doc__ 145 shutdown.__doc__ = _base.Executor.shutdown.__doc__
OLDNEW
« no previous file with comments | « no previous file | Lib/test/test_concurrent_futures.py » ('j') | no next file with comments »

RSS Feeds Recent Issues | This issue
This is Rietveld 894c83f36cb7+