Rietveld Code Review Tool
Help | Bug tracker | Discussion group | Source code | Sign in
(807)

Side by Side Diff: Lib/test/test_concurrent_futures.py

Issue 9205: Parent process hanging in multiprocessing if children terminate unexpectedly
Patch Set: Created 1 year, 11 months ago
Left:
Right:
Use n/p to move between diff chunks; N/P to move between comments. Please Sign in to add in-line comments.
Jump to:
View unified diff | Download patch
OLDNEW
1 import test.support 1 import test.support
2 2
3 # Skip tests if _multiprocessing wasn't built. 3 # Skip tests if _multiprocessing wasn't built.
4 test.support.import_module('_multiprocessing') 4 test.support.import_module('_multiprocessing')
5 # Skip tests if sem_open implementation is broken. 5 # Skip tests if sem_open implementation is broken.
6 test.support.import_module('multiprocessing.synchronize') 6 test.support.import_module('multiprocessing.synchronize')
7 # import threading after _multiprocessing to raise a more revelant error 7 # import threading after _multiprocessing to raise a more revelant error
8 # message: "No module named _multiprocessing". _multiprocessing is not compiled 8 # message: "No module named _multiprocessing". _multiprocessing is not compiled
9 # without thread support. 9 # without thread support.
10 test.support.import_module('threading') 10 test.support.import_module('threading')
11 11
12 from test.script_helper import assert_python_ok 12 from test.script_helper import assert_python_ok
13 13
14 import sys 14 import sys
15 import threading 15 import threading
16 import time 16 import time
17 import unittest 17 import unittest
18 18
19 from concurrent import futures 19 from concurrent import futures
20 from concurrent.futures._base import ( 20 from concurrent.futures._base import (
21 PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future) 21 PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future)
22 import concurrent.futures.process 22 from concurrent.futures.process import BrokenProcessPool
23 23
24 24
25 def create_future(state=PENDING, exception=None, result=None): 25 def create_future(state=PENDING, exception=None, result=None):
26 f = Future() 26 f = Future()
27 f._state = state 27 f._state = state
28 f._exception = exception 28 f._exception = exception
29 f._result = result 29 f._result = result
30 return f 30 return f
31 31
32 32
(...skipping 114 matching lines...) Expand 10 before | Expand all | Expand 10 after
147 pass 147 pass
148 148
149 def test_processes_terminate(self): 149 def test_processes_terminate(self):
150 self.executor.submit(mul, 21, 2) 150 self.executor.submit(mul, 21, 2)
151 self.executor.submit(mul, 6, 7) 151 self.executor.submit(mul, 6, 7)
152 self.executor.submit(mul, 3, 14) 152 self.executor.submit(mul, 3, 14)
153 self.assertEqual(len(self.executor._processes), 5) 153 self.assertEqual(len(self.executor._processes), 5)
154 processes = self.executor._processes 154 processes = self.executor._processes
155 self.executor.shutdown() 155 self.executor.shutdown()
156 156
157 for p in processes: 157 for p in processes.values():
158 p.join() 158 p.join()
159 159
160 def test_context_manager_shutdown(self): 160 def test_context_manager_shutdown(self):
161 with futures.ProcessPoolExecutor(max_workers=5) as e: 161 with futures.ProcessPoolExecutor(max_workers=5) as e:
162 processes = e._processes 162 processes = e._processes
163 self.assertEqual(list(e.map(abs, range(-5, 5))), 163 self.assertEqual(list(e.map(abs, range(-5, 5))),
164 [5, 4, 3, 2, 1, 0, 1, 2, 3, 4]) 164 [5, 4, 3, 2, 1, 0, 1, 2, 3, 4])
165 165
166 for p in processes: 166 for p in processes.values():
167 p.join() 167 p.join()
168 168
169 def test_del_shutdown(self): 169 def test_del_shutdown(self):
170 executor = futures.ProcessPoolExecutor(max_workers=5) 170 executor = futures.ProcessPoolExecutor(max_workers=5)
171 list(executor.map(abs, range(-5, 5))) 171 list(executor.map(abs, range(-5, 5)))
172 queue_management_thread = executor._queue_management_thread 172 queue_management_thread = executor._queue_management_thread
173 processes = executor._processes 173 processes = executor._processes
174 del executor 174 del executor
175 175
176 queue_management_thread.join() 176 queue_management_thread.join()
177 for p in processes: 177 for p in processes.values():
178 p.join() 178 p.join()
179 179
180 class WaitTests(unittest.TestCase): 180 class WaitTests(unittest.TestCase):
181 def test_first_completed(self): 181 def test_first_completed(self):
182 future1 = self.executor.submit(mul, 21, 2) 182 future1 = self.executor.submit(mul, 21, 2)
183 future2 = self.executor.submit(time.sleep, 1.5) 183 future2 = self.executor.submit(time.sleep, 1.5)
184 184
185 done, not_done = futures.wait( 185 done, not_done = futures.wait(
186 [CANCELLED_FUTURE, future1, future2], 186 [CANCELLED_FUTURE, future1, future2],
187 return_when=futures.FIRST_COMPLETED) 187 return_when=futures.FIRST_COMPLETED)
(...skipping 186 matching lines...) Expand 10 before | Expand all | Expand 10 after
374 finished = [] 374 finished = []
375 def record_finished(n): 375 def record_finished(n):
376 finished.append(n) 376 finished.append(n)
377 377
378 self.executor.map(record_finished, range(10)) 378 self.executor.map(record_finished, range(10))
379 self.executor.shutdown(wait=True) 379 self.executor.shutdown(wait=True)
380 self.assertCountEqual(finished, range(10)) 380 self.assertCountEqual(finished, range(10))
381 381
382 382
383 class ProcessPoolExecutorTest(ProcessPoolMixin, ExecutorTest): 383 class ProcessPoolExecutorTest(ProcessPoolMixin, ExecutorTest):
384 pass 384 def test_killed_child(self):
385 # When a child process is abruptly terminated, the whole pool gets
386 # "broken".
387 futures = [self.executor.submit(time.sleep, 3)]
388 # Get one of the processes, and terminate (kill) it
389 p = next(iter(self.executor._processes.values()))
390 p.terminate()
391 for fut in futures:
bquinlan 2011/06/08 12:31:32 fut => f (for consistency)
392 self.assertRaises(BrokenProcessPool, fut.result)
393 # Submitting other jobs fails as well.
394 self.assertRaises(BrokenProcessPool, self.executor.submit, pow, 2, 8)
385 395
386 396
387 class FutureTests(unittest.TestCase): 397 class FutureTests(unittest.TestCase):
388 def test_done_callback_with_result(self): 398 def test_done_callback_with_result(self):
389 callback_result = None 399 callback_result = None
390 def fn(callback_future): 400 def fn(callback_future):
391 nonlocal callback_result 401 nonlocal callback_result
392 callback_result = callback_future.result() 402 callback_result = callback_future.result()
393 403
394 f = Future() 404 f = Future()
(...skipping 217 matching lines...) Expand 10 before | Expand all | Expand 10 after
612 ProcessPoolWaitTests, 622 ProcessPoolWaitTests,
613 ThreadPoolWaitTests, 623 ThreadPoolWaitTests,
614 ProcessPoolAsCompletedTests, 624 ProcessPoolAsCompletedTests,
615 ThreadPoolAsCompletedTests, 625 ThreadPoolAsCompletedTests,
616 FutureTests, 626 FutureTests,
617 ProcessPoolShutdownTest, 627 ProcessPoolShutdownTest,
618 ThreadPoolShutdownTest) 628 ThreadPoolShutdownTest)
619 629
620 if __name__ == "__main__": 630 if __name__ == "__main__":
621 test_main() 631 test_main()
OLDNEW

RSS Feeds Recent Issues | This issue
This is Rietveld cbc36f91f3f7