Rietveld Code Review Tool
Help | Bug tracker | Discussion group | Source code | Sign in
(2)

Side by Side Diff: Lib/multiprocessing/pool.py

Issue 10332: Multiprocessing maxtasksperchild results in hang
Patch Set: Created 8 years, 5 months ago
Left:
Right:
Use n/p to move between diff chunks; N/P to move between comments. Please Sign in to add in-line comments.
Jump to:
View unified diff | Download patch
« no previous file with comments | « no previous file | Lib/test/test_multiprocessing.py » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 # 1 #
2 # Module providing the `Pool` class for managing a process pool 2 # Module providing the `Pool` class for managing a process pool
3 # 3 #
4 # multiprocessing/pool.py 4 # multiprocessing/pool.py
5 # 5 #
6 # Copyright (c) 2006-2008, R Oudkerk 6 # Copyright (c) 2006-2008, R Oudkerk
7 # All rights reserved. 7 # All rights reserved.
8 # 8 #
9 # Redistribution and use in source and binary forms, with or without 9 # Redistribution and use in source and binary forms, with or without
10 # modification, are permitted provided that the following conditions 10 # modification, are permitted provided that the following conditions
(...skipping 303 matching lines...) Expand 10 before | Expand all | Expand 10 after
314 314
315 task_batches = Pool._get_tasks(func, iterable, chunksize) 315 task_batches = Pool._get_tasks(func, iterable, chunksize)
316 result = MapResult(self._cache, chunksize, len(iterable), callback, 316 result = MapResult(self._cache, chunksize, len(iterable), callback,
317 error_callback=error_callback) 317 error_callback=error_callback)
318 self._taskqueue.put((((result._job, i, mapstar, (x,), {}) 318 self._taskqueue.put((((result._job, i, mapstar, (x,), {})
319 for i, x in enumerate(task_batches)), None)) 319 for i, x in enumerate(task_batches)), None))
320 return result 320 return result
321 321
322 @staticmethod 322 @staticmethod
323 def _handle_workers(pool): 323 def _handle_workers(pool):
324 while pool._worker_handler._state == RUN and pool._state == RUN: 324 thread = threading.current_thread()
325
326 # Keep maintaining workers until the cache gets drained, unless the pool
327 # is terminated.
328 while thread._state == RUN or (pool._cache and thread._state != TERMINAT E):
325 pool._maintain_pool() 329 pool._maintain_pool()
326 time.sleep(0.1) 330 time.sleep(0.1)
327 # send sentinel to stop workers 331 # send sentinel to stop workers
328 pool._taskqueue.put(None) 332 pool._taskqueue.put(None)
329 debug('worker handler exiting') 333 debug('worker handler exiting')
330 334
331 @staticmethod 335 @staticmethod
332 def _handle_tasks(taskqueue, put, outqueue, pool): 336 def _handle_tasks(taskqueue, put, outqueue, pool):
333 thread = threading.current_thread() 337 thread = threading.current_thread()
334 338
(...skipping 386 matching lines...) Expand 10 before | Expand all | Expand 10 after
721 @staticmethod 725 @staticmethod
722 def _help_stuff_finish(inqueue, task_handler, size): 726 def _help_stuff_finish(inqueue, task_handler, size):
723 # put sentinels at head of inqueue to make workers finish 727 # put sentinels at head of inqueue to make workers finish
724 inqueue.not_empty.acquire() 728 inqueue.not_empty.acquire()
725 try: 729 try:
726 inqueue.queue.clear() 730 inqueue.queue.clear()
727 inqueue.queue.extend([None] * size) 731 inqueue.queue.extend([None] * size)
728 inqueue.not_empty.notify_all() 732 inqueue.not_empty.notify_all()
729 finally: 733 finally:
730 inqueue.not_empty.release() 734 inqueue.not_empty.release()
OLDNEW
« no previous file with comments | « no previous file | Lib/test/test_multiprocessing.py » ('j') | no next file with comments »

RSS Feeds Recent Issues | This issue
This is Rietveld 894c83f36cb7+