Rietveld Code Review Tool
Help | Bug tracker | Discussion group | Source code | Sign in
(11)

Side by Side Diff: Lib/multiprocessing/queues.py

Issue 10639: reindent.py converts newlines to platform default
Patch Set: Created 8 years, 8 months ago
Left:
Right:
Use n/p to move between diff chunks; N/P to move between comments. Please Sign in to add in-line comments.
Jump to:
View unified diff | Download patch
« no previous file with comments | « Lib/multiprocessing/process.py ('k') | Lib/multiprocessing/util.py » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 # 1 #
2 # Module implementing queues 2 # Module implementing queues
3 # 3 #
4 # multiprocessing/queues.py 4 # multiprocessing/queues.py
5 # 5 #
6 # Copyright (c) 2006-2008, R Oudkerk 6 # Copyright (c) 2006-2008, R Oudkerk
7 # All rights reserved. 7 # All rights reserved.
8 # 8 #
9 # Redistribution and use in source and binary forms, with or without 9 # Redistribution and use in source and binary forms, with or without
10 # modification, are permitted provided that the following conditions 10 # modification, are permitted provided that the following conditions
(...skipping 23 matching lines...) Expand all
34 34
35 __all__ = ['Queue', 'SimpleQueue', 'JoinableQueue'] 35 __all__ = ['Queue', 'SimpleQueue', 'JoinableQueue']
36 36
37 import sys 37 import sys
38 import os 38 import os
39 import threading 39 import threading
40 import collections 40 import collections
41 import time 41 import time
42 import atexit 42 import atexit
43 import weakref 43 import weakref
44 import errno
45 44
46 from queue import Empty, Full 45 from queue import Empty, Full
47 import _multiprocessing 46 import _multiprocessing
48 from multiprocessing.connection import Pipe, SentinelReady 47 from multiprocessing.connection import Pipe, SentinelReady
49 from multiprocessing.synchronize import Lock, BoundedSemaphore, Semaphore, Condi tion 48 from multiprocessing.synchronize import Lock, BoundedSemaphore, Semaphore, Condi tion
50 from multiprocessing.util import debug, info, Finalize, register_after_fork 49 from multiprocessing.util import debug, info, Finalize, register_after_fork
51 from multiprocessing.forking import assert_spawning 50 from multiprocessing.forking import assert_spawning
52 51
53 # 52 #
54 # Queue type using a pipe, buffer and thread 53 # Queue type using a pipe, buffer and thread
55 # 54 #
56 55
57 class Queue(object): 56 class Queue(object):
58 57
59 def __init__(self, maxsize=0): 58 def __init__(self, maxsize=0):
60 if maxsize <= 0: 59 if maxsize <= 0:
61 maxsize = _multiprocessing.SemLock.SEM_VALUE_MAX 60 maxsize = _multiprocessing.SemLock.SEM_VALUE_MAX
62 self._maxsize = maxsize 61 self._maxsize = maxsize
63 self._reader, self._writer = Pipe(duplex=False) 62 self._reader, self._writer = Pipe(duplex=False)
64 self._rlock = Lock() 63 self._rlock = Lock()
65 self._opid = os.getpid() 64 self._opid = os.getpid()
66 if sys.platform == 'win32': 65 if sys.platform == 'win32':
67 self._wlock = None 66 self._wlock = None
68 else: 67 else:
69 self._wlock = Lock() 68 self._wlock = Lock()
70 self._sem = BoundedSemaphore(maxsize) 69 self._sem = BoundedSemaphore(maxsize)
71 # For use by concurrent.futures
72 self._ignore_epipe = False
73 70
74 self._after_fork() 71 self._after_fork()
75 72
76 if sys.platform != 'win32': 73 if sys.platform != 'win32':
77 register_after_fork(self, Queue._after_fork) 74 register_after_fork(self, Queue._after_fork)
78 75
79 def __getstate__(self): 76 def __getstate__(self):
80 assert_spawning(self) 77 assert_spawning(self)
81 return (self._ignore_epipe, self._maxsize, self._reader, self._writer, 78 return (self._maxsize, self._reader, self._writer,
82 self._rlock, self._wlock, self._sem, self._opid) 79 self._rlock, self._wlock, self._sem, self._opid)
83 80
84 def __setstate__(self, state): 81 def __setstate__(self, state):
85 (self._ignore_epipe, self._maxsize, self._reader, self._writer, 82 (self._maxsize, self._reader, self._writer,
86 self._rlock, self._wlock, self._sem, self._opid) = state 83 self._rlock, self._wlock, self._sem, self._opid) = state
87 self._after_fork() 84 self._after_fork()
88 85
89 def _after_fork(self): 86 def _after_fork(self):
90 debug('Queue._after_fork()') 87 debug('Queue._after_fork()')
91 self._notempty = threading.Condition(threading.Lock()) 88 self._notempty = threading.Condition(threading.Lock())
92 self._buffer = collections.deque() 89 self._buffer = collections.deque()
93 self._thread = None 90 self._thread = None
94 self._jointhread = None 91 self._jointhread = None
95 self._joincancelled = False 92 self._joincancelled = False
(...skipping 78 matching lines...) Expand 10 before | Expand all | Expand 10 after
174 pass 171 pass
175 172
176 def _start_thread(self): 173 def _start_thread(self):
177 debug('Queue._start_thread()') 174 debug('Queue._start_thread()')
178 175
179 # Start thread which transfers data from buffer to pipe 176 # Start thread which transfers data from buffer to pipe
180 self._buffer.clear() 177 self._buffer.clear()
181 self._thread = threading.Thread( 178 self._thread = threading.Thread(
182 target=Queue._feed, 179 target=Queue._feed,
183 args=(self._buffer, self._notempty, self._send, 180 args=(self._buffer, self._notempty, self._send,
184 self._wlock, self._writer.close, self._ignore_epipe), 181 self._wlock, self._writer.close),
185 name='QueueFeederThread' 182 name='QueueFeederThread'
186 ) 183 )
187 self._thread.daemon = True 184 self._thread.daemon = True
188 185
189 debug('doing self._thread.start()') 186 debug('doing self._thread.start()')
190 self._thread.start() 187 self._thread.start()
191 debug('... done self._thread.start()') 188 debug('... done self._thread.start()')
192 189
193 # On process exit we will wait for data to be flushed to pipe. 190 # On process exit we will wait for data to be flushed to pipe.
194 # 191 #
(...skipping 30 matching lines...) Expand all
225 def _finalize_close(buffer, notempty): 222 def _finalize_close(buffer, notempty):
226 debug('telling queue thread to quit') 223 debug('telling queue thread to quit')
227 notempty.acquire() 224 notempty.acquire()
228 try: 225 try:
229 buffer.append(_sentinel) 226 buffer.append(_sentinel)
230 notempty.notify() 227 notempty.notify()
231 finally: 228 finally:
232 notempty.release() 229 notempty.release()
233 230
234 @staticmethod 231 @staticmethod
235 def _feed(buffer, notempty, send, writelock, close, ignore_epipe): 232 def _feed(buffer, notempty, send, writelock, close):
236 debug('starting thread to feed data to pipe') 233 debug('starting thread to feed data to pipe')
237 from .util import is_exiting 234 from .util import is_exiting
238 235
239 nacquire = notempty.acquire 236 nacquire = notempty.acquire
240 nrelease = notempty.release 237 nrelease = notempty.release
241 nwait = notempty.wait 238 nwait = notempty.wait
242 bpopleft = buffer.popleft 239 bpopleft = buffer.popleft
243 sentinel = _sentinel 240 sentinel = _sentinel
244 if sys.platform != 'win32': 241 if sys.platform != 'win32':
245 wacquire = writelock.acquire 242 wacquire = writelock.acquire
(...skipping 21 matching lines...) Expand all
267 send(obj) 264 send(obj)
268 else: 265 else:
269 wacquire() 266 wacquire()
270 try: 267 try:
271 send(obj) 268 send(obj)
272 finally: 269 finally:
273 wrelease() 270 wrelease()
274 except IndexError: 271 except IndexError:
275 pass 272 pass
276 except Exception as e: 273 except Exception as e:
277 if ignore_epipe and getattr(e, 'errno', 0) == errno.EPIPE:
278 return
279 # Since this runs in a daemon thread the resources it uses 274 # Since this runs in a daemon thread the resources it uses
280 # may be become unusable while the process is cleaning up. 275 # may be become unusable while the process is cleaning up.
281 # We ignore errors which happen after the process has 276 # We ignore errors which happen after the process has
282 # started to cleanup. 277 # started to cleanup.
283 try: 278 try:
284 if is_exiting(): 279 if is_exiting():
285 info('error in queue thread: %s', e) 280 info('error in queue thread: %s', e)
286 else: 281 else:
287 import traceback 282 import traceback
288 traceback.print_exc() 283 traceback.print_exc()
(...skipping 102 matching lines...) Expand 10 before | Expand all | Expand 10 after
391 else: 386 else:
392 send = self._writer.send 387 send = self._writer.send
393 wacquire, wrelease = self._wlock.acquire, self._wlock.release 388 wacquire, wrelease = self._wlock.acquire, self._wlock.release
394 def put(obj): 389 def put(obj):
395 wacquire() 390 wacquire()
396 try: 391 try:
397 return send(obj) 392 return send(obj)
398 finally: 393 finally:
399 wrelease() 394 wrelease()
400 self.put = put 395 self.put = put
OLDNEW
« no previous file with comments | « Lib/multiprocessing/process.py ('k') | Lib/multiprocessing/util.py » ('j') | no next file with comments »

RSS Feeds Recent Issues | This issue
This is Rietveld 894c83f36cb7+