Rietveld Code Review Tool
Help | Bug tracker | Discussion group | Source code | Sign in
(81635)

Side by Side Diff: Lib/subprocess.py

Issue 18923: Use the new selectors module in the subprocess module
Patch Set: Created 5 years, 9 months ago
Left:
Right:
Use n/p to move between diff chunks; N/P to move between comments. Please Sign in to add in-line comments.
Jump to:
View unified diff | Download patch
« no previous file with comments | « no previous file | Lib/test/test_subprocess.py » ('j') | Lib/test/test_subprocess.py » ('J')
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 # subprocess - Subprocesses with accessible I/O streams 1 # subprocess - Subprocesses with accessible I/O streams
2 # 2 #
3 # For more information about this module, see PEP 324. 3 # For more information about this module, see PEP 324.
4 # 4 #
5 # Copyright (c) 2003-2005 by Peter Astrand <astrand@lysator.liu.se> 5 # Copyright (c) 2003-2005 by Peter Astrand <astrand@lysator.liu.se>
6 # 6 #
7 # Licensed to PSF under a Contributor Agreement. 7 # Licensed to PSF under a Contributor Agreement.
8 # See http://www.python.org/2.4/license for licensing details. 8 # See http://www.python.org/2.4/license for licensing details.
9 9
10 r"""subprocess - Subprocesses with accessible I/O streams 10 r"""subprocess - Subprocesses with accessible I/O streams
(...skipping 385 matching lines...) Expand 10 before | Expand all | Expand 10 after
396 import msvcrt 396 import msvcrt
397 import _winapi 397 import _winapi
398 class STARTUPINFO: 398 class STARTUPINFO:
399 dwFlags = 0 399 dwFlags = 0
400 hStdInput = None 400 hStdInput = None
401 hStdOutput = None 401 hStdOutput = None
402 hStdError = None 402 hStdError = None
403 wShowWindow = 0 403 wShowWindow = 0
404 else: 404 else:
405 import select 405 import select
406 _has_poll = hasattr(select, 'poll') 406 import selectors
407 import _posixsubprocess 407 import _posixsubprocess
408 408
409 # When select or poll has indicated that the file is writable, 409 # When select or poll has indicated that the file is writable,
410 # we can write up to _PIPE_BUF bytes without risk of blocking. 410 # we can write up to _PIPE_BUF bytes without risk of blocking.
411 # POSIX defines PIPE_BUF as >= 512. 411 # POSIX defines PIPE_BUF as >= 512.
412 _PIPE_BUF = getattr(select, 'PIPE_BUF', 512) 412 _PIPE_BUF = getattr(select, 'PIPE_BUF', 512)
413
414 # poll/select have the advantage of not requiring any extra file
415 # descriptor, contrarily to epoll/kqueue (also, they require a single
416 # syscall).
417 if hasattr(selectors, 'PollSelector'):
418 _PopenSelector = selectors.PollSelector
419 else:
420 _PopenSelector = selectors.SelectSelector
haypo 2013/10/31 23:56:12 This code should be factorized, but I opened a new
413 421
414 422
415 __all__ = ["Popen", "PIPE", "STDOUT", "call", "check_call", "getstatusoutput", 423 __all__ = ["Popen", "PIPE", "STDOUT", "call", "check_call", "getstatusoutput",
416 "getoutput", "check_output", "CalledProcessError", "DEVNULL"] 424 "getoutput", "check_output", "CalledProcessError", "DEVNULL"]
417 425
418 if mswindows: 426 if mswindows:
419 from _winapi import (CREATE_NEW_CONSOLE, CREATE_NEW_PROCESS_GROUP, 427 from _winapi import (CREATE_NEW_CONSOLE, CREATE_NEW_PROCESS_GROUP,
420 STD_INPUT_HANDLE, STD_OUTPUT_HANDLE, 428 STD_INPUT_HANDLE, STD_OUTPUT_HANDLE,
421 STD_ERROR_HANDLE, SW_HIDE, 429 STD_ERROR_HANDLE, SW_HIDE,
422 STARTF_USESTDHANDLES, STARTF_USESHOWWINDOW) 430 STARTF_USESTDHANDLES, STARTF_USESHOWWINDOW)
(...skipping 1102 matching lines...) Expand 10 before | Expand all | Expand 10 after
1525 1533
1526 1534
1527 def _communicate(self, input, endtime, orig_timeout): 1535 def _communicate(self, input, endtime, orig_timeout):
1528 if self.stdin and not self._communication_started: 1536 if self.stdin and not self._communication_started:
1529 # Flush stdio buffer. This might block, if the user has 1537 # Flush stdio buffer. This might block, if the user has
1530 # been writing to .stdin in an uncontrolled fashion. 1538 # been writing to .stdin in an uncontrolled fashion.
1531 self.stdin.flush() 1539 self.stdin.flush()
1532 if not input: 1540 if not input:
1533 self.stdin.close() 1541 self.stdin.close()
1534 1542
1535 if _has_poll: 1543 stdout = None
1536 stdout, stderr = self._communicate_with_poll(input, endtime, 1544 stderr = None
1537 orig_timeout) 1545
1538 else: 1546 # Only create this mapping if we haven't already.
1539 stdout, stderr = self._communicate_with_select(input, endtime, 1547 if not self._communication_started:
1540 orig_timeout) 1548 self._fileobj2output = {}
1549 if self.stdout:
1550 self._fileobj2output[self.stdout] = []
1551 if self.stderr:
1552 self._fileobj2output[self.stderr] = []
1553
1554 if self.stdout:
1555 stdout = self._fileobj2output[self.stdout]
1556 if self.stderr:
1557 stderr = self._fileobj2output[self.stderr]
1558
1559 self._save_input(input)
1560
1561 with _PopenSelector() as selector:
1562 if self.stdin and input:
1563 selector.register(self.stdin, selectors.EVENT_WRITE)
1564 if self.stdout:
1565 selector.register(self.stdout, selectors.EVENT_READ)
1566 if self.stderr:
1567 selector.register(self.stderr, selectors.EVENT_READ)
1568
1569 while selector.get_map():
1570 timeout = self._remaining_time(endtime)
1571 if timeout is not None and timeout < 0:
1572 raise TimeoutExpired(self.args, orig_timeout)
1573
1574 ready = selector.select(timeout)
1575 self._check_timeout(endtime, orig_timeout)
1576
1577 # XXX Rewrite these to use non-blocking I/O on the file
1578 # objects; they are no longer using C stdio!
1579
1580 for key, events in ready:
1581 if events & selectors.EVENT_WRITE:
1582 chunk = self._input[self._input_offset :
1583 self._input_offset + _PIPE_BUF]
haypo 2013/10/31 23:56:12 (Unrelated to the selector issue) It may be more e
1584 try:
1585 self._input_offset += os.write(key.fd, chunk)
1586 except OSError as e:
1587 if e.errno == errno.EPIPE:
1588 selector.unregister(key.fileobj)
1589 key.fileobj.close()
1590 else:
1591 raise
1592 else:
1593 if self._input_offset >= len(self._input):
1594 selector.unregister(key.fileobj)
1595 key.fileobj.close()
1596 elif events & selectors.EVENT_READ:
1597 data = os.read(key.fd, 4096)
haypo 2013/10/31 23:56:12 In the old code, poll() uses 4096 whereas select()
1598 if not data:
1599 selector.unregister(key.fileobj)
1600 key.fileobj.close()
1601 self._fileobj2output[key.fileobj].append(data)
1541 1602
1542 self.wait(timeout=self._remaining_time(endtime)) 1603 self.wait(timeout=self._remaining_time(endtime))
1543 1604
1544 # All data exchanged. Translate lists into strings. 1605 # All data exchanged. Translate lists into strings.
1545 if stdout is not None: 1606 if stdout is not None:
1546 stdout = b''.join(stdout) 1607 stdout = b''.join(stdout)
1547 if stderr is not None: 1608 if stderr is not None:
1548 stderr = b''.join(stderr) 1609 stderr = b''.join(stderr)
1549 1610
1550 # Translate newlines, if requested. 1611 # Translate newlines, if requested.
(...skipping 11 matching lines...) Expand all
1562 1623
1563 def _save_input(self, input): 1624 def _save_input(self, input):
1564 # This method is called from the _communicate_with_*() methods 1625 # This method is called from the _communicate_with_*() methods
1565 # so that if we time out while communicating, we can continue 1626 # so that if we time out while communicating, we can continue
1566 # sending input if we retry. 1627 # sending input if we retry.
1567 if self.stdin and self._input is None: 1628 if self.stdin and self._input is None:
1568 self._input_offset = 0 1629 self._input_offset = 0
1569 self._input = input 1630 self._input = input
1570 if self.universal_newlines and input is not None: 1631 if self.universal_newlines and input is not None:
1571 self._input = self._input.encode(self.stdin.encoding) 1632 self._input = self._input.encode(self.stdin.encoding)
1572
1573
1574 def _communicate_with_poll(self, input, endtime, orig_timeout):
1575 stdout = None # Return
1576 stderr = None # Return
1577
1578 if not self._communication_started:
1579 self._fd2file = {}
1580
1581 poller = select.poll()
1582 def register_and_append(file_obj, eventmask):
1583 poller.register(file_obj.fileno(), eventmask)
1584 self._fd2file[file_obj.fileno()] = file_obj
1585
1586 def close_unregister_and_remove(fd):
1587 poller.unregister(fd)
1588 self._fd2file[fd].close()
1589 self._fd2file.pop(fd)
1590
1591 if self.stdin and input:
1592 register_and_append(self.stdin, select.POLLOUT)
1593
1594 # Only create this mapping if we haven't already.
1595 if not self._communication_started:
1596 self._fd2output = {}
1597 if self.stdout:
1598 self._fd2output[self.stdout.fileno()] = []
1599 if self.stderr:
1600 self._fd2output[self.stderr.fileno()] = []
1601
1602 select_POLLIN_POLLPRI = select.POLLIN | select.POLLPRI
1603 if self.stdout:
1604 register_and_append(self.stdout, select_POLLIN_POLLPRI)
1605 stdout = self._fd2output[self.stdout.fileno()]
1606 if self.stderr:
1607 register_and_append(self.stderr, select_POLLIN_POLLPRI)
1608 stderr = self._fd2output[self.stderr.fileno()]
1609
1610 self._save_input(input)
1611
1612 while self._fd2file:
1613 timeout = self._remaining_time(endtime)
1614 if timeout is not None and timeout < 0:
1615 raise TimeoutExpired(self.args, orig_timeout)
1616 try:
1617 ready = poller.poll(timeout)
1618 except OSError as e:
1619 if e.args[0] == errno.EINTR:
1620 continue
1621 raise
1622 self._check_timeout(endtime, orig_timeout)
1623
1624 # XXX Rewrite these to use non-blocking I/O on the
1625 # file objects; they are no longer using C stdio!
1626
1627 for fd, mode in ready:
1628 if mode & select.POLLOUT:
1629 chunk = self._input[self._input_offset :
1630 self._input_offset + _PIPE_BUF]
1631 try:
1632 self._input_offset += os.write(fd, chunk)
1633 except OSError as e:
1634 if e.errno == errno.EPIPE:
1635 close_unregister_and_remove(fd)
1636 else:
1637 raise
1638 else:
1639 if self._input_offset >= len(self._input):
1640 close_unregister_and_remove(fd)
1641 elif mode & select_POLLIN_POLLPRI:
1642 data = os.read(fd, 4096)
1643 if not data:
1644 close_unregister_and_remove(fd)
1645 self._fd2output[fd].append(data)
1646 else:
1647 # Ignore hang up or errors.
1648 close_unregister_and_remove(fd)
1649
1650 return (stdout, stderr)
1651
1652
1653 def _communicate_with_select(self, input, endtime, orig_timeout):
1654 if not self._communication_started:
1655 self._read_set = []
1656 self._write_set = []
1657 if self.stdin and input:
1658 self._write_set.append(self.stdin)
1659 if self.stdout:
1660 self._read_set.append(self.stdout)
1661 if self.stderr:
1662 self._read_set.append(self.stderr)
1663
1664 self._save_input(input)
1665
1666 stdout = None # Return
1667 stderr = None # Return
1668
1669 if self.stdout:
1670 if not self._communication_started:
1671 self._stdout_buff = []
1672 stdout = self._stdout_buff
1673 if self.stderr:
1674 if not self._communication_started:
1675 self._stderr_buff = []
1676 stderr = self._stderr_buff
1677
1678 while self._read_set or self._write_set:
1679 timeout = self._remaining_time(endtime)
1680 if timeout is not None and timeout < 0:
1681 raise TimeoutExpired(self.args, orig_timeout)
1682 try:
1683 (rlist, wlist, xlist) = \
1684 select.select(self._read_set, self._write_set, [],
1685 timeout)
1686 except OSError as e:
1687 if e.args[0] == errno.EINTR:
1688 continue
1689 raise
1690
1691 # According to the docs, returning three empty lists indicates
1692 # that the timeout expired.
1693 if not (rlist or wlist or xlist):
1694 raise TimeoutExpired(self.args, orig_timeout)
1695 # We also check what time it is ourselves for good measure.
1696 self._check_timeout(endtime, orig_timeout)
1697
1698 # XXX Rewrite these to use non-blocking I/O on the
1699 # file objects; they are no longer using C stdio!
1700
1701 if self.stdin in wlist:
1702 chunk = self._input[self._input_offset :
1703 self._input_offset + _PIPE_BUF]
1704 try:
1705 bytes_written = os.write(self.stdin.fileno(), chunk)
1706 except OSError as e:
1707 if e.errno == errno.EPIPE:
1708 self.stdin.close()
1709 self._write_set.remove(self.stdin)
1710 else:
1711 raise
1712 else:
1713 self._input_offset += bytes_written
1714 if self._input_offset >= len(self._input):
1715 self.stdin.close()
1716 self._write_set.remove(self.stdin)
1717
1718 if self.stdout in rlist:
1719 data = os.read(self.stdout.fileno(), 1024)
1720 if not data:
1721 self.stdout.close()
1722 self._read_set.remove(self.stdout)
1723 stdout.append(data)
1724
1725 if self.stderr in rlist:
1726 data = os.read(self.stderr.fileno(), 1024)
1727 if not data:
1728 self.stderr.close()
1729 self._read_set.remove(self.stderr)
1730 stderr.append(data)
1731
1732 return (stdout, stderr)
1733 1633
1734 1634
1735 def send_signal(self, sig): 1635 def send_signal(self, sig):
1736 """Send a signal to the process 1636 """Send a signal to the process
1737 """ 1637 """
1738 os.kill(self.pid, sig) 1638 os.kill(self.pid, sig)
1739 1639
1740 def terminate(self): 1640 def terminate(self):
1741 """Terminate the process with SIGTERM 1641 """Terminate the process with SIGTERM
1742 """ 1642 """
1743 self.send_signal(signal.SIGTERM) 1643 self.send_signal(signal.SIGTERM)
1744 1644
1745 def kill(self): 1645 def kill(self):
1746 """Kill the process with SIGKILL 1646 """Kill the process with SIGKILL
1747 """ 1647 """
1748 self.send_signal(signal.SIGKILL) 1648 self.send_signal(signal.SIGKILL)
OLDNEW
« no previous file with comments | « no previous file | Lib/test/test_subprocess.py » ('j') | Lib/test/test_subprocess.py » ('J')

RSS Feeds Recent Issues | This issue
This is Rietveld 894c83f36cb7+