"""Minimal reproducer: multiprocessing.Queue items invisible to .get()/.empty().

A child process is started and immediately spawns a worker *thread*; the
parent then ping-pongs items through two queues.  Intermittently,
``out_queue.get(timeout=1)`` raises ``queue.Empty`` even though
``out_queue.qsize()`` reports the expected number of items.

NOTE(review): the root cause is not established by this code alone — it looks
like a race between the child's thread startup and multiprocessing.Queue's
internal machinery (items are written via a background feeder thread and a
pipe, so qsize's semaphore can run ahead of what the reader end can see).
Confirm against the CPython bug tracker / multiprocessing docs before relying
on any explanation.  The script intentionally keeps the race: do not "fix" it.
"""
import queue
from multiprocessing import Process, Queue
from threading import Thread
from time import sleep


def main():
    """Run one round of the reproducer; asserts characterize the failure mode."""
    # You don't really need two queues to show the problem
    # If you comment out in_queue.get/put, the failure still happens, just less neatly
    # Loads of items get placed on out_queue so that might cause other problems
    # and you also need to comment out `assert out_queue.qsize() == i` below
    in_queue = Queue()
    out_queue = Queue()

    # Setting target=worker_loop (i.e. not using a Thread within a Process)
    # fixes the problem, but isn't an option for my real use case
    # Besides, are sub-processes not allowed to start threads?
    process = Process(target=worker_loop_in_thread, args=(in_queue, out_queue))
    process.start()

    # Uncommenting this sleep fixes the problem
    # I'm doing something similar to fix my real program which feels very bad
    # sleep(0.01)

    # The failure always first happens in the second out_queue.get
    # (see `assert i > 0` below)
    # so range(2) is enough to reproduce, but this shows the consistency once failure starts
    for i in range(5):
        in_queue.put(1)
        try:
            assert out_queue.get(timeout=1) == 2
        except queue.Empty:
            print("Fail!", out_queue.qsize())
            # qsize seems to be perfectly accurate here - it gives the number
            # of items that I expect to be on the queue, but .get doesn't see them
            assert out_queue.qsize() == i
            assert i > 0
            # Like .get, .empty doesn't see the items that were placed there
            assert out_queue.empty()
            # Just showing that the problem isn't caused by the process dying
            assert process.is_alive()

    process.terminate()


def worker_loop_in_thread(*args):
    """Child-process entry point: hand the queues off to a worker thread.

    Running the loop in a Thread inside the child Process is the ingredient
    that triggers the failure (see comments in main()).
    """
    Thread(target=worker_loop, args=args).start()


def worker_loop(in_queue, out_queue):
    """Echo loop: consume a 1 from in_queue, produce a 2 on out_queue, forever."""
    while True:
        assert in_queue.get() == 1
        out_queue.put(2)


if __name__ == '__main__':
    # Most of the time main() works fine
    # This loop is required to show it breaking
    for attempt in range(1000):
        print(attempt)
        main()