diff --git a/Lib/shutil.py b/Lib/shutil.py
--- a/Lib/shutil.py
+++ b/Lib/shutil.py
@@ -12,6 +12,11 @@
 import collections
 import errno
 import tarfile
+import subprocess
+import shlex
+import string
+import itertools
+import re
 
 try:
     import bz2
@@ -35,7 +40,9 @@
            "register_archive_format", "unregister_archive_format",
            "get_unpack_formats", "register_unpack_format",
            "unregister_unpack_format", "unpack_archive",
-           "ignore_patterns", "chown"]
+           "ignore_patterns", "chown",
+           "quote_ascii_whitespace", "shell_format", "shell_format_map",
+           "shell_call", "check_shell_call", "shell_output"]
     # disk_usage is added later, if available on the platform
 
 class Error(EnvironmentError):
@@ -822,3 +829,113 @@
             raise LookupError("no such group: {!r}".format(group))
 
     os.chown(path, _user, _group)
+
+
+# Shell invocation helpers
+def quote_ascii_whitespace(text):
+    """Quote each group of ASCII whitespace characters with double quotes"""
+    return re.sub("([ \t\n\r\f\v]+)", r'"\1"', text)
+
+class _ShellFormatter(string.Formatter):
+    """Formatter to automatically escape whitespace in interpolated values
+
+    It is not thread safe due to the way it handles auto field numbering
+    """
+
+    def vformat(self, fmt, args, kwds):
+        self._autoincrement = itertools.count()
+        return super().vformat(fmt, args, kwds)
+
+    def get_value(self, key, args, kwds):
+        if key == '':
+            key = next(self._autoincrement)
+        return super().get_value(key, args, kwds)
+
+    def convert_field(self, value, conversion):
+        if conversion == 'u':
+            return value
+        elif conversion == 'q':
+            return shlex.quote(str(value))
+        elif conversion is None:
+            return quote_ascii_whitespace(str(value))
+        return super().convert_field(value, conversion)
+
+
+def shell_format(_fmt, *args, **kwds):
+    """A str.format variant tailored for shell command interpolation.
+
+    All interpolated values are coerced to strings by default and any
+    ASCII whitespace in the result is automatically quoted.
+    The conversion specifier "!q" also coerces the value to a string, but
+    uses shlex.quote() to quote the entire interpolated field.
+    The conversion specifier "!u" bypasses the default whitespace quoting
+    and directly interpolates the unquoted value.
+    The conversion specifiers "!r" and "!s" have their usual effects and
+    also bypass the implicit quoting mechanism.
+
+    As brace characters ('{' and '}') in the format string are used to
+    indicate interpolated fields, they must either be included in an
+    interpolated value or else doubled (i.e. '{{' and '}}') in the format
+    string in order to be passed to the underlying shell.
+    """
+    return _ShellFormatter().vformat(_fmt, args, kwds)
+
+def shell_format_map(_fmt, mapping):
+    """A str.format_map variant tailored for shell command interpolation.
+
+    All interpolated values are coerced to strings by default and any
+    ASCII whitespace in the result is automatically quoted.
+    The conversion specifier "!q" also coerces the value to a string, but
+    uses shlex.quote() to quote the entire interpolated field.
+    The conversion specifier "!u" bypasses the default whitespace quoting
+    and directly interpolates the unquoted value.
+    The conversion specifiers "!r" and "!s" have their usual effects and
+    also bypass the implicit quoting mechanism.
+
+    As brace characters ('{' and '}') in the format string are used to
+    indicate interpolated fields, they must either be included in an
+    interpolated value or else doubled (i.e. '{{' and '}}') in the format
+    string in order to be passed to the underlying shell.
+    """
+    return _ShellFormatter().vformat(_fmt, (), mapping)
+
+
+def shell_call(cmd, *args, **kwds):
+    """Invoke subprocess.call() with shell=True and interpolated arguments
+
+    Allows wildcards and other shell metacharacters in interpolated strings
+    but escapes whitespace so paths with spaces are handled automatically.
+    """
+    if args or kwds:
+        cmd = _ShellFormatter().vformat(cmd, args, kwds)
+    return subprocess.call(cmd, shell=True)
+
+def check_shell_call(cmd, *args, **kwds):
+    """Invoke subprocess.check_call() with shell=True and interpolated arguments
+
+    Allows wildcards and other shell metacharacters in interpolated strings
+    but escapes whitespace so paths with spaces are handled automatically.
+    """
+    if args or kwds:
+        cmd = _ShellFormatter().vformat(cmd, args, kwds)
+    return subprocess.check_call(cmd, shell=True)
+
+def shell_output(cmd, *args, **kwds):
+    """Invoke subprocess.check_output() with shell=True and interpolated arguments
+
+    Allows wildcards and other shell metacharacters in interpolated strings
+    but escapes whitespace so paths with spaces are handled automatically.
+
+    Use shell redirection (2>&1) to capture stderr in addition to stdout
+    A trailing newline (if any) will be removed from the result
+    This call enables Popen's universal newlines for the command result
+    Accordingly, output is decoded using the locale's preferred encoding.
+    For other encodings use the subprocess APIs directly.
+    """
+    if args or kwds:
+        cmd = _ShellFormatter().vformat(cmd, args, kwds)
+    data = subprocess.check_output(cmd, shell=True, universal_newlines=True)
+    if data[-1:] == "\n":
+        data = data[:-1]
+    return data
+
diff --git a/Lib/test/test_shutil.py b/Lib/test/test_shutil.py
--- a/Lib/test/test_shutil.py
+++ b/Lib/test/test_shutil.py
@@ -8,6 +8,7 @@
 import os
 import os.path
 import functools
+import subprocess
 from test import support
 from test.support import TESTFN
 from os.path import splitdrive
@@ -16,7 +17,9 @@
                     register_archive_format, unregister_archive_format,
                     get_archive_formats, Error, unpack_archive,
                     register_unpack_format, RegistryError,
-                    unregister_unpack_format, get_unpack_formats)
+                    unregister_unpack_format, get_unpack_formats,
+                    quote_ascii_whitespace, shell_format, shell_format_map,
+                    shell_call, check_shell_call, shell_output)
 import tarfile
 import warnings
 
@@ -1004,10 +1007,75 @@
         finally:
             os.rmdir(dst_dir)
 
+class TestShellHelpers(unittest.TestCase):
+
+    SAFE_SYMBOLS = "@%+=:,./-"
+    DODGY_SYMBOLS = r"""\*<>|;&$!'"{}"""
+
+    def test_quote_ascii_whitespace(self):
+        symbols = self.SAFE_SYMBOLS + self.DODGY_SYMBOLS
+        unquoted = ' text with much \twhitespace' + symbols
+        quoted = '" "text" "with" "much" \t"whitespace' + symbols
+        self.assertEqual(quote_ascii_whitespace(unquoted), quoted)
+
+    def test_shell_format(self):
+        unquoted_symbols = self.SAFE_SYMBOLS + self.DODGY_SYMBOLS
+        quoted_symbols = self.SAFE_SYMBOLS + r"""\*<>|;&$!'"'"'"{}"""
+        unquoted_text = ' text with much \twhitespace'
+        quoted_text = '" "text" "with" "much" \t"whitespace'
+        unquoted = unquoted_text + unquoted_symbols
+        expected = quoted_text + unquoted_symbols
+        self.assertEqual(shell_format("{}", unquoted), expected)
+        expected = "'" + unquoted_text + quoted_symbols + "'"
+        self.assertEqual(shell_format("{!q}", unquoted), expected)
+        expected = unquoted
+        self.assertEqual(shell_format("{!u}", unquoted), expected)
+        self.assertEqual(shell_format("{!s}", unquoted), expected)
+        expected = repr(unquoted)
+        self.assertEqual(shell_format("{!r}", unquoted), expected)
+
+    def test_shell_format_map(self):
+        values = {"path": "a b", "raw": "c d"}
+        self.assertEqual(shell_format_map("{path} {raw!u}", values),
+                         'a" "b c d')
+
+    def test_shell_call(self):
+        self.assertEqual(shell_call("exit {}", 0), 0)
+        self.assertEqual(shell_call("exit {}", 1), 1)
+
+    def test_check_shell_call(self):
+        self.assertEqual(check_shell_call("exit {}", 0), 0)
+        with self.assertRaises(subprocess.CalledProcessError) as err:
+            check_shell_call("exit {}", 1)
+        self.assertEqual(err.exception.returncode, 1)
+
+    def test_shell_output(self):
+        text = "Hello world!"
+        self.assertEqual(shell_output("echo {}", text), text)
+        with self.assertRaises(subprocess.CalledProcessError) as err:
+            shell_output("echo {}; exit {}", text, 1)
+        exc = err.exception
+        self.assertEqual(exc.returncode, 1)
+        self.assertEqual(exc.output.strip(), text)
+
+    def test_implicit_whitespace_escaping(self):
+        text = "Hello world!"
+        with tempfile.TemporaryDirectory() as d:
+            fname = os.path.join(d, " name with much \twhitespace.txt")
+            with open(fname, 'w') as f:
+                f.write(text)
+            self.assertEqual(shell_call("cat {}", fname), 0)
+            self.assertEqual(check_shell_call("cat {}", fname), 0)
+            self.assertEqual(shell_output("cat {}", fname), text)
+            self.assertEqual(shell_call("cat {!q}", fname), 0)
+            self.assertEqual(check_shell_call("cat {!q}", fname), 0)
+            self.assertEqual(shell_output("cat {!q}", fname), text)
+
+
 
 
 def test_main():
-    support.run_unittest(TestShutil, TestMove, TestCopyFile)
+    support.run_unittest(TestShutil, TestMove, TestCopyFile, TestShellHelpers)
 
 if __name__ == '__main__':
     test_main()