diff -r 101919aba8d6 -r b267e72c8c10 Lib/shutil.py --- a/Lib/shutil.py Mon Oct 31 09:05:10 2011 +0100 +++ b/Lib/shutil.py Mon Oct 31 18:26:45 2011 +1000 @@ -12,6 +12,11 @@ import collections import errno import tarfile +import subprocess +import shlex +import string +import itertools +import re try: import bz2 @@ -35,7 +40,9 @@ "register_archive_format", "unregister_archive_format", "get_unpack_formats", "register_unpack_format", "unregister_unpack_format", "unpack_archive", - "ignore_patterns", "chown"] + "ignore_patterns", "chown", + "shell_format", "shell_format_map", + "shell_call", "check_shell_call", "shell_output"] # disk_usage is added later, if available on the platform class Error(EnvironmentError): @@ -822,3 +829,103 @@ raise LookupError("no such group: {!r}".format(group)) os.chown(path, _user, _group) + + +# Shell invocation helpers +class _ShellFormatter(string.Formatter): + """Formatter to automatically escape interpolated values + + It is not thread safe due to the way it handles auto field numbering + """ + + def vformat(self, fmt, args, kwds): + self._autoincrement = itertools.count() + return super().vformat(fmt, args, kwds) + + def get_value(self, key, args, kwds): + if key == '': + key = next(self._autoincrement) + return super().get_value(key, args, kwds) + + def convert_field(self, value, conversion): + if conversion == 'u': + return value + elif conversion is None: + return shlex.quote(str(value)) + return super().convert_field(value, conversion) + + +def shell_format(_fmt, *args, **kwds): + """A str.format variant tailored for shell command interpolation. + + All interpolated values are coerced to strings by default and quoted + with shlex.quote(). + The conversion specifier "!u" bypasses this quoting process and + instead performs normal interpolation of the unquoted value. + The conversion specifiers "!r" and "!s" have their usual effects and + also bypass the implicit quoting mechanism. + + As brace characters ('{' and '}') in the format string are used to + indicate interpolated fields, they must either be included in an + interpolated value or else doubled (i.e. '{{' and '}}') in the format + string in order to be passed to the underlying shell. + """ + return _ShellFormatter().vformat(_fmt, args, kwds) + +def shell_format_map(_fmt, mapping): + """A str.format_map variant tailored for shell command interpolation. + + All interpolated values are coerced to strings by default and quoted + with shlex.quote(). + The conversion specifier "!u" bypasses this quoting process and + instead performs normal interpolation of the unquoted value. + The conversion specifiers "!r" and "!s" have their usual effects and + also bypass the implicit quoting mechanism. + + As brace characters ('{' and '}') in the format string are used to + indicate interpolated fields, they must either be included in an + interpolated value or else doubled (i.e. '{{' and '}}') in the format + string in order to be passed to the underlying shell. + """ + return _ShellFormatter().vformat(fmt, (), mapping) + + +def shell_call(cmd, *args, **kwds): + """Invoke subprocess.call() with shell=True and interpolated arguments + + Allows wildcards and other shell metacharacters in interpolated strings + but escapes whitespace so paths with spaces are handled automatially. + """ + if args or kwds: + cmd = _ShellFormatter().vformat(cmd, args, kwds) + return subprocess.call(cmd, shell=True) + +def check_shell_call(cmd, *args, **kwds): + """Invoke subprocess.check_call() with shell=True and interpolated arguments + + Allows wildcards and other shell metacharacters in interpolated strings + but escapes whitespace so paths with spaces are handled automatially. + """ + if args or kwds: + cmd = _ShellFormatter().vformat(cmd, args, kwds) + return subprocess.check_call(cmd, shell=True) + +def shell_output(cmd, *args, **kwds): + """Invoke subprocess.check_output() with shell=True and interpolated arguments + + Allows wildcards and other shell metacharacters in interpolated strings + but escapes whitespace so paths with spaces are handled automatially. + + Use shell redirection (2>&1) to capture stderr in addition to stdout + A trailing newline (if any) will be removed from the result + This call enables Popen's universal newlines for the command result + Accordingly, it only works with UTF-8 encoded data. For other encodings + use the subprocess APIs directly. + """ + if args or kwds: + cmd = _ShellFormatter().vformat(cmd, args, kwds) + data = subprocess.check_output(cmd, shell=True, universal_newlines=True) + if data[-1:] == "\n": + data = data[:-1] + return data + diff -r 101919aba8d6 -r b267e72c8c10 Lib/test/test_shutil.py --- a/Lib/test/test_shutil.py Mon Oct 31 09:05:10 2011 +0100 +++ b/Lib/test/test_shutil.py Mon Oct 31 18:26:45 2011 +1000 @@ -8,6 +8,7 @@ import os import os.path import functools +import subprocess from test import support from test.support import TESTFN from os.path import splitdrive @@ -16,7 +17,9 @@ register_archive_format, unregister_archive_format, get_archive_formats, Error, unpack_archive, register_unpack_format, RegistryError, - unregister_unpack_format, get_unpack_formats) + unregister_unpack_format, get_unpack_formats, + shell_format, shell_format_map, + shell_call, check_shell_call, shell_output) import tarfile import warnings @@ -1004,10 +1007,90 @@ finally: os.rmdir(dst_dir) +class TestShellHelpers(unittest.TestCase): + + SAFE_SYMBOLS = "@%+=:,./-" + DODGY_SYMBOLS = r"""\*<>|;&$!'"{}""" + + def test_shell_format(self): + unquoted_symbols = self.SAFE_SYMBOLS + self.DODGY_SYMBOLS + quoted_symbols = self.SAFE_SYMBOLS + r"""\*<>|;&$!'"'"'"{}""" + unquoted_text = ' text with much \twhitespace' + unquoted = unquoted_text + unquoted_symbols + expected = "'" + unquoted_text + quoted_symbols + "'" + self.assertEqual(shell_format("{}", unquoted), expected) + expected = unquoted + self.assertEqual(shell_format("{!u}", unquoted), expected) + self.assertEqual(shell_format("{!s}", unquoted), expected) + expected = repr(unquoted) + self.assertEqual(shell_format("{!r}", unquoted), expected) + + def test_shell_call(self): + self.assertEqual(shell_call("exit {}", 0), 0) + self.assertEqual(shell_call("exit {}", 1), 1) + + def test_check_shell_call(self): + self.assertEqual(shell_call("exit {}", 0), 0) + with self.assertRaises(subprocess.CalledProcessError) as err: + check_shell_call("exit {}", 1) + self.assertEqual(err.exception.returncode, 1) + + def test_shell_output(self): + text = "Hello world!" + self.assertEqual(shell_output("echo {}", text), text) + with self.assertRaises(subprocess.CalledProcessError) as err: + shell_output("echo {}; exit {}", text, 1) + exc = err.exception + self.assertEqual(exc.returncode, 1) + self.assertEqual(exc.output.strip(), text) + + @unittest.skipIf(sys.platform.startswith('win'), + "test relies on 'cat' and 'ls'") + def test_implicit_quoting(self): + # TODO: Add Windows variants for the test commands + text = "Hello world!" + with tempfile.TemporaryDirectory() as d: + # Setup test file + fname = os.path.join(d, " name with much \twhitespace.txt") + fname_pattern = os.path.join(d, "name*") + with open(fname, 'w') as f: + f.write(text) + + cat_cmd = "cat {} > /dev/null" + self.assertEqual(shell_call(cat_cmd, fname), 0) + self.assertEqual(check_shell_call(cat_cmd, fname), 0) + cat_cmd = "cat {}" + self.assertEqual(shell_output(cat_cmd, fname), text) + ls_cmd = "ls {} 2>&1" + with self.assertRaises(subprocess.CalledProcessError) as err: + shell_output(ls_cmd, fname_pattern) + + @unittest.skipIf(sys.platform.startswith('win'), + "test relies on 'cat' and 'ls'") + def test_implicit_quoting_bypass(self): + # TODO: Add Windows variants for the test commands + text = "Hello world!" + with tempfile.TemporaryDirectory() as d: + # Setup test file + fname = os.path.join(d, " name with much \twhitespace.txt") + fname_pattern = os.path.join(d, "?name*") + with open(fname, 'w') as f: + f.write(text) + + cat_cmd = "cat {!u} > /dev/null 2>&1" + self.assertNotEqual(shell_call(cat_cmd, fname), 0) + with self.assertRaises(subprocess.CalledProcessError) as err: + check_shell_call(cat_cmd, fname) + cat_cmd = "cat {!u} 2>&1" + with self.assertRaises(subprocess.CalledProcessError) as err: + shell_output(cat_cmd, fname) + ls_cmd = "ls {!u}" + self.assertEqual(shell_output(ls_cmd, fname_pattern), fname) + def test_main(): - support.run_unittest(TestShutil, TestMove, TestCopyFile) + support.run_unittest(TestShutil, TestMove, TestCopyFile, TestShellHelpers) if __name__ == '__main__': test_main()