| OLD | NEW |
| 1 import test.support | 1 import test.support |
| 2 | 2 |
| 3 # Skip tests if _multiprocessing wasn't built. | 3 # Skip tests if _multiprocessing wasn't built. |
| 4 test.support.import_module('_multiprocessing') | 4 test.support.import_module('_multiprocessing') |
| 5 # Skip tests if sem_open implementation is broken. | 5 # Skip tests if sem_open implementation is broken. |
| 6 test.support.import_module('multiprocessing.synchronize') | 6 test.support.import_module('multiprocessing.synchronize') |
| 7 # import threading after _multiprocessing to raise a more revelant error | 7 # import threading after _multiprocessing to raise a more revelant error |
| 8 # message: "No module named _multiprocessing". _multiprocessing is not compiled | 8 # message: "No module named _multiprocessing". _multiprocessing is not compiled |
| 9 # without thread support. | 9 # without thread support. |
| 10 test.support.import_module('threading') | 10 test.support.import_module('threading') |
| 11 | 11 |
| 12 from test.script_helper import assert_python_ok | 12 from test.script_helper import assert_python_ok |
| 13 | 13 |
| 14 import sys | 14 import sys |
| 15 import threading | 15 import threading |
| 16 import time | 16 import time |
| 17 import unittest | 17 import unittest |
| 18 | 18 |
| 19 from concurrent import futures | 19 from concurrent import futures |
| 20 from concurrent.futures._base import ( | 20 from concurrent.futures._base import ( |
| 21 PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future) | 21 PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future) |
| 22 import concurrent.futures.process | 22 from concurrent.futures.process import BrokenProcessPool |
| 23 | 23 |
| 24 | 24 |
| 25 def create_future(state=PENDING, exception=None, result=None): | 25 def create_future(state=PENDING, exception=None, result=None): |
| 26 f = Future() | 26 f = Future() |
| 27 f._state = state | 27 f._state = state |
| 28 f._exception = exception | 28 f._exception = exception |
| 29 f._result = result | 29 f._result = result |
| 30 return f | 30 return f |
| 31 | 31 |
| 32 | 32 |
| (...skipping 114 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 147 pass | 147 pass |
| 148 | 148 |
| 149 def test_processes_terminate(self): | 149 def test_processes_terminate(self): |
| 150 self.executor.submit(mul, 21, 2) | 150 self.executor.submit(mul, 21, 2) |
| 151 self.executor.submit(mul, 6, 7) | 151 self.executor.submit(mul, 6, 7) |
| 152 self.executor.submit(mul, 3, 14) | 152 self.executor.submit(mul, 3, 14) |
| 153 self.assertEqual(len(self.executor._processes), 5) | 153 self.assertEqual(len(self.executor._processes), 5) |
| 154 processes = self.executor._processes | 154 processes = self.executor._processes |
| 155 self.executor.shutdown() | 155 self.executor.shutdown() |
| 156 | 156 |
| 157 for p in processes: | 157 for p in processes.values(): |
| 158 p.join() | 158 p.join() |
| 159 | 159 |
| 160 def test_context_manager_shutdown(self): | 160 def test_context_manager_shutdown(self): |
| 161 with futures.ProcessPoolExecutor(max_workers=5) as e: | 161 with futures.ProcessPoolExecutor(max_workers=5) as e: |
| 162 processes = e._processes | 162 processes = e._processes |
| 163 self.assertEqual(list(e.map(abs, range(-5, 5))), | 163 self.assertEqual(list(e.map(abs, range(-5, 5))), |
| 164 [5, 4, 3, 2, 1, 0, 1, 2, 3, 4]) | 164 [5, 4, 3, 2, 1, 0, 1, 2, 3, 4]) |
| 165 | 165 |
| 166 for p in processes: | 166 for p in processes.values(): |
| 167 p.join() | 167 p.join() |
| 168 | 168 |
| 169 def test_del_shutdown(self): | 169 def test_del_shutdown(self): |
| 170 executor = futures.ProcessPoolExecutor(max_workers=5) | 170 executor = futures.ProcessPoolExecutor(max_workers=5) |
| 171 list(executor.map(abs, range(-5, 5))) | 171 list(executor.map(abs, range(-5, 5))) |
| 172 queue_management_thread = executor._queue_management_thread | 172 queue_management_thread = executor._queue_management_thread |
| 173 processes = executor._processes | 173 processes = executor._processes |
| 174 del executor | 174 del executor |
| 175 | 175 |
| 176 queue_management_thread.join() | 176 queue_management_thread.join() |
| 177 for p in processes: | 177 for p in processes.values(): |
| 178 p.join() | 178 p.join() |
| 179 | 179 |
| 180 class WaitTests(unittest.TestCase): | 180 class WaitTests(unittest.TestCase): |
| 181 def test_first_completed(self): | 181 def test_first_completed(self): |
| 182 future1 = self.executor.submit(mul, 21, 2) | 182 future1 = self.executor.submit(mul, 21, 2) |
| 183 future2 = self.executor.submit(time.sleep, 1.5) | 183 future2 = self.executor.submit(time.sleep, 1.5) |
| 184 | 184 |
| 185 done, not_done = futures.wait( | 185 done, not_done = futures.wait( |
| 186 [CANCELLED_FUTURE, future1, future2], | 186 [CANCELLED_FUTURE, future1, future2], |
| 187 return_when=futures.FIRST_COMPLETED) | 187 return_when=futures.FIRST_COMPLETED) |
| (...skipping 186 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 374 finished = [] | 374 finished = [] |
| 375 def record_finished(n): | 375 def record_finished(n): |
| 376 finished.append(n) | 376 finished.append(n) |
| 377 | 377 |
| 378 self.executor.map(record_finished, range(10)) | 378 self.executor.map(record_finished, range(10)) |
| 379 self.executor.shutdown(wait=True) | 379 self.executor.shutdown(wait=True) |
| 380 self.assertCountEqual(finished, range(10)) | 380 self.assertCountEqual(finished, range(10)) |
| 381 | 381 |
| 382 | 382 |
| 383 class ProcessPoolExecutorTest(ProcessPoolMixin, ExecutorTest): | 383 class ProcessPoolExecutorTest(ProcessPoolMixin, ExecutorTest): |
| 384 pass | 384 def test_killed_child(self): |
| 385 # When a child process is abruptly terminated, the whole pool gets |
| 386 # "broken". |
| 387 futures = [self.executor.submit(time.sleep, 3)] |
| 388 # Get one of the processes, and terminate (kill) it |
| 389 p = next(iter(self.executor._processes.values())) |
| 390 p.terminate() |
| 391 for fut in futures: |
| 392 self.assertRaises(BrokenProcessPool, fut.result) |
| 393 # Submitting other jobs fails as well. |
| 394 self.assertRaises(BrokenProcessPool, self.executor.submit, pow, 2, 8) |
| 385 | 395 |
| 386 | 396 |
| 387 class FutureTests(unittest.TestCase): | 397 class FutureTests(unittest.TestCase): |
| 388 def test_done_callback_with_result(self): | 398 def test_done_callback_with_result(self): |
| 389 callback_result = None | 399 callback_result = None |
| 390 def fn(callback_future): | 400 def fn(callback_future): |
| 391 nonlocal callback_result | 401 nonlocal callback_result |
| 392 callback_result = callback_future.result() | 402 callback_result = callback_future.result() |
| 393 | 403 |
| 394 f = Future() | 404 f = Future() |
| (...skipping 217 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 612 ProcessPoolWaitTests, | 622 ProcessPoolWaitTests, |
| 613 ThreadPoolWaitTests, | 623 ThreadPoolWaitTests, |
| 614 ProcessPoolAsCompletedTests, | 624 ProcessPoolAsCompletedTests, |
| 615 ThreadPoolAsCompletedTests, | 625 ThreadPoolAsCompletedTests, |
| 616 FutureTests, | 626 FutureTests, |
| 617 ProcessPoolShutdownTest, | 627 ProcessPoolShutdownTest, |
| 618 ThreadPoolShutdownTest) | 628 ThreadPoolShutdownTest) |
| 619 | 629 |
| 620 if __name__ == "__main__": | 630 if __name__ == "__main__": |
| 621 test_main() | 631 test_main() |
| OLD | NEW |