Rietveld Code Review Tool
Help | Bug tracker | Discussion group | Source code | Sign in
(165454)

Delta Between Two Patch Sets: Lib/test/test_concurrent_futures.py

Issue 16510: Using appropriate checks in tests
Left Patch Set: Created 6 years, 3 months ago
Right Patch Set: Created 5 years, 10 months ago
Left:
Right:
Use n/p to move between diff chunks; N/P to move between comments. Please Sign in to add in-line comments.
Jump to:
Left: Side by side diff | Download
Right: Side by side diff | Download
« no previous file with change/comment | « Lib/test/test_complex.py ('k') | Lib/test/test_configparser.py » ('j') | no next file with change/comment »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
LEFTRIGHT
1 import test.support 1 import test.support
2 2
3 # Skip tests if _multiprocessing wasn't built. 3 # Skip tests if _multiprocessing wasn't built.
4 test.support.import_module('_multiprocessing') 4 test.support.import_module('_multiprocessing')
5 # Skip tests if sem_open implementation is broken. 5 # Skip tests if sem_open implementation is broken.
6 test.support.import_module('multiprocessing.synchronize') 6 test.support.import_module('multiprocessing.synchronize')
7 # import threading after _multiprocessing to raise a more revelant error 7 # import threading after _multiprocessing to raise a more revelant error
8 # message: "No module named _multiprocessing". _multiprocessing is not compiled 8 # message: "No module named _multiprocessing". _multiprocessing is not compiled
9 # without thread support. 9 # without thread support.
10 test.support.import_module('threading') 10 test.support.import_module('threading')
(...skipping 76 matching lines...) Expand 10 before | Expand all | Expand 10 after
87 87
88 88
89 class ThreadPoolMixin(ExecutorMixin): 89 class ThreadPoolMixin(ExecutorMixin):
90 executor_type = futures.ThreadPoolExecutor 90 executor_type = futures.ThreadPoolExecutor
91 91
92 92
93 class ProcessPoolMixin(ExecutorMixin): 93 class ProcessPoolMixin(ExecutorMixin):
94 executor_type = futures.ProcessPoolExecutor 94 executor_type = futures.ProcessPoolExecutor
95 95
96 96
97 class ExecutorShutdownTest(unittest.TestCase): 97 class ExecutorShutdownTest:
98 def test_run_after_shutdown(self): 98 def test_run_after_shutdown(self):
99 self.executor.shutdown() 99 self.executor.shutdown()
100 self.assertRaises(RuntimeError, 100 self.assertRaises(RuntimeError,
101 self.executor.submit, 101 self.executor.submit,
102 pow, 2, 5) 102 pow, 2, 5)
103 103
104 def test_interpreter_shutdown(self): 104 def test_interpreter_shutdown(self):
105 # Test the atexit hook for shutdown of worker threads and processes 105 # Test the atexit hook for shutdown of worker threads and processes
106 rc, out, err = assert_python_ok('-c', """if 1: 106 rc, out, err = assert_python_ok('-c', """if 1:
107 from concurrent.futures import {executor_type} 107 from concurrent.futures import {executor_type}
108 from time import sleep 108 from time import sleep
109 from test.test_concurrent_futures import sleep_and_print 109 from test.test_concurrent_futures import sleep_and_print
110 t = {executor_type}(5) 110 t = {executor_type}(5)
111 t.submit(sleep_and_print, 1.0, "apple") 111 t.submit(sleep_and_print, 1.0, "apple")
112 """.format(executor_type=self.executor_type.__name__)) 112 """.format(executor_type=self.executor_type.__name__))
113 # Errors in atexit hooks don't change the process exit code, check 113 # Errors in atexit hooks don't change the process exit code, check
114 # stderr manually. 114 # stderr manually.
115 self.assertFalse(err) 115 self.assertFalse(err)
116 self.assertEqual(out.strip(), b"apple") 116 self.assertEqual(out.strip(), b"apple")
117 117
118 def test_hang_issue12364(self): 118 def test_hang_issue12364(self):
119 fs = [self.executor.submit(time.sleep, 0.1) for _ in range(50)] 119 fs = [self.executor.submit(time.sleep, 0.1) for _ in range(50)]
120 self.executor.shutdown() 120 self.executor.shutdown()
121 for f in fs: 121 for f in fs:
122 f.result() 122 f.result()
123 123
124 124
125 class ThreadPoolShutdownTest(ThreadPoolMixin, ExecutorShutdownTest): 125 class ThreadPoolShutdownTest(ThreadPoolMixin, ExecutorShutdownTest, unittest.TestCase):
126 def _prime_executor(self): 126 def _prime_executor(self):
127 pass 127 pass
128 128
129 def test_threads_terminate(self): 129 def test_threads_terminate(self):
130 self.executor.submit(mul, 21, 2) 130 self.executor.submit(mul, 21, 2)
131 self.executor.submit(mul, 6, 7) 131 self.executor.submit(mul, 6, 7)
132 self.executor.submit(mul, 3, 14) 132 self.executor.submit(mul, 3, 14)
133 self.assertEqual(len(self.executor._threads), 3) 133 self.assertEqual(len(self.executor._threads), 3)
134 self.executor.shutdown() 134 self.executor.shutdown()
135 for t in self.executor._threads: 135 for t in self.executor._threads:
(...skipping 11 matching lines...) Expand all
147 def test_del_shutdown(self): 147 def test_del_shutdown(self):
148 executor = futures.ThreadPoolExecutor(max_workers=5) 148 executor = futures.ThreadPoolExecutor(max_workers=5)
149 executor.map(abs, range(-5, 5)) 149 executor.map(abs, range(-5, 5))
150 threads = executor._threads 150 threads = executor._threads
151 del executor 151 del executor
152 152
153 for t in threads: 153 for t in threads:
154 t.join() 154 t.join()
155 155
156 156
157 class ProcessPoolShutdownTest(ProcessPoolMixin, ExecutorShutdownTest): 157 class ProcessPoolShutdownTest(ProcessPoolMixin, ExecutorShutdownTest, unittest.TestCase):
158 def _prime_executor(self): 158 def _prime_executor(self):
159 pass 159 pass
160 160
161 def test_processes_terminate(self): 161 def test_processes_terminate(self):
162 self.executor.submit(mul, 21, 2) 162 self.executor.submit(mul, 21, 2)
163 self.executor.submit(mul, 6, 7) 163 self.executor.submit(mul, 6, 7)
164 self.executor.submit(mul, 3, 14) 164 self.executor.submit(mul, 3, 14)
165 self.assertEqual(len(self.executor._processes), 5) 165 self.assertEqual(len(self.executor._processes), 5)
166 processes = self.executor._processes 166 processes = self.executor._processes
167 self.executor.shutdown() 167 self.executor.shutdown()
(...skipping 15 matching lines...) Expand all
183 list(executor.map(abs, range(-5, 5))) 183 list(executor.map(abs, range(-5, 5)))
184 queue_management_thread = executor._queue_management_thread 184 queue_management_thread = executor._queue_management_thread
185 processes = executor._processes 185 processes = executor._processes
186 del executor 186 del executor
187 187
188 queue_management_thread.join() 188 queue_management_thread.join()
189 for p in processes.values(): 189 for p in processes.values():
190 p.join() 190 p.join()
191 191
192 192
193 class WaitTests(unittest.TestCase): 193 class WaitTests:
194 194
195 def test_first_completed(self): 195 def test_first_completed(self):
196 future1 = self.executor.submit(mul, 21, 2) 196 future1 = self.executor.submit(mul, 21, 2)
197 future2 = self.executor.submit(time.sleep, 1.5) 197 future2 = self.executor.submit(time.sleep, 1.5)
198 198
199 done, not_done = futures.wait( 199 done, not_done = futures.wait(
200 [CANCELLED_FUTURE, future1, future2], 200 [CANCELLED_FUTURE, future1, future2],
201 return_when=futures.FIRST_COMPLETED) 201 return_when=futures.FIRST_COMPLETED)
202 202
203 self.assertEqual(set([future1]), done) 203 self.assertEqual(set([future1]), done)
(...skipping 80 matching lines...) Expand 10 before | Expand all | Expand 10 after
284 timeout=5, 284 timeout=5,
285 return_when=futures.ALL_COMPLETED) 285 return_when=futures.ALL_COMPLETED)
286 286
287 self.assertEqual(set([CANCELLED_AND_NOTIFIED_FUTURE, 287 self.assertEqual(set([CANCELLED_AND_NOTIFIED_FUTURE,
288 EXCEPTION_FUTURE, 288 EXCEPTION_FUTURE,
289 SUCCESSFUL_FUTURE, 289 SUCCESSFUL_FUTURE,
290 future1]), finished) 290 future1]), finished)
291 self.assertEqual(set([future2]), pending) 291 self.assertEqual(set([future2]), pending)
292 292
293 293
294 class ThreadPoolWaitTests(ThreadPoolMixin, WaitTests): 294 class ThreadPoolWaitTests(ThreadPoolMixin, WaitTests, unittest.TestCase):
295 295
296 def test_pending_calls_race(self): 296 def test_pending_calls_race(self):
297 # Issue #14406: multi-threaded race condition when waiting on all 297 # Issue #14406: multi-threaded race condition when waiting on all
298 # futures. 298 # futures.
299 event = threading.Event() 299 event = threading.Event()
300 def future_func(): 300 def future_func():
301 event.wait() 301 event.wait()
302 oldswitchinterval = sys.getswitchinterval() 302 oldswitchinterval = sys.getswitchinterval()
303 sys.setswitchinterval(1e-6) 303 sys.setswitchinterval(1e-6)
304 try: 304 try:
305 fs = {self.executor.submit(future_func) for i in range(100)} 305 fs = {self.executor.submit(future_func) for i in range(100)}
306 event.set() 306 event.set()
307 futures.wait(fs, return_when=futures.ALL_COMPLETED) 307 futures.wait(fs, return_when=futures.ALL_COMPLETED)
308 finally: 308 finally:
309 sys.setswitchinterval(oldswitchinterval) 309 sys.setswitchinterval(oldswitchinterval)
310 310
311 311
312 class ProcessPoolWaitTests(ProcessPoolMixin, WaitTests): 312 class ProcessPoolWaitTests(ProcessPoolMixin, WaitTests, unittest.TestCase):
313 pass 313 pass
314 314
315 315
316 class AsCompletedTests(unittest.TestCase): 316 class AsCompletedTests:
317 # TODO(brian@sweetapp.com): Should have a test with a non-zero timeout. 317 # TODO(brian@sweetapp.com): Should have a test with a non-zero timeout.
318 def test_no_timeout(self): 318 def test_no_timeout(self):
319 future1 = self.executor.submit(mul, 2, 21) 319 future1 = self.executor.submit(mul, 2, 21)
320 future2 = self.executor.submit(mul, 7, 6) 320 future2 = self.executor.submit(mul, 7, 6)
321 321
322 completed = set(futures.as_completed( 322 completed = set(futures.as_completed(
323 [CANCELLED_AND_NOTIFIED_FUTURE, 323 [CANCELLED_AND_NOTIFIED_FUTURE,
324 EXCEPTION_FUTURE, 324 EXCEPTION_FUTURE,
325 SUCCESSFUL_FUTURE, 325 SUCCESSFUL_FUTURE,
326 future1, future2])) 326 future1, future2]))
(...skipping 16 matching lines...) Expand all
343 timeout=0): 343 timeout=0):
344 completed_futures.add(future) 344 completed_futures.add(future)
345 except futures.TimeoutError: 345 except futures.TimeoutError:
346 pass 346 pass
347 347
348 self.assertEqual(set([CANCELLED_AND_NOTIFIED_FUTURE, 348 self.assertEqual(set([CANCELLED_AND_NOTIFIED_FUTURE,
349 EXCEPTION_FUTURE, 349 EXCEPTION_FUTURE,
350 SUCCESSFUL_FUTURE]), 350 SUCCESSFUL_FUTURE]),
351 completed_futures) 351 completed_futures)
352 352
353 353 def test_duplicate_futures(self):
354 class ThreadPoolAsCompletedTests(ThreadPoolMixin, AsCompletedTests): 354 # Issue 20367. Duplicate futures should not raise exceptions or give
355 # duplicate responses.
356 future1 = self.executor.submit(time.sleep, 2)
357 completed = [f for f in futures.as_completed([future1,future1])]
358 self.assertEqual(len(completed), 1)
359
360
361 class ThreadPoolAsCompletedTests(ThreadPoolMixin, AsCompletedTests, unittest.TestCase):
355 pass 362 pass
356 363
357 364
358 class ProcessPoolAsCompletedTests(ProcessPoolMixin, AsCompletedTests): 365 class ProcessPoolAsCompletedTests(ProcessPoolMixin, AsCompletedTests, unittest.TestCase):
359 pass 366 pass
360 367
361 368
362 class ExecutorTest(unittest.TestCase): 369 class ExecutorTest:
363 # Executor.shutdown() and context manager usage is tested by 370 # Executor.shutdown() and context manager usage is tested by
364 # ExecutorShutdownTest. 371 # ExecutorShutdownTest.
365 def test_submit(self): 372 def test_submit(self):
366 future = self.executor.submit(pow, 2, 8) 373 future = self.executor.submit(pow, 2, 8)
367 self.assertEqual(256, future.result()) 374 self.assertEqual(256, future.result())
368 375
369 def test_submit_keyword(self): 376 def test_submit_keyword(self):
370 future = self.executor.submit(mul, 2, y=8) 377 future = self.executor.submit(mul, 2, y=8)
371 self.assertEqual(16, future.result()) 378 self.assertEqual(16, future.result())
372 379
(...skipping 39 matching lines...) Expand 10 before | Expand all | Expand 10 after
412 my_object, lambda obj: my_object_collected.set()) 419 my_object, lambda obj: my_object_collected.set())
413 # Deliberately discarding the future. 420 # Deliberately discarding the future.
414 self.executor.submit(my_object.my_method) 421 self.executor.submit(my_object.my_method)
415 del my_object 422 del my_object
416 423
417 collected = my_object_collected.wait(timeout=5.0) 424 collected = my_object_collected.wait(timeout=5.0)
418 self.assertTrue(collected, 425 self.assertTrue(collected,
419 "Stale reference not collected within timeout.") 426 "Stale reference not collected within timeout.")
420 427
421 428
422 class ThreadPoolExecutorTest(ThreadPoolMixin, ExecutorTest): 429 class ThreadPoolExecutorTest(ThreadPoolMixin, ExecutorTest, unittest.TestCase):
423 def test_map_submits_without_iteration(self): 430 def test_map_submits_without_iteration(self):
424 """Tests verifying issue 11777.""" 431 """Tests verifying issue 11777."""
425 finished = [] 432 finished = []
426 def record_finished(n): 433 def record_finished(n):
427 finished.append(n) 434 finished.append(n)
428 435
429 self.executor.map(record_finished, range(10)) 436 self.executor.map(record_finished, range(10))
430 self.executor.shutdown(wait=True) 437 self.executor.shutdown(wait=True)
431 self.assertCountEqual(finished, range(10)) 438 self.assertCountEqual(finished, range(10))
432 439
433 440
434 class ProcessPoolExecutorTest(ProcessPoolMixin, ExecutorTest): 441 class ProcessPoolExecutorTest(ProcessPoolMixin, ExecutorTest, unittest.TestCase):
435 def test_killed_child(self): 442 def test_killed_child(self):
436 # When a child process is abruptly terminated, the whole pool gets 443 # When a child process is abruptly terminated, the whole pool gets
437 # "broken". 444 # "broken".
438 futures = [self.executor.submit(time.sleep, 3)] 445 futures = [self.executor.submit(time.sleep, 3)]
439 # Get one of the processes, and terminate (kill) it 446 # Get one of the processes, and terminate (kill) it
440 p = next(iter(self.executor._processes.values())) 447 p = next(iter(self.executor._processes.values()))
441 p.terminate() 448 p.terminate()
442 for fut in futures: 449 for fut in futures:
443 self.assertRaises(BrokenProcessPool, fut.result) 450 self.assertRaises(BrokenProcessPool, fut.result)
444 # Submitting other jobs fails as well. 451 # Submitting other jobs fails as well.
(...skipping 217 matching lines...) Expand 10 before | Expand all | Expand 10 after
662 669
663 f1 = create_future(state=PENDING) 670 f1 = create_future(state=PENDING)
664 t = threading.Thread(target=notification) 671 t = threading.Thread(target=notification)
665 t.start() 672 t.start()
666 673
667 self.assertIsInstance(f1.exception(timeout=5), OSError) 674 self.assertIsInstance(f1.exception(timeout=5), OSError)
668 675
669 @test.support.reap_threads 676 @test.support.reap_threads
670 def test_main(): 677 def test_main():
671 try: 678 try:
672 test.support.run_unittest(ProcessPoolExecutorTest, 679 test.support.run_unittest(__name__)
673 ThreadPoolExecutorTest,
674 ProcessPoolWaitTests,
675 ThreadPoolWaitTests,
676 ProcessPoolAsCompletedTests,
677 ThreadPoolAsCompletedTests,
678 FutureTests,
679 ProcessPoolShutdownTest,
680 ThreadPoolShutdownTest,
681 )
682 finally: 680 finally:
683 test.support.reap_children() 681 test.support.reap_children()
684 682
685 if __name__ == "__main__": 683 if __name__ == "__main__":
686 test_main() 684 test_main()
LEFTRIGHT

RSS Feeds Recent Issues | This issue
This is Rietveld 894c83f36cb7+