diff -r 1ab124a6f171 -r 4832bf7aa028 Lib/shutil.py --- a/Lib/shutil.py Wed Dec 21 11:24:47 2011 +0100 +++ b/Lib/shutil.py Wed Dec 21 17:40:52 2011 +0100 @@ -82,8 +82,13 @@ return (os.path.normcase(os.path.abspath(src)) == os.path.normcase(os.path.abspath(dst))) -def copyfile(src, dst): - """Copy data from src to dst""" +def copyfile(src, dst, symlinks=False): + """Copy data from src to dst. + + If optional flag `symlinks` is set and `src` is a symbolic link, a new + symlink will be created instead of copying the file it points to. + + """ if _samefile(src, dst): raise Error("`%s` and `%s` are the same file" % (src, dst)) @@ -98,54 +103,94 @@ if stat.S_ISFIFO(st.st_mode): raise SpecialFileError("`%s` is a named pipe" % fn) - with open(src, 'rb') as fsrc: - with open(dst, 'wb') as fdst: - copyfileobj(fsrc, fdst) + if symlinks and os.path.islink(src): + os.symlink(os.readlink(src), dst) + else: + with open(src, 'rb') as fsrc: + with open(dst, 'wb') as fdst: + copyfileobj(fsrc, fdst) -def copymode(src, dst): - """Copy mode bits from src to dst""" - if hasattr(os, 'chmod'): - st = os.stat(src) - mode = stat.S_IMODE(st.st_mode) - os.chmod(dst, mode) +def copymode(src, dst, symlinks=False): + """Copy mode bits from src to dst. -def copystat(src, dst): - """Copy all stat info (mode bits, atime, mtime, flags) from src to dst""" - st = os.stat(src) + If the optional flag `symlinks` is set, symlinks aren't followed if and + only if both `src` and `dst` are symlinks. If `lchmod` isn't available (eg. + Linux), in these cases, this method does nothing. + + """ + if symlinks and os.path.islink(src) and os.path.islink(dst): + if hasattr(os, 'lchmod'): + stat_func, chmod_func = os.lstat, os.lchmod + else: + return + elif hasattr(os, 'chmod'): + stat_func, chmod_func = os.stat, os.chmod + else: + return + + st = stat_func(src) + chmod_func(dst, stat.S_IMODE(st.st_mode)) + +def copystat(src, dst, symlinks=False): + """Copy all stat info (mode bits, atime, mtime, flags) from src to dst. + + If the optional flag `symlinks` is set, symlinks aren't followed if and + only if both `src` and `dst` are symlinks. + + """ + def _nop(*args): + pass + + if symlinks and os.path.islink(src) and os.path.islink(dst): + stat_func = os.lstat + utime_func = os.lutimes if hasattr(os, 'lutimes') else _nop + chmod_func = os.lchmod if hasattr(os, 'lchmod') else _nop + chflags_func = os.lchflags if hasattr(os, 'lchflags') else _nop + else: + stat_func = os.stat + utime_func = os.utime if hasattr(os, 'utime') else _nop + chmod_func = os.chmod if hasattr(os, 'chmod') else _nop + chflags_func = os.chflags if hasattr(os, 'chflags') else _nop + + st = stat_func(src) mode = stat.S_IMODE(st.st_mode) - if hasattr(os, 'utime'): - os.utime(dst, (st.st_atime, st.st_mtime)) - if hasattr(os, 'chmod'): - os.chmod(dst, mode) - if hasattr(os, 'chflags') and hasattr(st, 'st_flags'): + utime_func(dst, (st.st_atime, st.st_mtime)) + chmod_func(dst, mode) + if hasattr(st, 'st_flags'): try: - os.chflags(dst, st.st_flags) + chflags_func(dst, st.st_flags) except OSError as why: if (not hasattr(errno, 'EOPNOTSUPP') or why.errno != errno.EOPNOTSUPP): raise -def copy(src, dst): +def copy(src, dst, symlinks=False): """Copy data and mode bits ("cp src dst"). The destination may be a directory. + If the optional flag `symlinks` is set, symlinks won't be followed. This + resembles "cp -P src dst". + """ if os.path.isdir(dst): dst = os.path.join(dst, os.path.basename(src)) - copyfile(src, dst) - copymode(src, dst) + copyfile(src, dst, symlinks=symlinks) + copymode(src, dst, symlinks=symlinks) -def copy2(src, dst): +def copy2(src, dst, symlinks=False): """Copy data and all stat info ("cp -p src dst"). The destination may be a directory. + If the optional flag `symlinks` is set, symlinks won't be followed. This + resembles "cp -P src dst". + """ if os.path.isdir(dst): dst = os.path.join(dst, os.path.basename(src)) - copyfile(src, dst) - copystat(src, dst) + copyfile(src, dst, symlinks=symlinks) + copystat(src, dst, symlinks=symlinks) def ignore_patterns(*patterns): """Function that can be used as copytree() ignore parameter. @@ -212,7 +257,11 @@ if os.path.islink(srcname): linkto = os.readlink(srcname) if symlinks: + # We can't just leave it to copy_function b/c legacy code + # w/ custom copy_function may rely on copytree doing the + # right thing. os.symlink(linkto, dstname) + copystat(srcname, dstname, symlinks=symlinks) else: # ignore dangling symlink if the flag is on if not os.path.exists(linkto) and ignore_dangling_symlinks: diff -r 1ab124a6f171 -r 4832bf7aa028 Lib/test/test_shutil.py --- a/Lib/test/test_shutil.py Wed Dec 21 11:24:47 2011 +0100 +++ b/Lib/test/test_shutil.py Wed Dec 21 17:40:52 2011 +0100 @@ -164,6 +164,197 @@ self.assertTrue(issubclass(exc[0], OSError)) self.errorState = 2 + @unittest.skipUnless(hasattr(os, 'chmod'), 'requires os.chmod') + @support.skip_unless_symlink + def test_copymode_follow_symlinks(self): + tmp_dir = self.mkdtemp() + src = os.path.join(tmp_dir, 'foo') + dst = os.path.join(tmp_dir, 'bar') + src_link = os.path.join(tmp_dir, 'baz') + dst_link = os.path.join(tmp_dir, 'quux') + write_file(src, 'foo') + write_file(dst, 'foo') + os.symlink(src, src_link) + os.symlink(dst, dst_link) + os.chmod(src, stat.S_IRWXU|stat.S_IRWXG) + # file to file + os.chmod(dst, stat.S_IRWXO) + self.assertNotEqual(os.stat(src).st_mode, os.stat(dst).st_mode) + shutil.copymode(src, dst) + self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode) + # follow src link + os.chmod(dst, stat.S_IRWXO) + shutil.copymode(src_link, dst) + self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode) + # follow dst link + os.chmod(dst, stat.S_IRWXO) + shutil.copymode(src, dst_link) + self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode) + # follow both links + os.chmod(dst, stat.S_IRWXO) + shutil.copymode(src_link, dst) + self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode) + + @unittest.skipUnless(hasattr(os, 'lchmod'), 'requires os.lchmod') + @support.skip_unless_symlink + def test_copymode_symlink_to_symlink(self): + tmp_dir = self.mkdtemp() + src = os.path.join(tmp_dir, 'foo') + dst = os.path.join(tmp_dir, 'bar') + src_link = os.path.join(tmp_dir, 'baz') + dst_link = os.path.join(tmp_dir, 'quux') + write_file(src, 'foo') + write_file(dst, 'foo') + os.symlink(src, src_link) + os.symlink(dst, dst_link) + os.chmod(src, stat.S_IRWXU|stat.S_IRWXG) + os.chmod(dst, stat.S_IRWXU) + os.lchmod(src_link, stat.S_IRWXO|stat.S_IRWXG) + # link to link + os.lchmod(dst_link, stat.S_IRWXO) + shutil.copymode(src_link, dst_link, symlinks=True) + self.assertEqual(os.lstat(src_link).st_mode, + os.lstat(dst_link).st_mode) + self.assertNotEqual(os.stat(src).st_mode, os.stat(dst).st_mode) + # src link - use chmod + os.lchmod(dst_link, stat.S_IRWXO) + shutil.copymode(src_link, dst, symlinks=True) + self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode) + # dst link - use chmod + os.lchmod(dst_link, stat.S_IRWXO) + shutil.copymode(src, dst_link, symlinks=True) + self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode) + + @unittest.skipIf(hasattr(os, 'lchmod'), 'requires os.lchmod to be missing') + @support.skip_unless_symlink + def test_copymode_symlink_to_symlink_wo_lchmod(self): + tmp_dir = self.mkdtemp() + src = os.path.join(tmp_dir, 'foo') + dst = os.path.join(tmp_dir, 'bar') + src_link = os.path.join(tmp_dir, 'baz') + dst_link = os.path.join(tmp_dir, 'quux') + write_file(src, 'foo') + write_file(dst, 'foo') + os.symlink(src, src_link) + os.symlink(dst, dst_link) + shutil.copymode(src_link, dst_link, symlinks=True) # silent fail + + @support.skip_unless_symlink + def test_copystat_symlinks(self): + tmp_dir = self.mkdtemp() + src = os.path.join(tmp_dir, 'foo') + dst = os.path.join(tmp_dir, 'bar') + src_link = os.path.join(tmp_dir, 'baz') + dst_link = os.path.join(tmp_dir, 'qux') + write_file(src, 'foo') + src_stat = os.stat(src) + os.utime(src, (src_stat.st_atime, + src_stat.st_mtime - 42.0)) # ensure different mtimes + write_file(dst, 'bar') + self.assertNotEqual(os.stat(src).st_mtime, os.stat(dst).st_mtime) + os.symlink(src, src_link) + os.symlink(dst, dst_link) + if hasattr(os, 'lchmod'): + os.lchmod(src_link, stat.S_IRWXO) + if hasattr(os, 'lchflags'): + os.lchflags(src_link, stat.UF_NODUMP) + src_link_stat = os.lstat(src_link) + # follow + if hasattr(os, 'lchmod'): + shutil.copystat(src_link, dst_link, symlinks=False) + self.assertNotEqual(src_link_stat.st_mode, os.stat(dst).st_mode) + # don't follow + shutil.copystat(src_link, dst_link, symlinks=True) + dst_link_stat = os.lstat(dst_link) + if hasattr(os, 'lutimes'): + for attr in 'st_atime', 'st_mtime': + # The modification times may be truncated in the new file. + self.assertLessEqual(getattr(src_link_stat, attr), + getattr(dst_link_stat, attr) + 1) + if hasattr(os, 'lchmod'): + self.assertEqual(src_link_stat.st_mode, dst_link_stat.st_mode) + if hasattr(os, 'lchflags') and hasattr(src_link_stat, 'st_flags'): + self.assertEqual(src_link_stat.st_flags, dst_link_stat.st_flags) + # tell to follow but dst is not a link + shutil.copystat(src_link, dst, symlinks=True) + self.assertTrue(abs(os.stat(src).st_mtime - os.stat(dst).st_mtime) < + 00000.1) + + @support.skip_unless_symlink + def test_copy_symlinks(self): + tmp_dir = self.mkdtemp() + src = os.path.join(tmp_dir, 'foo') + dst = os.path.join(tmp_dir, 'bar') + src_link = os.path.join(tmp_dir, 'baz') + write_file(src, 'foo') + os.symlink(src, src_link) + if hasattr(os, 'lchmod'): + os.lchmod(src_link, stat.S_IRWXU | stat.S_IRWXO) + # don't follow + shutil.copy(src_link, dst, symlinks=False) + self.assertFalse(os.path.islink(dst)) + self.assertEqual(read_file(src), read_file(dst)) + os.remove(dst) + # follow + shutil.copy(src_link, dst, symlinks=True) + self.assertTrue(os.path.islink(dst)) + self.assertEqual(os.readlink(dst), os.readlink(src_link)) + if hasattr(os, 'lchmod'): + self.assertEqual(os.lstat(src_link).st_mode, + os.lstat(dst).st_mode) + + @support.skip_unless_symlink + def test_copy2_symlinks(self): + tmp_dir = self.mkdtemp() + src = os.path.join(tmp_dir, 'foo') + dst = os.path.join(tmp_dir, 'bar') + src_link = os.path.join(tmp_dir, 'baz') + write_file(src, 'foo') + os.symlink(src, src_link) + if hasattr(os, 'lchmod'): + os.lchmod(src_link, stat.S_IRWXU | stat.S_IRWXO) + if hasattr(os, 'lchflags'): + os.lchflags(src_link, stat.UF_NODUMP) + src_stat = os.stat(src) + src_link_stat = os.lstat(src_link) + # follow + shutil.copy2(src_link, dst, symlinks=False) + self.assertFalse(os.path.islink(dst)) + self.assertEqual(read_file(src), read_file(dst)) + os.remove(dst) + # don't follow + shutil.copy2(src_link, dst, symlinks=True) + self.assertTrue(os.path.islink(dst)) + self.assertEqual(os.readlink(dst), os.readlink(src_link)) + dst_stat = os.lstat(dst) + if hasattr(os, 'lutimes'): + for attr in 'st_atime', 'st_mtime': + # The modification times may be truncated in the new file. + self.assertLessEqual(getattr(src_link_stat, attr), + getattr(dst_stat, attr) + 1) + if hasattr(os, 'lchmod'): + self.assertEqual(src_link_stat.st_mode, dst_stat.st_mode) + self.assertNotEqual(src_stat.st_mode, dst_stat.st_mode) + if hasattr(os, 'lchflags') and hasattr(src_link_stat, 'st_flags'): + self.assertEqual(src_link_stat.st_flags, dst_stat.st_flags) + + @support.skip_unless_symlink + def test_copyfile_symlinks(self): + tmp_dir = self.mkdtemp() + src = os.path.join(tmp_dir, 'src') + dst = os.path.join(tmp_dir, 'dst') + dst_link = os.path.join(tmp_dir, 'dst_link') + link = os.path.join(tmp_dir, 'link') + write_file(src, 'foo') + os.symlink(src, link) + # don't follow + shutil.copyfile(link, dst_link, symlinks=True) + self.assertTrue(os.path.islink(dst_link)) + self.assertEqual(os.readlink(link), os.readlink(dst_link)) + # follow + shutil.copyfile(link, dst) + self.assertFalse(os.path.islink(dst)) + def test_rmtree_dont_delete_file(self): # When called on a file instead of a directory, don't delete it. handle, path = tempfile.mkstemp() @@ -190,6 +381,34 @@ actual = read_file((dst_dir, 'test_dir', 'test.txt')) self.assertEqual(actual, '456') + @support.skip_unless_symlink + def test_copytree_symlinks(self): + tmp_dir = self.mkdtemp() + src_dir = os.path.join(tmp_dir, 'src') + dst_dir = os.path.join(tmp_dir, 'dst') + sub_dir = os.path.join(src_dir, 'sub') + os.mkdir(src_dir) + os.mkdir(sub_dir) + write_file((src_dir, 'file.txt'), 'foo') + src_link = os.path.join(sub_dir, 'link') + dst_link = os.path.join(dst_dir, 'sub/link') + os.symlink(os.path.join(src_dir, 'file.txt'), + src_link) + if hasattr(os, 'lchmod'): + os.lchmod(src_link, stat.S_IRWXU | stat.S_IRWXO) + if hasattr(os, 'lchflags'): + os.lchflags(src_link, stat.UF_NODUMP) + src_stat = os.lstat(src_link) + shutil.copytree(src_dir, dst_dir, symlinks=True) + self.assertTrue(os.path.islink(os.path.join(dst_dir, 'sub', 'link'))) + self.assertEqual(os.readlink(os.path.join(dst_dir, 'sub', 'link')), + os.path.join(src_dir, 'file.txt')) + dst_stat = os.lstat(dst_link) + if hasattr(os, 'lchmod'): + self.assertEqual(dst_stat.st_mode, src_stat.st_mode) + if hasattr(os, 'lchflags'): + self.assertEqual(dst_stat.st_flags, src_stat.st_flags) + def test_copytree_with_exclude(self): # creating data join = os.path.join