# HG changeset patch # User Antoine Pitrou # Date 1236555046 -3600 diff -r 3698dc8dc0c1 -r ad2d93474a1b Lib/shutil.py --- a/Lib/shutil.py +++ b/Lib/shutil.py @@ -11,11 +11,15 @@ from os.path import abspath import fnmatch __all__ = ["copyfileobj","copyfile","copymode","copystat","copy","copy2", - "copytree","move","rmtree","Error"] + "copytree","move","rmtree","Error", "SpecialFileError"] class Error(EnvironmentError): pass +class SpecialFileError(EnvironmentError): + """Raised when trying to do a kind of operation (e.g. copying) which is + not supported on a special file (e.g. a named pipe)""" + try: WindowsError except NameError: @@ -48,6 +52,15 @@ def copyfile(src, dst): fsrc = None fdst = None + for fn in [src, dst]: + try: + st = os.stat(fn) + except OSError: + # File most likely does not exist + pass + # XXX What about other special files? (sockets, devices...) + if stat.S_ISFIFO(st.st_mode): + raise SpecialFileError("`%s` is a named pipe" % fn) try: fsrc = open(src, 'rb') fdst = open(dst, 'wb') @@ -157,14 +170,14 @@ def copytree(src, dst, symlinks=False, i elif os.path.isdir(srcname): copytree(srcname, dstname, symlinks, ignore) else: + # Will raise a SpecialFileError for unsupported file types copy2(srcname, dstname) - # XXX What about devices, sockets etc.? - except (IOError, os.error), why: - errors.append((srcname, dstname, str(why))) # catch the Error from the recursive copytree so that we can # continue with other files except Error, err: errors.extend(err.args[0]) + except EnvironmentError, why: + errors.append((srcname, dstname, str(why))) try: copystat(src, dst) except OSError, why: diff -r 3698dc8dc0c1 -r ad2d93474a1b Lib/test/test_shutil.py --- a/Lib/test/test_shutil.py +++ b/Lib/test/test_shutil.py @@ -9,6 +9,7 @@ import os import os.path from test import test_support from test.test_support import TESTFN +TESTFN2 = TESTFN + "2" class TestShutil(unittest.TestCase): def test_rmtree_errors(self): @@ -226,6 +227,38 @@ class TestShutil(unittest.TestCase): finally: shutil.rmtree(TESTFN, ignore_errors=True) + if hasattr(os, "mkfifo"): + # Issue #3002: copyfile and copytree block indefinitely on named pipes + def test_copyfile_named_pipe(self): + os.mkfifo(TESTFN) + try: + self.assertRaises(shutil.SpecialFileError, + shutil.copyfile, TESTFN, TESTFN2) + self.assertRaises(shutil.SpecialFileError, + shutil.copyfile, __file__, TESTFN) + finally: + os.remove(TESTFN) + + def test_copytree_named_pipe(self): + os.mkdir(TESTFN) + try: + subdir = os.path.join(TESTFN, "subdir") + os.mkdir(subdir) + pipe = os.path.join(subdir, "mypipe") + os.mkfifo(pipe) + try: + shutil.copytree(TESTFN, TESTFN2) + except shutil.Error as e: + errors = e.args[0] + self.assertEqual(len(errors), 1) + src, dst, error_msg = errors[0] + self.assertEqual("`%s` is a named pipe" % pipe, error_msg) + else: + self.fail("shutil.Error should have been raised") + finally: + shutil.rmtree(TESTFN, ignore_errors=True) + shutil.rmtree(TESTFN2, ignore_errors=True) + class TestMove(unittest.TestCase):