Index: Lib/concurrent/futures/process.py =================================================================== --- Lib/concurrent/futures/process.py (Revision 87576) +++ Lib/concurrent/futures/process.py (Arbeitskopie) @@ -244,6 +244,31 @@ else: work_item.future.set_result(result_item.result) +_system_limits_checked = False +_system_limited = None +def _check_system_limits(): + global _system_limits_checked, _system_limited + if _system_limits_checked: + if _system_limited: + raise NotImplementedError(_system_limited) + _system_limits_checked = True + try: + import os + nsems_max = os.sysconf("SC_SEM_NSEMS_MAX") + except (AttributeError, ValueError): + # sysconf not available or setting not available + return + if nsems_max == -1: + # indetermine limit, assume that limit is determined + # by available memory only + return + if nsems_max >= 256: + # minimum number of semaphores available + # according to POSIX + return + _system_limited = "system provides too few semaphores (%d available, 256 necessary)" % nsems_max + raise NotImplementedError(_system_limited) + class ProcessPoolExecutor(_base.Executor): def __init__(self, max_workers=None): """Initializes a new ProcessPoolExecutor instance. @@ -253,6 +278,7 @@ execute the given calls. If None or not given then as many worker processes will be created as the machine has processors. """ + _check_system_limits() _remove_dead_thread_references() if max_workers is None: Index: Lib/test/test_concurrent_futures.py =================================================================== --- Lib/test/test_concurrent_futures.py (Revision 87576) +++ Lib/test/test_concurrent_futures.py (Arbeitskopie) @@ -69,7 +69,7 @@ assert handle is not None return handle else: - event = multiprocessing.Event() + event = self.Event[0]() self.CALL_LOCKS[id(event)] = event return id(event) @@ -99,7 +99,8 @@ else: self.CALL_LOCKS[handle].set() - def __init__(self, manual_finish=False, result=42): + def __init__(self, Event, manual_finish=False, result=42): + self.Event = Event self._called_event = self._create_event() self._can_finish = self._create_event() @@ -138,8 +139,8 @@ raise ZeroDivisionError() class MapCall(Call): - def __init__(self, result=42): - super().__init__(manual_finish=True, result=result) + def __init__(self, Event, result=42): + super().__init__(Event, manual_finish=True, result=result) def __call__(self, manual_finish): if manual_finish: @@ -155,9 +156,9 @@ def _start_some_futures(self): - call1 = Call(manual_finish=True) - call2 = Call(manual_finish=True) - call3 = Call(manual_finish=True) + call1 = Call(self.Event, manual_finish=True) + call2 = Call(self.Event, manual_finish=True) + call3 = Call(self.Event, manual_finish=True) try: self.executor.submit(call1) @@ -176,13 +177,28 @@ call2.close() call3.close() -class ThreadPoolShutdownTest(ExecutorShutdownTest): +class ThreadPoolMixin: + # wrap in tuple to prevent creation of instance methods + Event = (threading.Event,) def setUp(self): self.executor = futures.ThreadPoolExecutor(max_workers=5) def tearDown(self): self.executor.shutdown(wait=True) +class ProcessPoolMixin: + # wrap in tuple to prevent creation of instance methods + Event = (multiprocessing.Event,) + def setUp(self): + try: + self.executor = futures.ProcessPoolExecutor(max_workers=5) + except NotImplementedError as e: + self.skipTest(str(e)) + + def tearDown(self): + self.executor.shutdown(wait=True) + +class ThreadPoolShutdownTest(ThreadPoolMixin, ExecutorShutdownTest): def test_threads_terminate(self): self._start_some_futures() self.assertEqual(len(self.executor._threads), 3) @@ -208,13 +224,7 @@ for t in threads: t.join() -class ProcessPoolShutdownTest(ExecutorShutdownTest): - def setUp(self): - self.executor = futures.ProcessPoolExecutor(max_workers=5) - - def tearDown(self): - self.executor.shutdown(wait=True) - +class ProcessPoolShutdownTest(ProcessPoolMixin, ExecutorShutdownTest): def test_processes_terminate(self): self._start_some_futures() self.assertEqual(len(self.executor._processes), 5) @@ -251,8 +261,8 @@ pass call1.set_can() - call1 = Call(manual_finish=True) - call2 = Call(manual_finish=True) + call1 = Call(self.Event, manual_finish=True) + call2 = Call(self.Event, manual_finish=True) try: future1 = self.executor.submit(call1) future2 = self.executor.submit(call2) @@ -270,7 +280,7 @@ call2.close() def test_first_completed_one_already_completed(self): - call1 = Call(manual_finish=True) + call1 = Call(self.Event, manual_finish=True) try: future1 = self.executor.submit(call1) @@ -290,9 +300,9 @@ call1.set_can() call2.set_can() - call1 = Call(manual_finish=True) - call2 = ExceptionCall(manual_finish=True) - call3 = Call(manual_finish=True) + call1 = Call(self.Event, manual_finish=True) + call2 = ExceptionCall(self.Event, manual_finish=True) + call3 = Call(self.Event, manual_finish=True) try: future1 = self.executor.submit(call1) future2 = self.executor.submit(call2) @@ -317,8 +327,8 @@ pass call1.set_can() - call1 = ExceptionCall(manual_finish=True) - call2 = Call(manual_finish=True) + call1 = ExceptionCall(self.Event, manual_finish=True) + call2 = Call(self.Event, manual_finish=True) try: future1 = self.executor.submit(call1) future2 = self.executor.submit(call2) @@ -343,7 +353,7 @@ call2.close() def test_first_exception_one_already_failed(self): - call1 = Call(manual_finish=True) + call1 = Call(self.Event, manual_finish=True) try: future1 = self.executor.submit(call1) @@ -363,8 +373,8 @@ call1.set_can() call2.set_can() - call1 = Call(manual_finish=True) - call2 = Call(manual_finish=True) + call1 = Call(self.Event, manual_finish=True) + call2 = Call(self.Event, manual_finish=True) try: future1 = self.executor.submit(call1) future2 = self.executor.submit(call2) @@ -397,10 +407,10 @@ 'this test assumes that future4 will be cancelled before it is ' 'queued to run - which might not be the case if ' 'ProcessPoolExecutor is too aggresive in scheduling futures') - call1 = Call(manual_finish=True) - call2 = Call(manual_finish=True) - call3 = Call(manual_finish=True) - call4 = Call(manual_finish=True) + call1 = Call(self.Event, manual_finish=True) + call2 = Call(self.Event, manual_finish=True) + call3 = Call(self.Event, manual_finish=True) + call4 = Call(self.Event, manual_finish=True) try: future1 = self.executor.submit(call1) future2 = self.executor.submit(call2) @@ -432,8 +442,8 @@ pass call1.set_can() - call1 = Call(manual_finish=True) - call2 = Call(manual_finish=True) + call1 = Call(self.Event, manual_finish=True) + call2 = Call(self.Event, manual_finish=True) try: future1 = self.executor.submit(call1) future2 = self.executor.submit(call2) @@ -460,20 +470,12 @@ call2.close() -class ThreadPoolWaitTests(WaitTests): - def setUp(self): - self.executor = futures.ThreadPoolExecutor(max_workers=1) +class ThreadPoolWaitTests(ThreadPoolMixin, WaitTests): + pass - def tearDown(self): - self.executor.shutdown(wait=True) +class ProcessPoolWaitTests(ProcessPoolMixin, WaitTests): + pass -class ProcessPoolWaitTests(WaitTests): - def setUp(self): - self.executor = futures.ProcessPoolExecutor(max_workers=1) - - def tearDown(self): - self.executor.shutdown(wait=True) - class AsCompletedTests(unittest.TestCase): # TODO(brian@sweetapp.com): Should have a test with a non-zero timeout. def test_no_timeout(self): @@ -483,8 +485,8 @@ call1.set_can() call2.set_can() - call1 = Call(manual_finish=True) - call2 = Call(manual_finish=True) + call1 = Call(self.Event, manual_finish=True) + call2 = Call(self.Event, manual_finish=True) try: future1 = self.executor.submit(call1) future2 = self.executor.submit(call2) @@ -507,7 +509,7 @@ call2.close() def test_zero_timeout(self): - call1 = Call(manual_finish=True) + call1 = Call(self.Event, manual_finish=True) try: future1 = self.executor.submit(call1) completed_futures = set() @@ -529,20 +531,12 @@ finally: call1.close() -class ThreadPoolAsCompletedTests(AsCompletedTests): - def setUp(self): - self.executor = futures.ThreadPoolExecutor(max_workers=1) +class ThreadPoolAsCompletedTests(ThreadPoolMixin, AsCompletedTests): + pass - def tearDown(self): - self.executor.shutdown(wait=True) +class ProcessPoolAsCompletedTests(ProcessPoolMixin, AsCompletedTests): + pass -class ProcessPoolAsCompletedTests(AsCompletedTests): - def setUp(self): - self.executor = futures.ProcessPoolExecutor(max_workers=1) - - def tearDown(self): - self.executor.shutdown(wait=True) - class ExecutorTest(unittest.TestCase): # Executor.shutdown() and context manager usage is tested by # ExecutorShutdownTest. @@ -567,7 +561,7 @@ def test_map_timeout(self): results = [] - timeout_call = MapCall() + timeout_call = MapCall(self.Event) try: try: for i in self.executor.map(timeout_call, @@ -583,20 +577,12 @@ self.assertEqual([42, 42], results) -class ThreadPoolExecutorTest(ExecutorTest): - def setUp(self): - self.executor = futures.ThreadPoolExecutor(max_workers=1) +class ThreadPoolExecutorTest(ThreadPoolMixin, ExecutorTest): + pass - def tearDown(self): - self.executor.shutdown(wait=True) +class ProcessPoolExecutorTest(ProcessPoolMixin, ExecutorTest): + pass -class ProcessPoolExecutorTest(ExecutorTest): - def setUp(self): - self.executor = futures.ProcessPoolExecutor(max_workers=1) - - def tearDown(self): - self.executor.shutdown(wait=True) - class FutureTests(unittest.TestCase): def test_done_callback_with_result(self): callback_result = None