diff -r 3f72b3a96508 Lib/concurrent/futures/process.py --- a/Lib/concurrent/futures/process.py Sat Oct 20 13:16:49 2012 +0100 +++ b/Lib/concurrent/futures/process.py Sat Oct 20 13:19:18 2012 +0100 @@ -240,6 +240,7 @@ "terminated abruptly while the future was " "running or pending." )) + del work_item pending_work_items.clear() # Terminate remaining workers forcibly: the queues or their # locks may be in a dirty state and block forever. @@ -264,6 +265,7 @@ work_item.future.set_exception(result_item.exception) else: work_item.future.set_result(result_item.result) + del work_item # Check whether we should start shutting down. executor = executor_reference() # No more work items can be added if: diff -r 3f72b3a96508 Lib/concurrent/futures/thread.py --- a/Lib/concurrent/futures/thread.py Sat Oct 20 13:16:49 2012 +0100 +++ b/Lib/concurrent/futures/thread.py Sat Oct 20 13:19:18 2012 +0100 @@ -63,6 +63,7 @@ work_item = work_queue.get(block=True) if work_item is not None: work_item.run() + del work_item continue executor = executor_reference() # Exit if: diff -r 3f72b3a96508 Lib/multiprocessing/queues.py --- a/Lib/multiprocessing/queues.py Sat Oct 20 13:16:49 2012 +0100 +++ b/Lib/multiprocessing/queues.py Sat Oct 20 13:19:18 2012 +0100 @@ -243,10 +243,12 @@ if wacquire is None: send(obj) + del obj else: wacquire() try: send(obj) + del obj finally: wrelease() except IndexError: diff -r 3f72b3a96508 Lib/test/test_concurrent_futures.py --- a/Lib/test/test_concurrent_futures.py Sat Oct 20 13:16:49 2012 +0100 +++ b/Lib/test/test_concurrent_futures.py Sat Oct 20 13:19:18 2012 +0100 @@ -15,6 +15,7 @@ import threading import time import unittest +import weakref from concurrent import futures from concurrent.futures._base import ( @@ -52,6 +53,11 @@ sys.stdout.flush() +class MyObject(object): + def my_method(self): + pass + + class ExecutorMixin: worker_count = 5 @@ -396,6 +402,21 @@ self.executor.map(str, [2] * (self.worker_count + 1)) self.executor.shutdown() + def test_no_stale_references(self): + # Issue #16284: check that the executors don't unnecessarily hang onto + # references. + my_object = MyObject() + my_object_collected = threading.Event() + my_object_callback = weakref.ref( + my_object, lambda obj: my_object_collected.set()) + # Deliberately discarding the future. + self.executor.submit(my_object.my_method) + del my_object + + collected = my_object_collected.wait(timeout=5.0) + self.assertTrue(collected, + "Stale reference not collected within timeout.") + class ThreadPoolExecutorTest(ThreadPoolMixin, ExecutorTest): def test_map_submits_without_iteration(self):