Rietveld Code Review Tool
Help | Bug tracker | Discussion group | Source code | Sign in
(262519)

Side by Side Diff: Lib/selectors.py

Issue 21998: asyncio: a new self-pipe should be created in the child process after fork
Patch Set: Created 4 years, 6 months ago
Left:
Right:
Use n/p to move between diff chunks; N/P to move between comments. Please Sign in to add in-line comments.
Jump to:
View unified diff | Download patch
« no previous file with comments | « Lib/asyncio/unix_events.py ('k') | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 """Selectors module. 1 """Selectors module.
2 2
3 This module allows high-level and efficient I/O multiplexing, built upon the 3 This module allows high-level and efficient I/O multiplexing, built upon the
4 `select` module primitives. 4 `select` module primitives.
5 """ 5 """
6 6
7 7
8 from abc import ABCMeta, abstractmethod 8 from abc import ABCMeta, abstractmethod
9 from collections import namedtuple, Mapping 9 from collections import namedtuple, Mapping
10 import math 10 import math
(...skipping 173 matching lines...) Expand 10 before | Expand all | Expand 10 after
184 @abstractmethod 184 @abstractmethod
185 def get_map(self): 185 def get_map(self):
186 """Return a mapping of file objects to selector keys.""" 186 """Return a mapping of file objects to selector keys."""
187 raise NotImplementedError 187 raise NotImplementedError
188 188
189 def __enter__(self): 189 def __enter__(self):
190 return self 190 return self
191 191
192 def __exit__(self, *args): 192 def __exit__(self, *args):
193 self.close() 193 self.close()
194
195 def _at_fork(self):
196 pass
194 197
195 198
196 class _BaseSelectorImpl(BaseSelector): 199 class _BaseSelectorImpl(BaseSelector):
197 """Base selector implementation.""" 200 """Base selector implementation."""
198 201
199 def __init__(self): 202 def __init__(self):
200 # this maps file descriptors to keys 203 # this maps file descriptors to keys
201 self._fd_to_key = {} 204 self._fd_to_key = {}
202 # read-only mapping returned by get_map() 205 # read-only mapping returned by get_map()
203 self._map = _SelectorMapping(self) 206 self._map = _SelectorMapping(self)
204 207
205 def _fileobj_lookup(self, fileobj): 208 def _fileobj_lookup(self, fileobj):
206 """Return a file descriptor from a file object. 209 """Return a file descriptor from a file object.
207 210
208 This wraps _fileobj_to_fd() to do an exhaustive search in case 211 This wraps _fileobj_to_fd() to do an exhaustive search in case
209 the object is invalid but we still have it in our map. This 212 the object is invalid but we still have it in our map. This
210 is used by unregister() so we can unregister an object that 213 is used by unregister() so we can unregister an object that
211 was previously registered even if it is closed. It is also 214 was previously registered even if it is closed. It is also
212 used by _SelectorMapping. 215 used by _SelectorMapping.
213 """ 216 """
214 try: 217 try:
215 return _fileobj_to_fd(fileobj) 218 return _fileobj_to_fd(fileobj)
216 except ValueError: 219 except ValueError:
217 # Do an exhaustive search. 220 # Do an exhaustive search.
218 for key in self._fd_to_key.values(): 221 for key in self._fd_to_key.values():
219 if key.fileobj is fileobj: 222 if key.fileobj is fileobj:
220 return key.fd 223 return key.fd
221 # Raise ValueError after all. 224 # Raise ValueError after all.
222 raise 225 raise
226
227 def _register(self, key):
228 pass
223 229
224 def register(self, fileobj, events, data=None): 230 def register(self, fileobj, events, data=None):
225 if (not events) or (events & ~(EVENT_READ | EVENT_WRITE)): 231 if (not events) or (events & ~(EVENT_READ | EVENT_WRITE)):
226 raise ValueError("Invalid events: {!r}".format(events)) 232 raise ValueError("Invalid events: {!r}".format(events))
227 233
228 key = SelectorKey(fileobj, self._fileobj_lookup(fileobj), events, data) 234 key = SelectorKey(fileobj, self._fileobj_lookup(fileobj), events, data)
229 235
230 if key.fd in self._fd_to_key: 236 if key.fd in self._fd_to_key:
231 raise KeyError("{!r} (FD {}) is already registered" 237 raise KeyError("{!r} (FD {}) is already registered"
232 .format(fileobj, key.fd)) 238 .format(fileobj, key.fd))
233 239
240 self._register(key)
234 self._fd_to_key[key.fd] = key 241 self._fd_to_key[key.fd] = key
235 return key 242 return key
243
244 def _unregister(self, key):
245 pass
236 246
237 def unregister(self, fileobj): 247 def unregister(self, fileobj):
238 try: 248 try:
239 key = self._fd_to_key.pop(self._fileobj_lookup(fileobj)) 249 key = self._fd_to_key.pop(self._fileobj_lookup(fileobj))
240 except KeyError: 250 except KeyError:
241 raise KeyError("{!r} is not registered".format(fileobj)) from None 251 raise KeyError("{!r} is not registered".format(fileobj)) from None
252 self._unregister(key)
242 return key 253 return key
243 254
244 def modify(self, fileobj, events, data=None): 255 def modify(self, fileobj, events, data=None):
245 # TODO: Subclasses can probably optimize this even further. 256 # TODO: Subclasses can probably optimize this even further.
246 try: 257 try:
247 key = self._fd_to_key[self._fileobj_lookup(fileobj)] 258 key = self._fd_to_key[self._fileobj_lookup(fileobj)]
248 except KeyError: 259 except KeyError:
249 raise KeyError("{!r} is not registered".format(fileobj)) from None 260 raise KeyError("{!r} is not registered".format(fileobj)) from None
250 if events != key.events: 261 if events != key.events:
251 self.unregister(fileobj) 262 self.unregister(fileobj)
(...skipping 27 matching lines...) Expand all
279 290
280 291
281 class SelectSelector(_BaseSelectorImpl): 292 class SelectSelector(_BaseSelectorImpl):
282 """Select-based selector.""" 293 """Select-based selector."""
283 294
284 def __init__(self): 295 def __init__(self):
285 super().__init__() 296 super().__init__()
286 self._readers = set() 297 self._readers = set()
287 self._writers = set() 298 self._writers = set()
288 299
289 def register(self, fileobj, events, data=None): 300 def _register(self, key):
290 key = super().register(fileobj, events, data) 301 if key.events & EVENT_READ:
291 if events & EVENT_READ:
292 self._readers.add(key.fd) 302 self._readers.add(key.fd)
293 if events & EVENT_WRITE: 303 if key.events & EVENT_WRITE:
294 self._writers.add(key.fd) 304 self._writers.add(key.fd)
295 return key
296 305
297 def unregister(self, fileobj): 306 def _unregister(self, key):
298 key = super().unregister(fileobj)
299 self._readers.discard(key.fd) 307 self._readers.discard(key.fd)
300 self._writers.discard(key.fd) 308 self._writers.discard(key.fd)
301 return key
302 309
303 if sys.platform == 'win32': 310 if sys.platform == 'win32':
304 def _select(self, r, w, _, timeout=None): 311 def _select(self, r, w, _, timeout=None):
305 r, w, x = select.select(r, w, w, timeout) 312 r, w, x = select.select(r, w, w, timeout)
306 return r, w + x, [] 313 return r, w + x, []
307 else: 314 else:
308 _select = select.select 315 _select = select.select
309 316
310 def select(self, timeout=None): 317 def select(self, timeout=None):
311 timeout = None if timeout is None else max(timeout, 0) 318 timeout = None if timeout is None else max(timeout, 0)
(...skipping 19 matching lines...) Expand all
331 338
332 if hasattr(select, 'poll'): 339 if hasattr(select, 'poll'):
333 340
334 class PollSelector(_BaseSelectorImpl): 341 class PollSelector(_BaseSelectorImpl):
335 """Poll-based selector.""" 342 """Poll-based selector."""
336 343
337 def __init__(self): 344 def __init__(self):
338 super().__init__() 345 super().__init__()
339 self._poll = select.poll() 346 self._poll = select.poll()
340 347
341 def register(self, fileobj, events, data=None): 348 def _register(self, key):
342 key = super().register(fileobj, events, data)
343 poll_events = 0 349 poll_events = 0
344 if events & EVENT_READ: 350 if key.events & EVENT_READ:
345 poll_events |= select.POLLIN 351 poll_events |= select.POLLIN
346 if events & EVENT_WRITE: 352 if key.events & EVENT_WRITE:
347 poll_events |= select.POLLOUT 353 poll_events |= select.POLLOUT
348 self._poll.register(key.fd, poll_events) 354 self._poll.register(key.fd, poll_events)
349 return key
350 355
351 def unregister(self, fileobj): 356 def _unregister(self, key):
352 key = super().unregister(fileobj)
353 self._poll.unregister(key.fd) 357 self._poll.unregister(key.fd)
354 return key
355 358
356 def select(self, timeout=None): 359 def select(self, timeout=None):
357 if timeout is None: 360 if timeout is None:
358 timeout = None 361 timeout = None
359 elif timeout <= 0: 362 elif timeout <= 0:
360 timeout = 0 363 timeout = 0
361 else: 364 else:
362 # poll() has a resolution of 1 millisecond, round away from 365 # poll() has a resolution of 1 millisecond, round away from
363 # zero to wait *at least* timeout seconds. 366 # zero to wait *at least* timeout seconds.
364 timeout = math.ceil(timeout * 1e3) 367 timeout = math.ceil(timeout * 1e3)
(...skipping 20 matching lines...) Expand all
385 class EpollSelector(_BaseSelectorImpl): 388 class EpollSelector(_BaseSelectorImpl):
386 """Epoll-based selector.""" 389 """Epoll-based selector."""
387 390
388 def __init__(self): 391 def __init__(self):
389 super().__init__() 392 super().__init__()
390 self._epoll = select.epoll() 393 self._epoll = select.epoll()
391 394
392 def fileno(self): 395 def fileno(self):
393 return self._epoll.fileno() 396 return self._epoll.fileno()
394 397
395 def register(self, fileobj, events, data=None): 398 def _register(self, key):
396 key = super().register(fileobj, events, data)
397 epoll_events = 0 399 epoll_events = 0
398 if events & EVENT_READ: 400 if key.events & EVENT_READ:
399 epoll_events |= select.EPOLLIN 401 epoll_events |= select.EPOLLIN
400 if events & EVENT_WRITE: 402 if key.events & EVENT_WRITE:
401 epoll_events |= select.EPOLLOUT 403 epoll_events |= select.EPOLLOUT
402 self._epoll.register(key.fd, epoll_events) 404 self._epoll.register(key.fd, epoll_events)
403 return key
404 405
405 def unregister(self, fileobj): 406 def _unregister(self, key):
406 key = super().unregister(fileobj)
407 try: 407 try:
408 self._epoll.unregister(key.fd) 408 self._epoll.unregister(key.fd)
409 except OSError: 409 except OSError:
410 # This can happen if the FD was closed since it 410 # This can happen if the FD was closed since it
411 # was registered. 411 # was registered.
412 pass 412 pass
413 return key 413
414 def _at_fork(self):
415 # don't unregister file descriptors: epoll is still shared with
416 # the parent process
417 self._epoll = select.epoll()
418 for key in self._fd_to_key.values():
419 self._register(key)
414 420
415 def select(self, timeout=None): 421 def select(self, timeout=None):
416 if timeout is None: 422 if timeout is None:
417 timeout = -1 423 timeout = -1
418 elif timeout <= 0: 424 elif timeout <= 0:
419 timeout = 0 425 timeout = 0
420 else: 426 else:
421 # epoll_wait() has a resolution of 1 millisecond, round away 427 # epoll_wait() has a resolution of 1 millisecond, round away
422 # from zero to wait *at least* timeout seconds. 428 # from zero to wait *at least* timeout seconds.
423 timeout = math.ceil(timeout * 1e3) * 1e-3 429 timeout = math.ceil(timeout * 1e3) * 1e-3
(...skipping 30 matching lines...) Expand all
454 class DevpollSelector(_BaseSelectorImpl): 460 class DevpollSelector(_BaseSelectorImpl):
455 """Solaris /dev/poll selector.""" 461 """Solaris /dev/poll selector."""
456 462
457 def __init__(self): 463 def __init__(self):
458 super().__init__() 464 super().__init__()
459 self._devpoll = select.devpoll() 465 self._devpoll = select.devpoll()
460 466
461 def fileno(self): 467 def fileno(self):
462 return self._devpoll.fileno() 468 return self._devpoll.fileno()
463 469
464 def register(self, fileobj, events, data=None): 470 def _register(self, key):
465 key = super().register(fileobj, events, data)
466 poll_events = 0 471 poll_events = 0
467 if events & EVENT_READ: 472 if key.events & EVENT_READ:
468 poll_events |= select.POLLIN 473 poll_events |= select.POLLIN
469 if events & EVENT_WRITE: 474 if key.events & EVENT_WRITE:
470 poll_events |= select.POLLOUT 475 poll_events |= select.POLLOUT
471 self._devpoll.register(key.fd, poll_events) 476 self._devpoll.register(key.fd, poll_events)
472 return key
473 477
474 def unregister(self, fileobj): 478 def _unregister(self, key):
475 key = super().unregister(fileobj)
476 self._devpoll.unregister(key.fd) 479 self._devpoll.unregister(key.fd)
477 return key 480
481 def _at_fork(self):
482 # don't unregister file descriptors: devpoll is still shared with
483 # the parent process
484 self._devpoll = select.devpoll()
485 for key in self._fd_to_key.values():
486 self._register(key)
478 487
479 def select(self, timeout=None): 488 def select(self, timeout=None):
480 if timeout is None: 489 if timeout is None:
481 timeout = None 490 timeout = None
482 elif timeout <= 0: 491 elif timeout <= 0:
483 timeout = 0 492 timeout = 0
484 else: 493 else:
485 # devpoll() has a resolution of 1 millisecond, round away from 494 # devpoll() has a resolution of 1 millisecond, round away from
486 # zero to wait *at least* timeout seconds. 495 # zero to wait *at least* timeout seconds.
487 timeout = math.ceil(timeout * 1e3) 496 timeout = math.ceil(timeout * 1e3)
(...skipping 24 matching lines...) Expand all
512 class KqueueSelector(_BaseSelectorImpl): 521 class KqueueSelector(_BaseSelectorImpl):
513 """Kqueue-based selector.""" 522 """Kqueue-based selector."""
514 523
515 def __init__(self): 524 def __init__(self):
516 super().__init__() 525 super().__init__()
517 self._kqueue = select.kqueue() 526 self._kqueue = select.kqueue()
518 527
519 def fileno(self): 528 def fileno(self):
520 return self._kqueue.fileno() 529 return self._kqueue.fileno()
521 530
522 def register(self, fileobj, events, data=None): 531 def _register(self, key):
523 key = super().register(fileobj, events, data) 532 if key.events & EVENT_READ:
524 if events & EVENT_READ:
525 kev = select.kevent(key.fd, select.KQ_FILTER_READ, 533 kev = select.kevent(key.fd, select.KQ_FILTER_READ,
526 select.KQ_EV_ADD) 534 select.KQ_EV_ADD)
527 self._kqueue.control([kev], 0, 0) 535 self._kqueue.control([kev], 0, 0)
528 if events & EVENT_WRITE: 536 if key.events & EVENT_WRITE:
529 kev = select.kevent(key.fd, select.KQ_FILTER_WRITE, 537 kev = select.kevent(key.fd, select.KQ_FILTER_WRITE,
530 select.KQ_EV_ADD) 538 select.KQ_EV_ADD)
531 self._kqueue.control([kev], 0, 0) 539 self._kqueue.control([kev], 0, 0)
532 return key
533 540
534 def unregister(self, fileobj): 541 def _unregister(self, key):
535 key = super().unregister(fileobj)
536 if key.events & EVENT_READ: 542 if key.events & EVENT_READ:
537 kev = select.kevent(key.fd, select.KQ_FILTER_READ, 543 kev = select.kevent(key.fd, select.KQ_FILTER_READ,
538 select.KQ_EV_DELETE) 544 select.KQ_EV_DELETE)
539 try: 545 try:
540 self._kqueue.control([kev], 0, 0) 546 self._kqueue.control([kev], 0, 0)
541 except OSError: 547 except OSError:
542 # This can happen if the FD was closed since it 548 # This can happen if the FD was closed since it
543 # was registered. 549 # was registered.
544 pass 550 pass
545 if key.events & EVENT_WRITE: 551 if key.events & EVENT_WRITE:
546 kev = select.kevent(key.fd, select.KQ_FILTER_WRITE, 552 kev = select.kevent(key.fd, select.KQ_FILTER_WRITE,
547 select.KQ_EV_DELETE) 553 select.KQ_EV_DELETE)
548 try: 554 try:
549 self._kqueue.control([kev], 0, 0) 555 self._kqueue.control([kev], 0, 0)
550 except OSError: 556 except OSError:
551 # See comment above. 557 # See comment above.
552 pass 558 pass
553 return key 559
560 def _at_fork(self):
561 # don't unregister file descriptors: kqueue is still shared with
562 # the parent process
563 self._kqueue = select.kqueue()
564 for key in self._fd_to_key.values():
565 self._register(key)
554 566
555 def select(self, timeout=None): 567 def select(self, timeout=None):
556 timeout = None if timeout is None else max(timeout, 0) 568 timeout = None if timeout is None else max(timeout, 0)
557 max_ev = len(self._fd_to_key) 569 max_ev = len(self._fd_to_key)
558 ready = [] 570 ready = []
559 try: 571 try:
560 kev_list = self._kqueue.control(None, max_ev, timeout) 572 kev_list = self._kqueue.control(None, max_ev, timeout)
561 except InterruptedError: 573 except InterruptedError:
562 return ready 574 return ready
563 for kev in kev_list: 575 for kev in kev_list:
(...skipping 21 matching lines...) Expand all
585 if 'KqueueSelector' in globals(): 597 if 'KqueueSelector' in globals():
586 DefaultSelector = KqueueSelector 598 DefaultSelector = KqueueSelector
587 elif 'EpollSelector' in globals(): 599 elif 'EpollSelector' in globals():
588 DefaultSelector = EpollSelector 600 DefaultSelector = EpollSelector
589 elif 'DevpollSelector' in globals(): 601 elif 'DevpollSelector' in globals():
590 DefaultSelector = DevpollSelector 602 DefaultSelector = DevpollSelector
591 elif 'PollSelector' in globals(): 603 elif 'PollSelector' in globals():
592 DefaultSelector = PollSelector 604 DefaultSelector = PollSelector
593 else: 605 else:
594 DefaultSelector = SelectSelector 606 DefaultSelector = SelectSelector
OLDNEW
« no previous file with comments | « Lib/asyncio/unix_events.py ('k') | no next file » | no next file with comments »

RSS Feeds Recent Issues | This issue
This is Rietveld 894c83f36cb7+