| Left: | ||
| Right: |
| OLD | NEW |
|---|---|
| 1 # Copyright (C) 2003 Python Software Foundation | 1 # Copyright (C) 2003 Python Software Foundation |
| 2 | 2 |
| 3 import unittest | 3 import unittest |
| 4 import shutil | 4 import shutil |
| 5 import tempfile | 5 import tempfile |
| 6 import sys | 6 import sys |
| 7 import stat | 7 import stat |
| 8 import os | 8 import os |
| 9 import os.path | 9 import os.path |
| 10 import functools | 10 import functools |
| 11 from test import support | 11 from test import support |
| 12 from test.support import TESTFN | 12 from test.support import TESTFN |
| 13 from os.path import splitdrive | 13 from os.path import splitdrive |
| 14 from distutils.spawn import find_executable, spawn | 14 from distutils.spawn import find_executable, spawn |
| 15 from shutil import (_make_tarball, _make_zipfile, make_archive, | 15 from shutil import (_make_tarball, _make_zipfile, make_archive, |
| 16 register_archive_format, unregister_archive_format, | 16 register_archive_format, unregister_archive_format, |
| 17 get_archive_formats, Error, unpack_archive, | 17 get_archive_formats, Error, unpack_archive, |
| 18 register_unpack_format, RegistryError, | 18 register_unpack_format, RegistryError, |
| 19 unregister_unpack_format, get_unpack_formats) | 19 unregister_unpack_format, get_unpack_formats) |
| 20 import tarfile | 20 import tarfile |
| 21 import warnings | 21 import warnings |
| 22 import time | |
| 23 import errno | |
| 24 try: | |
| 25 import threading | |
| 26 except ImportError: | |
| 27 threading = None | |
| 22 | 28 |
| 23 from test import support | 29 from test import support |
| 24 from test.support import TESTFN, check_warnings, captured_stdout, requires_zlib | 30 from test.support import TESTFN, check_warnings, captured_stdout, requires_zlib |
| 25 | 31 |
| 26 try: | 32 try: |
| 27 import bz2 | 33 import bz2 |
| 28 BZ2_SUPPORTED = True | 34 BZ2_SUPPORTED = True |
| 29 except ImportError: | 35 except ImportError: |
| 30 BZ2_SUPPORTED = False | 36 BZ2_SUPPORTED = False |
| 31 | 37 |
| (...skipping 113 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 145 # 99.9% of the time it initially fails to remove | 151 # 99.9% of the time it initially fails to remove |
| 146 # a file in the directory, so the first time through | 152 # a file in the directory, so the first time through |
| 147 # func is os.remove. | 153 # func is os.remove. |
| 148 # However, some Linux machines running ZFS on | 154 # However, some Linux machines running ZFS on |
| 149 # FUSE experienced a failure earlier in the process | 155 # FUSE experienced a failure earlier in the process |
| 150 # at os.listdir. The first failure may legally | 156 # at os.listdir. The first failure may legally |
| 151 # be either. | 157 # be either. |
| 152 if self.errorState == 0: | 158 if self.errorState == 0: |
| 153 if func is os.remove: | 159 if func is os.remove: |
| 154 self.assertEqual(arg, self.childpath) | 160 self.assertEqual(arg, self.childpath) |
| 161 elif func is os.unlinkat: | |
| 162 self.assertEqual(arg, (3, 'a')) | |
| 155 else: | 163 else: |
| 156 self.assertIs(func, os.listdir, | 164 self.assertIs(func, os.listdir, |
| 157 "func must be either os.remove or os.listdir") | 165 "func must be either os.remove or os.listdir") |
| 158 self.assertEqual(arg, TESTFN) | 166 self.assertEqual(arg, TESTFN) |
| 159 self.assertTrue(issubclass(exc[0], OSError)) | 167 self.assertTrue(issubclass(exc[0], OSError)) |
| 160 self.errorState = 1 | 168 self.errorState = 1 |
| 161 else: | 169 else: |
| 162 self.assertEqual(func, os.rmdir) | 170 self.assertEqual(func, os.rmdir) |
| 163 self.assertEqual(arg, TESTFN) | 171 self.assertEqual(arg, TESTFN) |
| 164 self.assertTrue(issubclass(exc[0], OSError)) | 172 self.assertTrue(issubclass(exc[0], OSError)) |
| (...skipping 135 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 300 # bug 1669. | 308 # bug 1669. |
| 301 os.mkdir(TESTFN) | 309 os.mkdir(TESTFN) |
| 302 try: | 310 try: |
| 303 src = os.path.join(TESTFN, 'cheese') | 311 src = os.path.join(TESTFN, 'cheese') |
| 304 dst = os.path.join(TESTFN, 'shop') | 312 dst = os.path.join(TESTFN, 'shop') |
| 305 os.mkdir(src) | 313 os.mkdir(src) |
| 306 os.symlink(src, dst) | 314 os.symlink(src, dst) |
| 307 self.assertRaises(OSError, shutil.rmtree, dst) | 315 self.assertRaises(OSError, shutil.rmtree, dst) |
| 308 finally: | 316 finally: |
| 309 shutil.rmtree(TESTFN, ignore_errors=True) | 317 shutil.rmtree(TESTFN, ignore_errors=True) |
| 318 | |
| 319 @unittest.skipIf(threading == None, 'requires threading') | |
|
eric.araujo
2011/10/07 19:29:47
You can just say skipUnless(threading, 'msg')
rosslagerwall
2011/10/07 21:18:46
Right.
| |
| 320 def test_rmtree_4489(self): | |
| 321 # Issue #4489: test a symlink attack on rmtree | |
| 322 | |
| 323 def hack(a, b): | |
| 324 time.sleep(0.1) | |
| 325 os.rename(b, b + 'x') | |
| 326 self.tempdirs.append(b + 'x') | |
| 327 os.symlink(a, b) | |
| 328 | |
| 329 a = self.mkdtemp() | |
| 330 b = self.mkdtemp() | |
| 331 | |
| 332 for i in range(25000): | |
| 333 open(os.path.join(a, str(i)), "w").close() | |
| 334 open(os.path.join(b, str(i)), "w").close() | |
| 335 | |
| 336 l = sorted(os.listdir(a)) | |
| 337 | |
| 338 hackt = threading.Thread(target=hack, args=(a, b)) | |
| 339 hackt.start() | |
| 340 try: | |
| 341 shutil.rmtree(b) | |
| 342 except OSError as inst: | |
| 343 if inst.errno != errno.ENOTDIR: | |
| 344 raise | |
|
eric.araujo
2011/10/07 19:29:47
Shouldn’t this use self.fail?
rosslagerwall
2011/10/07 21:18:46
I would have thought it's better if the original e
| |
| 345 finally: | |
| 346 support.unlink(b) | |
| 347 self.tempdirs.remove(b) | |
| 348 if hasattr(os, 'openat') and hasattr(os, 'fdlistdir') and hasattr(os, | |
| 349 'unlinkat') and hasattr(os, 'fstatat'): | |
| 350 self.assertEqual(l, sorted(os.listdir(a))) | |
| 351 | |
| 310 | 352 |
| 311 if hasattr(os, "mkfifo"): | 353 if hasattr(os, "mkfifo"): |
| 312 # Issue #3002: copyfile and copytree block indefinitely on named pipes | 354 # Issue #3002: copyfile and copytree block indefinitely on named pipes |
| 313 def test_copyfile_named_pipe(self): | 355 def test_copyfile_named_pipe(self): |
| 314 os.mkfifo(TESTFN) | 356 os.mkfifo(TESTFN) |
| 315 try: | 357 try: |
| 316 self.assertRaises(shutil.SpecialFileError, | 358 self.assertRaises(shutil.SpecialFileError, |
| 317 shutil.copyfile, TESTFN, TESTFN2) | 359 shutil.copyfile, TESTFN, TESTFN2) |
| 318 self.assertRaises(shutil.SpecialFileError, | 360 self.assertRaises(shutil.SpecialFileError, |
| 319 shutil.copyfile, __file__, TESTFN) | 361 shutil.copyfile, __file__, TESTFN) |
| (...skipping 684 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 1004 finally: | 1046 finally: |
| 1005 os.rmdir(dst_dir) | 1047 os.rmdir(dst_dir) |
| 1006 | 1048 |
| 1007 | 1049 |
| 1008 | 1050 |
| 1009 def test_main(): | 1051 def test_main(): |
| 1010 support.run_unittest(TestShutil, TestMove, TestCopyFile) | 1052 support.run_unittest(TestShutil, TestMove, TestCopyFile) |
| 1011 | 1053 |
| 1012 if __name__ == '__main__': | 1054 if __name__ == '__main__': |
| 1013 test_main() | 1055 test_main() |
| OLD | NEW |