diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -240,6 +240,7 @@ "terminated abruptly while the future was " "running or pending." )) + del work_item pending_work_items.clear() # Terminate remaining workers forcibly: the queues or their # locks may be in a dirty state and block forever. @@ -264,6 +265,7 @@ work_item.future.set_exception(result_item.exception) else: work_item.future.set_result(result_item.result) + del work_item # Check whether we should start shutting down. executor = executor_reference() # No more work items can be added if: diff --git a/Lib/concurrent/futures/thread.py b/Lib/concurrent/futures/thread.py --- a/Lib/concurrent/futures/thread.py +++ b/Lib/concurrent/futures/thread.py @@ -63,6 +63,7 @@ work_item = work_queue.get(block=True) if work_item is not None: work_item.run() + del work_item continue executor = executor_reference() # Exit if: diff --git a/Lib/multiprocessing/queues.py b/Lib/multiprocessing/queues.py --- a/Lib/multiprocessing/queues.py +++ b/Lib/multiprocessing/queues.py @@ -243,10 +243,12 @@ if wacquire is None: send(obj) + del obj else: wacquire() try: send(obj) + del obj finally: wrelease() except IndexError: diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py --- a/Lib/test/test_concurrent_futures.py +++ b/Lib/test/test_concurrent_futures.py @@ -15,6 +15,7 @@ import threading import time import unittest +import weakref from concurrent import futures from concurrent.futures._base import ( @@ -52,6 +53,11 @@ sys.stdout.flush() +class MyObject(object): + def my_method(self): + pass + + class ExecutorMixin: worker_count = 5 @@ -396,6 +402,22 @@ self.executor.map(str, [2] * (self.worker_count + 1)) self.executor.shutdown() + @test.support.cpython_only + def test_no_stale_references(self): + # Issue #16284: check that the executors don't unnecessarily hang onto + # references. + my_object = MyObject() + my_object_collected = threading.Event() + my_object_callback = weakref.ref( + my_object, lambda obj: my_object_collected.set()) + # Deliberately discarding the future. + self.executor.submit(my_object.my_method) + del my_object + + collected = my_object_collected.wait(timeout=5.0) + self.assertTrue(collected, + "Stale reference not collected within timeout.") + class ThreadPoolExecutorTest(ThreadPoolMixin, ExecutorTest): def test_map_submits_without_iteration(self):