Rietveld Code Review Tool
Help | Bug tracker | Discussion group | Source code | Sign in
(2276)

Side by Side Diff: Lib/asyncio/selector_events.py

Issue 21998: asyncio: a new self-pipe should be created in the child process after fork
Patch Set: Created 4 years, 8 months ago
Left:
Right:
Use n/p to move between diff chunks; N/P to move between comments. Please Sign in to add in-line comments.
Jump to:
View unified diff | Download patch
« no previous file with comments | « Lib/asyncio/proactor_events.py ('k') | Lib/asyncio/unix_events.py » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 """Event loop using a selector and related classes. 1 """Event loop using a selector and related classes.
2 2
3 A selector is a "notify-when-ready" multiplexer. For a subclass which 3 A selector is a "notify-when-ready" multiplexer. For a subclass which
4 also includes support for signal handling, see the unix_events sub-module. 4 also includes support for signal handling, see the unix_events sub-module.
5 """ 5 """
6 6
7 __all__ = ['BaseSelectorEventLoop'] 7 __all__ = ['BaseSelectorEventLoop']
8 8
9 import collections 9 import collections
10 import errno 10 import errno
(...skipping 76 matching lines...) Expand 10 before | Expand all | Expand 10 after
87 def _make_datagram_transport(self, sock, protocol, 87 def _make_datagram_transport(self, sock, protocol,
88 address=None, waiter=None, extra=None): 88 address=None, waiter=None, extra=None):
89 return _SelectorDatagramTransport(self, sock, protocol, 89 return _SelectorDatagramTransport(self, sock, protocol,
90 address, waiter, extra) 90 address, waiter, extra)
91 91
92 def close(self): 92 def close(self):
93 if self.is_running(): 93 if self.is_running():
94 raise RuntimeError("Cannot close a running event loop") 94 raise RuntimeError("Cannot close a running event loop")
95 if self.is_closed(): 95 if self.is_closed():
96 return 96 return
97 # detect fork before using the self-pipe or the selector
98 self._detect_fork()
97 self._close_self_pipe() 99 self._close_self_pipe()
98 super().close() 100 super().close()
99 if self._selector is not None: 101 if self._selector is not None:
100 self._selector.close() 102 self._selector.close()
101 self._selector = None 103 self._selector = None
102 104
103 def _socketpair(self): 105 def _socketpair(self):
104 raise NotImplementedError 106 raise NotImplementedError
105 107
106 def _close_self_pipe(self): 108 def _close_self_pipe(self):
(...skipping 21 matching lines...) Expand all
128 data = self._ssock.recv(4096) 130 data = self._ssock.recv(4096)
129 if not data: 131 if not data:
130 break 132 break
131 self._process_self_data(data) 133 self._process_self_data(data)
132 except InterruptedError: 134 except InterruptedError:
133 continue 135 continue
134 except BlockingIOError: 136 except BlockingIOError:
135 break 137 break
136 138
137 def _write_to_self(self): 139 def _write_to_self(self):
140 # detect fork before using the self-pipe
141 self._detect_fork()
142
138 # This may be called from a different thread, possibly after 143 # This may be called from a different thread, possibly after
139 # _close_self_pipe() has been called or even while it is 144 # _close_self_pipe() has been called or even while it is
140 # running. Guard for self._csock being None or closed. When 145 # running. Guard for self._csock being None or closed. When
141 # a socket is closed, send() raises OSError (with errno set to 146 # a socket is closed, send() raises OSError (with errno set to
142 # EBADF, but let's not rely on the exact error code). 147 # EBADF, but let's not rely on the exact error code).
143 csock = self._csock 148 csock = self._csock
144 if csock is not None: 149 if csock is not None:
145 try: 150 try:
146 csock.send(b'\0') 151 csock.send(b'\0')
147 except OSError: 152 except OSError:
(...skipping 75 matching lines...) Expand 10 before | Expand all | Expand 10 after
223 if protocol is not None: 228 if protocol is not None:
224 context['protocol'] = protocol 229 context['protocol'] = protocol
225 if transport is not None: 230 if transport is not None:
226 context['transport'] = transport 231 context['transport'] = transport
227 self.call_exception_handler(context) 232 self.call_exception_handler(context)
228 233
229 def add_reader(self, fd, callback, *args): 234 def add_reader(self, fd, callback, *args):
230 """Add a reader callback.""" 235 """Add a reader callback."""
231 self._check_closed() 236 self._check_closed()
232 handle = events.Handle(callback, args, self) 237 handle = events.Handle(callback, args, self)
238 # detect fork before using the selector
239 self._detect_fork()
233 try: 240 try:
234 key = self._selector.get_key(fd) 241 key = self._selector.get_key(fd)
235 except KeyError: 242 except KeyError:
236 self._selector.register(fd, selectors.EVENT_READ, 243 self._selector.register(fd, selectors.EVENT_READ,
237 (handle, None)) 244 (handle, None))
238 else: 245 else:
239 mask, (reader, writer) = key.events, key.data 246 mask, (reader, writer) = key.events, key.data
240 self._selector.modify(fd, mask | selectors.EVENT_READ, 247 self._selector.modify(fd, mask | selectors.EVENT_READ,
241 (handle, writer)) 248 (handle, writer))
242 if reader is not None: 249 if reader is not None:
243 reader.cancel() 250 reader.cancel()
244 251
245 def remove_reader(self, fd): 252 def remove_reader(self, fd):
246 """Remove a reader callback.""" 253 """Remove a reader callback."""
247 if self.is_closed(): 254 if self.is_closed():
248 return False 255 return False
256 # detect fork before using the selector
257 self._detect_fork()
249 try: 258 try:
250 key = self._selector.get_key(fd) 259 key = self._selector.get_key(fd)
251 except KeyError: 260 except KeyError:
252 return False 261 return False
253 else: 262 else:
254 mask, (reader, writer) = key.events, key.data 263 mask, (reader, writer) = key.events, key.data
255 mask &= ~selectors.EVENT_READ 264 mask &= ~selectors.EVENT_READ
256 if not mask: 265 if not mask:
257 self._selector.unregister(fd) 266 self._selector.unregister(fd)
258 else: 267 else:
259 self._selector.modify(fd, mask, (None, writer)) 268 self._selector.modify(fd, mask, (None, writer))
260 269
261 if reader is not None: 270 if reader is not None:
262 reader.cancel() 271 reader.cancel()
263 return True 272 return True
264 else: 273 else:
265 return False 274 return False
266 275
267 def add_writer(self, fd, callback, *args): 276 def add_writer(self, fd, callback, *args):
268 """Add a writer callback..""" 277 """Add a writer callback.."""
269 self._check_closed() 278 self._check_closed()
270 handle = events.Handle(callback, args, self) 279 handle = events.Handle(callback, args, self)
280 # detect fork before using the selector
281 self._detect_fork()
271 try: 282 try:
272 key = self._selector.get_key(fd) 283 key = self._selector.get_key(fd)
273 except KeyError: 284 except KeyError:
274 self._selector.register(fd, selectors.EVENT_WRITE, 285 self._selector.register(fd, selectors.EVENT_WRITE,
275 (None, handle)) 286 (None, handle))
276 else: 287 else:
277 mask, (reader, writer) = key.events, key.data 288 mask, (reader, writer) = key.events, key.data
278 self._selector.modify(fd, mask | selectors.EVENT_WRITE, 289 self._selector.modify(fd, mask | selectors.EVENT_WRITE,
279 (reader, handle)) 290 (reader, handle))
280 if writer is not None: 291 if writer is not None:
281 writer.cancel() 292 writer.cancel()
282 293
283 def remove_writer(self, fd): 294 def remove_writer(self, fd):
284 """Remove a writer callback.""" 295 """Remove a writer callback."""
285 if self.is_closed(): 296 if self.is_closed():
286 return False 297 return False
298 # detect fork before using the selector
299 self._detect_fork()
287 try: 300 try:
288 key = self._selector.get_key(fd) 301 key = self._selector.get_key(fd)
289 except KeyError: 302 except KeyError:
290 return False 303 return False
291 else: 304 else:
292 mask, (reader, writer) = key.events, key.data 305 mask, (reader, writer) = key.events, key.data
293 # Remove both writer and connector. 306 # Remove both writer and connector.
294 mask &= ~selectors.EVENT_WRITE 307 mask &= ~selectors.EVENT_WRITE
295 if not mask: 308 if not mask:
296 self._selector.unregister(fd) 309 self._selector.unregister(fd)
(...skipping 764 matching lines...) Expand 10 before | Expand all | Expand 10 after
1061 except Exception as exc: 1074 except Exception as exc:
1062 self._fatal_error(exc, 1075 self._fatal_error(exc,
1063 'Fatal write error on datagram transport') 1076 'Fatal write error on datagram transport')
1064 return 1077 return
1065 1078
1066 self._maybe_resume_protocol() # May append to buffer. 1079 self._maybe_resume_protocol() # May append to buffer.
1067 if not self._buffer: 1080 if not self._buffer:
1068 self._loop.remove_writer(self._sock_fd) 1081 self._loop.remove_writer(self._sock_fd)
1069 if self._closing: 1082 if self._closing:
1070 self._call_connection_lost(None) 1083 self._call_connection_lost(None)
OLDNEW
« no previous file with comments | « Lib/asyncio/proactor_events.py ('k') | Lib/asyncio/unix_events.py » ('j') | no next file with comments »

RSS Feeds Recent Issues | This issue
This is Rietveld 894c83f36cb7+