LEFT | RIGHT |
1 # As a test suite for the os module, this is woefully inadequate, but this | 1 # As a test suite for the os module, this is woefully inadequate, but this |
2 # does add tests for a few functions which have been determined to be more | 2 # does add tests for a few functions which have been determined to be more |
3 # portable than they had been thought to be. | 3 # portable than they had been thought to be. |
4 | 4 |
5 import os | 5 import os |
6 import errno | 6 import errno |
7 import unittest | 7 import unittest |
8 import warnings | 8 import warnings |
9 import sys | 9 import sys |
10 import signal | 10 import signal |
11 import subprocess | 11 import subprocess |
12 import time | 12 import time |
13 import shutil | 13 import shutil |
14 from test import support | 14 from test import support |
15 import contextlib | 15 import contextlib |
16 import mmap | 16 import mmap |
17 import platform | 17 import platform |
18 import re | 18 import re |
19 import uuid | 19 import uuid |
20 import asyncore | 20 import asyncore |
21 import asynchat | 21 import asynchat |
22 import socket | 22 import socket |
23 import itertools | 23 import itertools |
24 import stat | 24 import stat |
25 import locale | 25 import locale |
26 import codecs | 26 import codecs |
| 27 import decimal |
| 28 import fractions |
| 29 import pickle |
27 try: | 30 try: |
28 import threading | 31 import threading |
29 except ImportError: | 32 except ImportError: |
30 threading = None | 33 threading = None |
| 34 try: |
| 35 import resource |
| 36 except ImportError: |
| 37 resource = None |
| 38 try: |
| 39 import fcntl |
| 40 except ImportError: |
| 41 fcntl = None |
| 42 |
31 from test.script_helper import assert_python_ok | 43 from test.script_helper import assert_python_ok |
32 | 44 |
33 with warnings.catch_warnings(): | 45 with warnings.catch_warnings(): |
34 warnings.simplefilter("ignore", DeprecationWarning) | 46 warnings.simplefilter("ignore", DeprecationWarning) |
35 os.stat_float_times(True) | 47 os.stat_float_times(True) |
36 st = os.stat(__file__) | 48 st = os.stat(__file__) |
37 stat_supports_subsecond = ( | 49 stat_supports_subsecond = ( |
38 # check if float and int timestamps are different | 50 # check if float and int timestamps are different |
39 (st.st_atime != st[7]) | 51 (st.st_atime != st[7]) |
40 or (st.st_mtime != st[8]) | 52 or (st.st_mtime != st[8]) |
41 or (st.st_ctime != st[9])) | 53 or (st.st_ctime != st[9])) |
42 | 54 |
43 # Detect whether we're on a Linux system that uses the (now outdated | 55 # Detect whether we're on a Linux system that uses the (now outdated |
44 # and unmaintained) linuxthreads threading library. There's an issue | 56 # and unmaintained) linuxthreads threading library. There's an issue |
45 # when combining linuxthreads with a failed execv call: see | 57 # when combining linuxthreads with a failed execv call: see |
46 # http://bugs.python.org/issue4970. | 58 # http://bugs.python.org/issue4970. |
47 if hasattr(sys, 'thread_info') and sys.thread_info.version: | 59 if hasattr(sys, 'thread_info') and sys.thread_info.version: |
48 USING_LINUXTHREADS = sys.thread_info.version.startswith("linuxthreads") | 60 USING_LINUXTHREADS = sys.thread_info.version.startswith("linuxthreads") |
49 else: | 61 else: |
50 USING_LINUXTHREADS = False | 62 USING_LINUXTHREADS = False |
| 63 |
| 64 # Issue #14110: Some tests fail on FreeBSD if the user is in the wheel group. |
| 65 HAVE_WHEEL_GROUP = sys.platform.startswith('freebsd') and os.getgid() == 0 |
51 | 66 |
52 # Tests creating TESTFN | 67 # Tests creating TESTFN |
53 class FileTests(unittest.TestCase): | 68 class FileTests(unittest.TestCase): |
54 def setUp(self): | 69 def setUp(self): |
55 if os.path.exists(support.TESTFN): | 70 if os.path.exists(support.TESTFN): |
56 os.unlink(support.TESTFN) | 71 os.unlink(support.TESTFN) |
57 tearDown = setUp | 72 tearDown = setUp |
58 | 73 |
59 def test_access(self): | 74 def test_access(self): |
60 f = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR) | 75 f = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR) |
(...skipping 102 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
163 os.mkdir(support.TESTFN) | 178 os.mkdir(support.TESTFN) |
164 self.fname = os.path.join(support.TESTFN, "f1") | 179 self.fname = os.path.join(support.TESTFN, "f1") |
165 f = open(self.fname, 'wb') | 180 f = open(self.fname, 'wb') |
166 f.write(b"ABC") | 181 f.write(b"ABC") |
167 f.close() | 182 f.close() |
168 | 183 |
169 def tearDown(self): | 184 def tearDown(self): |
170 os.unlink(self.fname) | 185 os.unlink(self.fname) |
171 os.rmdir(support.TESTFN) | 186 os.rmdir(support.TESTFN) |
172 | 187 |
| 188 @unittest.skipUnless(hasattr(os, 'stat'), 'test needs os.stat()') |
173 def check_stat_attributes(self, fname): | 189 def check_stat_attributes(self, fname): |
174 if not hasattr(os, "stat"): | |
175 return | |
176 | |
177 result = os.stat(fname) | 190 result = os.stat(fname) |
178 | 191 |
179 # Make sure direct access works | 192 # Make sure direct access works |
180 self.assertEqual(result[stat.ST_SIZE], 3) | 193 self.assertEqual(result[stat.ST_SIZE], 3) |
181 self.assertEqual(result.st_size, 3) | 194 self.assertEqual(result.st_size, 3) |
182 | 195 |
183 # Make sure all the attributes are there | 196 # Make sure all the attributes are there |
184 members = dir(result) | 197 members = dir(result) |
185 for name in dir(stat): | 198 for name in dir(stat): |
186 if name[:3] == 'ST_': | 199 if name[:3] == 'ST_': |
(...skipping 56 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
243 | 256 |
244 def test_stat_attributes_bytes(self): | 257 def test_stat_attributes_bytes(self): |
245 try: | 258 try: |
246 fname = self.fname.encode(sys.getfilesystemencoding()) | 259 fname = self.fname.encode(sys.getfilesystemencoding()) |
247 except UnicodeEncodeError: | 260 except UnicodeEncodeError: |
248 self.skipTest("cannot encode %a for the filesystem" % self.fname) | 261 self.skipTest("cannot encode %a for the filesystem" % self.fname) |
249 with warnings.catch_warnings(): | 262 with warnings.catch_warnings(): |
250 warnings.simplefilter("ignore", DeprecationWarning) | 263 warnings.simplefilter("ignore", DeprecationWarning) |
251 self.check_stat_attributes(fname) | 264 self.check_stat_attributes(fname) |
252 | 265 |
| 266 def test_stat_result_pickle(self): |
| 267 result = os.stat(self.fname) |
| 268 p = pickle.dumps(result) |
| 269 self.assertIn(b'\x03cos\nstat_result\n', p) |
| 270 unpickled = pickle.loads(p) |
| 271 self.assertEqual(result, unpickled) |
| 272 |
| 273 @unittest.skipUnless(hasattr(os, 'statvfs'), 'test needs os.statvfs()') |
253 def test_statvfs_attributes(self): | 274 def test_statvfs_attributes(self): |
254 if not hasattr(os, "statvfs"): | |
255 return | |
256 | |
257 try: | 275 try: |
258 result = os.statvfs(self.fname) | 276 result = os.statvfs(self.fname) |
259 except OSError as e: | 277 except OSError as e: |
260 # On AtheOS, glibc always returns ENOSYS | 278 # On AtheOS, glibc always returns ENOSYS |
261 if e.errno == errno.ENOSYS: | 279 if e.errno == errno.ENOSYS: |
262 return | 280 self.skipTest('os.statvfs() failed with ENOSYS') |
263 | 281 |
264 # Make sure direct access works | 282 # Make sure direct access works |
265 self.assertEqual(result.f_bfree, result[3]) | 283 self.assertEqual(result.f_bfree, result[3]) |
266 | 284 |
267 # Make sure all the attributes are there. | 285 # Make sure all the attributes are there. |
268 members = ('bsize', 'frsize', 'blocks', 'bfree', 'bavail', 'files', | 286 members = ('bsize', 'frsize', 'blocks', 'bfree', 'bavail', 'files', |
269 'ffree', 'favail', 'flag', 'namemax') | 287 'ffree', 'favail', 'flag', 'namemax') |
270 for value, member in enumerate(members): | 288 for value, member in enumerate(members): |
271 self.assertEqual(getattr(result, 'f_' + member), result[value]) | 289 self.assertEqual(getattr(result, 'f_' + member), result[value]) |
272 | 290 |
(...skipping 15 matching lines...) Expand all Loading... |
288 result2 = os.statvfs_result((10,)) | 306 result2 = os.statvfs_result((10,)) |
289 self.fail("No exception raised") | 307 self.fail("No exception raised") |
290 except TypeError: | 308 except TypeError: |
291 pass | 309 pass |
292 | 310 |
293 # Use the constructor with a too-long tuple. | 311 # Use the constructor with a too-long tuple. |
294 try: | 312 try: |
295 result2 = os.statvfs_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14)) | 313 result2 = os.statvfs_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14)) |
296 except TypeError: | 314 except TypeError: |
297 pass | 315 pass |
| 316 |
| 317 @unittest.skipUnless(hasattr(os, 'statvfs'), |
| 318 "need os.statvfs()") |
| 319 def test_statvfs_result_pickle(self): |
| 320 try: |
| 321 result = os.statvfs(self.fname) |
| 322 except OSError as e: |
| 323 # On AtheOS, glibc always returns ENOSYS |
| 324 if e.errno == errno.ENOSYS: |
| 325 self.skipTest('os.statvfs() failed with ENOSYS') |
| 326 |
| 327 p = pickle.dumps(result) |
| 328 self.assertIn(b'\x03cos\nstatvfs_result\n', p) |
| 329 unpickled = pickle.loads(p) |
| 330 self.assertEqual(result, unpickled) |
298 | 331 |
299 def test_utime_dir(self): | 332 def test_utime_dir(self): |
300 delta = 1000000 | 333 delta = 1000000 |
301 st = os.stat(support.TESTFN) | 334 st = os.stat(support.TESTFN) |
302 # round to int, because some systems may support sub-second | 335 # round to int, because some systems may support sub-second |
303 # time stamps in stat, but not in utime. | 336 # time stamps in stat, but not in utime. |
304 os.utime(support.TESTFN, (st.st_atime, int(st.st_mtime-delta))) | 337 os.utime(support.TESTFN, (st.st_atime, int(st.st_mtime-delta))) |
305 st2 = os.stat(support.TESTFN) | 338 st2 = os.stat(support.TESTFN) |
306 self.assertEqual(st2.st_mtime, int(st.st_mtime-delta)) | 339 self.assertEqual(st2.st_mtime, int(st.st_mtime-delta)) |
307 | 340 |
(...skipping 127 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
435 def set_time(filename, atime, mtime): | 468 def set_time(filename, atime, mtime): |
436 dirname = os.path.dirname(filename) | 469 dirname = os.path.dirname(filename) |
437 dirfd = os.open(dirname, os.O_RDONLY) | 470 dirfd = os.open(dirname, os.O_RDONLY) |
438 try: | 471 try: |
439 os.utime(os.path.basename(filename), dir_fd=dirfd, | 472 os.utime(os.path.basename(filename), dir_fd=dirfd, |
440 times=(atime, mtime)) | 473 times=(atime, mtime)) |
441 finally: | 474 finally: |
442 os.close(dirfd) | 475 os.close(dirfd) |
443 self._test_utime_subsecond(set_time) | 476 self._test_utime_subsecond(set_time) |
444 | 477 |
445 # Restrict test to Win32, since there is no guarantee other | 478 # Restrict tests to Win32, since there is no guarantee other |
446 # systems support centiseconds | 479 # systems support centiseconds |
447 if sys.platform == 'win32': | 480 def get_file_system(path): |
448 def get_file_system(path): | 481 if sys.platform == 'win32': |
449 root = os.path.splitdrive(os.path.abspath(path))[0] + '\\' | 482 root = os.path.splitdrive(os.path.abspath(path))[0] + '\\' |
450 import ctypes | 483 import ctypes |
451 kernel32 = ctypes.windll.kernel32 | 484 kernel32 = ctypes.windll.kernel32 |
452 buf = ctypes.create_unicode_buffer("", 100) | 485 buf = ctypes.create_unicode_buffer("", 100) |
453 if kernel32.GetVolumeInformationW(root, None, 0, None, None, None, b
uf, len(buf)): | 486 if kernel32.GetVolumeInformationW(root, None, 0, None, None, None, b
uf, len(buf)): |
454 return buf.value | 487 return buf.value |
455 | 488 |
456 if get_file_system(support.TESTFN) == "NTFS": | 489 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests") |
457 def test_1565150(self): | 490 @unittest.skipUnless(get_file_system(support.TESTFN) == "NTFS", |
458 t1 = 1159195039.25 | 491 "requires NTFS") |
459 os.utime(self.fname, (t1, t1)) | 492 def test_1565150(self): |
460 self.assertEqual(os.stat(self.fname).st_mtime, t1) | 493 t1 = 1159195039.25 |
461 | 494 os.utime(self.fname, (t1, t1)) |
462 def test_large_time(self): | 495 self.assertEqual(os.stat(self.fname).st_mtime, t1) |
463 t1 = 5000000000 # some day in 2128 | 496 |
464 os.utime(self.fname, (t1, t1)) | 497 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests") |
465 self.assertEqual(os.stat(self.fname).st_mtime, t1) | 498 @unittest.skipUnless(get_file_system(support.TESTFN) == "NTFS", |
466 | 499 "requires NTFS") |
467 def test_1686475(self): | 500 def test_large_time(self): |
468 # Verify that an open file can be stat'ed | 501 t1 = 5000000000 # some day in 2128 |
469 try: | 502 os.utime(self.fname, (t1, t1)) |
470 os.stat(r"c:\pagefile.sys") | 503 self.assertEqual(os.stat(self.fname).st_mtime, t1) |
471 except FileNotFoundError: | 504 |
472 pass # file does not exist; cannot run test | 505 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests") |
473 except OSError as e: | 506 def test_1686475(self): |
474 self.fail("Could not stat pagefile.sys") | 507 # Verify that an open file can be stat'ed |
475 | 508 try: |
476 @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()") | 509 os.stat(r"c:\pagefile.sys") |
477 def test_15261(self): | 510 except FileNotFoundError: |
478 # Verify that stat'ing a closed fd does not cause crash | 511 self.skipTest(r'c:\pagefile.sys does not exist') |
479 r, w = os.pipe() | 512 except OSError as e: |
480 try: | 513 self.fail("Could not stat pagefile.sys") |
481 os.stat(r) # should not raise error | 514 |
482 finally: | 515 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests") |
483 os.close(r) | 516 @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()") |
484 os.close(w) | 517 def test_15261(self): |
485 with self.assertRaises(OSError) as ctx: | 518 # Verify that stat'ing a closed fd does not cause crash |
486 os.stat(r) | 519 r, w = os.pipe() |
487 self.assertEqual(ctx.exception.errno, errno.EBADF) | 520 try: |
| 521 os.stat(r) # should not raise error |
| 522 finally: |
| 523 os.close(r) |
| 524 os.close(w) |
| 525 with self.assertRaises(OSError) as ctx: |
| 526 os.stat(r) |
| 527 self.assertEqual(ctx.exception.errno, errno.EBADF) |
488 | 528 |
489 from test import mapping_tests | 529 from test import mapping_tests |
490 | 530 |
491 class EnvironTests(mapping_tests.BasicTestMappingProtocol): | 531 class EnvironTests(mapping_tests.BasicTestMappingProtocol): |
492 """check that os.environ object conform to mapping protocol""" | 532 """check that os.environ object conform to mapping protocol""" |
493 type2test = None | 533 type2test = None |
494 | 534 |
495 def setUp(self): | 535 def setUp(self): |
496 self.__save = dict(os.environ) | 536 self.__save = dict(os.environ) |
497 if os.supports_bytes_environ: | 537 if os.supports_bytes_environ: |
(...skipping 123 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
621 @support.requires_mac_ver(10, 6) | 661 @support.requires_mac_ver(10, 6) |
622 def test_unset_error(self): | 662 def test_unset_error(self): |
623 if sys.platform == "win32": | 663 if sys.platform == "win32": |
624 # an environment variable is limited to 32,767 characters | 664 # an environment variable is limited to 32,767 characters |
625 key = 'x' * 50000 | 665 key = 'x' * 50000 |
626 self.assertRaises(ValueError, os.environ.__delitem__, key) | 666 self.assertRaises(ValueError, os.environ.__delitem__, key) |
627 else: | 667 else: |
628 # "=" is not allowed in a variable name | 668 # "=" is not allowed in a variable name |
629 key = 'key=' | 669 key = 'key=' |
630 self.assertRaises(OSError, os.environ.__delitem__, key) | 670 self.assertRaises(OSError, os.environ.__delitem__, key) |
| 671 |
| 672 def test_key_type(self): |
| 673 missing = 'missingkey' |
| 674 self.assertNotIn(missing, os.environ) |
| 675 |
| 676 with self.assertRaises(KeyError) as cm: |
| 677 os.environ[missing] |
| 678 self.assertIs(cm.exception.args[0], missing) |
| 679 self.assertTrue(cm.exception.__suppress_context__) |
| 680 |
| 681 with self.assertRaises(KeyError) as cm: |
| 682 del os.environ[missing] |
| 683 self.assertIs(cm.exception.args[0], missing) |
| 684 self.assertTrue(cm.exception.__suppress_context__) |
| 685 |
631 | 686 |
632 class WalkTests(unittest.TestCase): | 687 class WalkTests(unittest.TestCase): |
633 """Tests for os.walk().""" | 688 """Tests for os.walk().""" |
634 | 689 |
635 def setUp(self): | 690 def setUp(self): |
636 import os | 691 import os |
637 from os.path import join | 692 from os.path import join |
638 | 693 |
639 # Build: | 694 # Build: |
640 # TESTFN/ | 695 # TESTFN/ |
(...skipping 23 matching lines...) Expand all Loading... |
664 | 719 |
665 # Create stuff. | 720 # Create stuff. |
666 os.makedirs(sub11_path) | 721 os.makedirs(sub11_path) |
667 os.makedirs(sub2_path) | 722 os.makedirs(sub2_path) |
668 os.makedirs(t2_path) | 723 os.makedirs(t2_path) |
669 for path in tmp1_path, tmp2_path, tmp3_path, tmp4_path: | 724 for path in tmp1_path, tmp2_path, tmp3_path, tmp4_path: |
670 f = open(path, "w") | 725 f = open(path, "w") |
671 f.write("I'm " + path + " and proud of it. Blame test_os.\n") | 726 f.write("I'm " + path + " and proud of it. Blame test_os.\n") |
672 f.close() | 727 f.close() |
673 if support.can_symlink(): | 728 if support.can_symlink(): |
674 if os.name == 'nt': | 729 os.symlink(os.path.abspath(t2_path), link_path) |
675 def symlink_to_dir(src, dest): | 730 os.symlink('broken', broken_link_path, True) |
676 os.symlink(src, dest, True) | |
677 else: | |
678 symlink_to_dir = os.symlink | |
679 symlink_to_dir(os.path.abspath(t2_path), link_path) | |
680 symlink_to_dir('broken', broken_link_path) | |
681 sub2_tree = (sub2_path, ["link"], ["broken_link", "tmp3"]) | 731 sub2_tree = (sub2_path, ["link"], ["broken_link", "tmp3"]) |
682 else: | 732 else: |
683 sub2_tree = (sub2_path, [], ["tmp3"]) | 733 sub2_tree = (sub2_path, [], ["tmp3"]) |
684 | 734 |
685 # Walk top-down. | 735 # Walk top-down. |
686 all = list(os.walk(walk_path)) | 736 all = list(os.walk(walk_path)) |
687 self.assertEqual(len(all), 4) | 737 self.assertEqual(len(all), 4) |
688 # We can't know which order SUB1 and SUB2 will appear in. | 738 # We can't know which order SUB1 and SUB2 will appear in. |
689 # Not flipped: TESTFN, SUB1, SUB11, SUB2 | 739 # Not flipped: TESTFN, SUB1, SUB11, SUB2 |
690 # flipped: TESTFN, SUB2, SUB1, SUB11 | 740 # flipped: TESTFN, SUB2, SUB1, SUB11 |
(...skipping 156 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
847 def test_exist_ok_existing_directory(self): | 897 def test_exist_ok_existing_directory(self): |
848 path = os.path.join(support.TESTFN, 'dir1') | 898 path = os.path.join(support.TESTFN, 'dir1') |
849 mode = 0o777 | 899 mode = 0o777 |
850 old_mask = os.umask(0o022) | 900 old_mask = os.umask(0o022) |
851 os.makedirs(path, mode) | 901 os.makedirs(path, mode) |
852 self.assertRaises(OSError, os.makedirs, path, mode) | 902 self.assertRaises(OSError, os.makedirs, path, mode) |
853 self.assertRaises(OSError, os.makedirs, path, mode, exist_ok=False) | 903 self.assertRaises(OSError, os.makedirs, path, mode, exist_ok=False) |
854 self.assertRaises(OSError, os.makedirs, path, 0o776, exist_ok=True) | 904 self.assertRaises(OSError, os.makedirs, path, 0o776, exist_ok=True) |
855 os.makedirs(path, mode=mode, exist_ok=True) | 905 os.makedirs(path, mode=mode, exist_ok=True) |
856 os.umask(old_mask) | 906 os.umask(old_mask) |
| 907 |
| 908 @unittest.skipUnless(hasattr(os, 'chown'), 'test needs os.chown') |
| 909 def test_chown_uid_gid_arguments_must_be_index(self): |
| 910 stat = os.stat(support.TESTFN) |
| 911 uid = stat.st_uid |
| 912 gid = stat.st_gid |
| 913 for value in (-1.0, -1j, decimal.Decimal(-1), fractions.Fraction(-2, 2))
: |
| 914 self.assertRaises(TypeError, os.chown, support.TESTFN, value, gid) |
| 915 self.assertRaises(TypeError, os.chown, support.TESTFN, uid, value) |
| 916 self.assertIsNone(os.chown(support.TESTFN, uid, gid)) |
| 917 self.assertIsNone(os.chown(support.TESTFN, -1, -1)) |
857 | 918 |
858 def test_exist_ok_s_isgid_directory(self): | 919 def test_exist_ok_s_isgid_directory(self): |
859 path = os.path.join(support.TESTFN, 'dir1') | 920 path = os.path.join(support.TESTFN, 'dir1') |
860 S_ISGID = stat.S_ISGID | 921 S_ISGID = stat.S_ISGID |
861 mode = 0o777 | 922 mode = 0o777 |
862 old_mask = os.umask(0o022) | 923 old_mask = os.umask(0o022) |
863 try: | 924 try: |
864 existing_testfn_mode = stat.S_IMODE( | 925 existing_testfn_mode = stat.S_IMODE( |
865 os.lstat(support.TESTFN).st_mode) | 926 os.lstat(support.TESTFN).st_mode) |
866 try: | 927 try: |
(...skipping 113 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
980 out = assert_python_ok('-c', code) | 1041 out = assert_python_ok('-c', code) |
981 stdout = out[1] | 1042 stdout = out[1] |
982 self.assertEqual(len(stdout), 16) | 1043 self.assertEqual(len(stdout), 16) |
983 return stdout | 1044 return stdout |
984 | 1045 |
985 def test_urandom_subprocess(self): | 1046 def test_urandom_subprocess(self): |
986 data1 = self.get_urandom_subprocess(16) | 1047 data1 = self.get_urandom_subprocess(16) |
987 data2 = self.get_urandom_subprocess(16) | 1048 data2 = self.get_urandom_subprocess(16) |
988 self.assertNotEqual(data1, data2) | 1049 self.assertNotEqual(data1, data2) |
989 | 1050 |
| 1051 @unittest.skipUnless(resource, "test requires the resource module") |
| 1052 def test_urandom_failure(self): |
| 1053 # Check urandom() failing when it is not able to open /dev/random. |
| 1054 # We spawn a new process to make the test more robust (if getrlimit() |
| 1055 # failed to restore the file descriptor limit after this, the whole |
| 1056 # test suite would crash; this actually happened on the OS X Tiger |
| 1057 # buildbot). |
| 1058 code = """if 1: |
| 1059 import errno |
| 1060 import os |
| 1061 import resource |
| 1062 |
| 1063 soft_limit, hard_limit = resource.getrlimit(resource.RLIMIT_NOFILE) |
| 1064 resource.setrlimit(resource.RLIMIT_NOFILE, (1, hard_limit)) |
| 1065 try: |
| 1066 os.urandom(16) |
| 1067 except OSError as e: |
| 1068 assert e.errno == errno.EMFILE, e.errno |
| 1069 else: |
| 1070 raise AssertionError("OSError not raised") |
| 1071 """ |
| 1072 assert_python_ok('-c', code) |
| 1073 |
| 1074 |
990 @contextlib.contextmanager | 1075 @contextlib.contextmanager |
991 def _execvpe_mockup(defpath=None): | 1076 def _execvpe_mockup(defpath=None): |
992 """ | 1077 """ |
993 Stubs out execv and execve functions when used as context manager. | 1078 Stubs out execv and execve functions when used as context manager. |
994 Records exec calls. The mock execv and execve functions always raise an | 1079 Records exec calls. The mock execv and execve functions always raise an |
995 exception as they would normally never return. | 1080 exception as they would normally never return. |
996 """ | 1081 """ |
997 # A list of tuples containing (function name, first arg, args) | 1082 # A list of tuples containing (function name, first arg, args) |
998 # of calls to execv or execve that have been made. | 1083 # of calls to execv or execve that have been made. |
999 calls = [] | 1084 calls = [] |
(...skipping 78 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1078 self.assertEqual(len(calls), 1) | 1163 self.assertEqual(len(calls), 1) |
1079 self.assertSequenceEqual(calls[0], | 1164 self.assertSequenceEqual(calls[0], |
1080 ('execve', native_fullpath, (arguments, env_path))) | 1165 ('execve', native_fullpath, (arguments, env_path))) |
1081 | 1166 |
1082 def test_internal_execvpe_str(self): | 1167 def test_internal_execvpe_str(self): |
1083 self._test_internal_execvpe(str) | 1168 self._test_internal_execvpe(str) |
1084 if os.name != "nt": | 1169 if os.name != "nt": |
1085 self._test_internal_execvpe(bytes) | 1170 self._test_internal_execvpe(bytes) |
1086 | 1171 |
1087 | 1172 |
| 1173 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests") |
1088 class Win32ErrorTests(unittest.TestCase): | 1174 class Win32ErrorTests(unittest.TestCase): |
1089 def test_rename(self): | 1175 def test_rename(self): |
1090 self.assertRaises(OSError, os.rename, support.TESTFN, support.TESTFN+".b
ak") | 1176 self.assertRaises(OSError, os.rename, support.TESTFN, support.TESTFN+".b
ak") |
1091 | 1177 |
1092 def test_remove(self): | 1178 def test_remove(self): |
1093 self.assertRaises(OSError, os.remove, support.TESTFN) | 1179 self.assertRaises(OSError, os.remove, support.TESTFN) |
1094 | 1180 |
1095 def test_chdir(self): | 1181 def test_chdir(self): |
1096 self.assertRaises(OSError, os.chdir, support.TESTFN) | 1182 self.assertRaises(OSError, os.chdir, support.TESTFN) |
1097 | 1183 |
(...skipping 26 matching lines...) Expand all Loading... |
1124 | 1210 |
1125 def check(self, f, *args): | 1211 def check(self, f, *args): |
1126 try: | 1212 try: |
1127 f(support.make_bad_fd(), *args) | 1213 f(support.make_bad_fd(), *args) |
1128 except OSError as e: | 1214 except OSError as e: |
1129 self.assertEqual(e.errno, errno.EBADF) | 1215 self.assertEqual(e.errno, errno.EBADF) |
1130 else: | 1216 else: |
1131 self.fail("%r didn't raise a OSError with a bad file descriptor" | 1217 self.fail("%r didn't raise a OSError with a bad file descriptor" |
1132 % f) | 1218 % f) |
1133 | 1219 |
| 1220 @unittest.skipUnless(hasattr(os, 'isatty'), 'test needs os.isatty()') |
1134 def test_isatty(self): | 1221 def test_isatty(self): |
1135 if hasattr(os, "isatty"): | 1222 self.assertEqual(os.isatty(support.make_bad_fd()), False) |
1136 self.assertEqual(os.isatty(support.make_bad_fd()), False) | 1223 |
1137 | 1224 @unittest.skipUnless(hasattr(os, 'closerange'), 'test needs os.closerange()'
) |
1138 def test_closerange(self): | 1225 def test_closerange(self): |
1139 if hasattr(os, "closerange"): | 1226 fd = support.make_bad_fd() |
1140 fd = support.make_bad_fd() | 1227 # Make sure none of the descriptors we are about to close are |
1141 # Make sure none of the descriptors we are about to close are | 1228 # currently valid (issue 6542). |
1142 # currently valid (issue 6542). | 1229 for i in range(10): |
1143 for i in range(10): | 1230 try: os.fstat(fd+i) |
1144 try: os.fstat(fd+i) | 1231 except OSError: |
1145 except OSError: | 1232 pass |
1146 pass | 1233 else: |
1147 else: | 1234 break |
1148 break | 1235 if i < 2: |
1149 if i < 2: | 1236 raise unittest.SkipTest( |
1150 raise unittest.SkipTest( | 1237 "Unable to acquire a range of invalid file descriptors") |
1151 "Unable to acquire a range of invalid file descriptors") | 1238 self.assertEqual(os.closerange(fd, fd + i-1), None) |
1152 self.assertEqual(os.closerange(fd, fd + i-1), None) | 1239 |
1153 | 1240 @unittest.skipUnless(hasattr(os, 'dup2'), 'test needs os.dup2()') |
1154 def test_dup2(self): | 1241 def test_dup2(self): |
1155 if hasattr(os, "dup2"): | 1242 self.check(os.dup2, 20) |
1156 self.check(os.dup2, 20) | 1243 |
1157 | 1244 @unittest.skipUnless(hasattr(os, 'fchmod'), 'test needs os.fchmod()') |
1158 def test_fchmod(self): | 1245 def test_fchmod(self): |
1159 if hasattr(os, "fchmod"): | 1246 self.check(os.fchmod, 0) |
1160 self.check(os.fchmod, 0) | 1247 |
1161 | 1248 @unittest.skipUnless(hasattr(os, 'fchown'), 'test needs os.fchown()') |
1162 def test_fchown(self): | 1249 def test_fchown(self): |
1163 if hasattr(os, "fchown"): | 1250 self.check(os.fchown, -1, -1) |
1164 self.check(os.fchown, -1, -1) | 1251 |
1165 | 1252 @unittest.skipUnless(hasattr(os, 'fpathconf'), 'test needs os.fpathconf()') |
1166 def test_fpathconf(self): | 1253 def test_fpathconf(self): |
1167 if hasattr(os, "fpathconf"): | 1254 self.check(os.pathconf, "PC_NAME_MAX") |
1168 self.check(os.pathconf, "PC_NAME_MAX") | 1255 self.check(os.fpathconf, "PC_NAME_MAX") |
1169 self.check(os.fpathconf, "PC_NAME_MAX") | 1256 |
1170 | 1257 @unittest.skipUnless(hasattr(os, 'ftruncate'), 'test needs os.ftruncate()') |
1171 def test_ftruncate(self): | 1258 def test_ftruncate(self): |
1172 if hasattr(os, "ftruncate"): | 1259 self.check(os.truncate, 0) |
1173 self.check(os.truncate, 0) | 1260 self.check(os.ftruncate, 0) |
1174 self.check(os.ftruncate, 0) | 1261 |
1175 | 1262 @unittest.skipUnless(hasattr(os, 'lseek'), 'test needs os.lseek()') |
1176 def test_lseek(self): | 1263 def test_lseek(self): |
1177 if hasattr(os, "lseek"): | 1264 self.check(os.lseek, 0, 0) |
1178 self.check(os.lseek, 0, 0) | 1265 |
1179 | 1266 @unittest.skipUnless(hasattr(os, 'read'), 'test needs os.read()') |
1180 def test_read(self): | 1267 def test_read(self): |
1181 if hasattr(os, "read"): | 1268 self.check(os.read, 1) |
1182 self.check(os.read, 1) | 1269 |
1183 | 1270 @unittest.skipUnless(hasattr(os, 'readv'), 'test needs os.readv()') |
| 1271 def test_readv(self): |
| 1272 buf = bytearray(10) |
| 1273 self.check(os.readv, [buf]) |
| 1274 |
| 1275 @unittest.skipUnless(hasattr(os, 'tcsetpgrp'), 'test needs os.tcsetpgrp()') |
1184 def test_tcsetpgrpt(self): | 1276 def test_tcsetpgrpt(self): |
1185 if hasattr(os, "tcsetpgrp"): | 1277 self.check(os.tcsetpgrp, 0) |
1186 self.check(os.tcsetpgrp, 0) | 1278 |
1187 | 1279 @unittest.skipUnless(hasattr(os, 'write'), 'test needs os.write()') |
1188 def test_write(self): | 1280 def test_write(self): |
1189 if hasattr(os, "write"): | 1281 self.check(os.write, b" ") |
1190 self.check(os.write, b" ") | 1282 |
| 1283 @unittest.skipUnless(hasattr(os, 'writev'), 'test needs os.writev()') |
| 1284 def test_writev(self): |
| 1285 self.check(os.writev, [b'abc']) |
1191 | 1286 |
1192 | 1287 |
1193 class LinkTests(unittest.TestCase): | 1288 class LinkTests(unittest.TestCase): |
1194 def setUp(self): | 1289 def setUp(self): |
1195 self.file1 = support.TESTFN | 1290 self.file1 = support.TESTFN |
1196 self.file2 = os.path.join(support.TESTFN + "2") | 1291 self.file2 = os.path.join(support.TESTFN + "2") |
1197 | 1292 |
1198 def tearDown(self): | 1293 def tearDown(self): |
1199 for file in (self.file1, self.file2): | 1294 for file in (self.file1, self.file2): |
1200 if os.path.exists(file): | 1295 if os.path.exists(file): |
(...skipping 19 matching lines...) Expand all Loading... |
1220 def test_unicode_name(self): | 1315 def test_unicode_name(self): |
1221 try: | 1316 try: |
1222 os.fsencode("\xf1") | 1317 os.fsencode("\xf1") |
1223 except UnicodeError: | 1318 except UnicodeError: |
1224 raise unittest.SkipTest("Unable to encode for this platform.") | 1319 raise unittest.SkipTest("Unable to encode for this platform.") |
1225 | 1320 |
1226 self.file1 += "\xf1" | 1321 self.file1 += "\xf1" |
1227 self.file2 = self.file1 + "2" | 1322 self.file2 = self.file1 + "2" |
1228 self._test_link(self.file1, self.file2) | 1323 self._test_link(self.file1, self.file2) |
1229 | 1324 |
1230 if sys.platform != 'win32': | 1325 @unittest.skipIf(sys.platform == "win32", "Posix specific tests") |
1231 class Win32ErrorTests(unittest.TestCase): | 1326 class PosixUidGidTests(unittest.TestCase): |
1232 pass | 1327 @unittest.skipUnless(hasattr(os, 'setuid'), 'test needs os.setuid()') |
1233 | 1328 def test_setuid(self): |
1234 class PosixUidGidTests(unittest.TestCase): | 1329 if os.getuid() != 0: |
1235 if hasattr(os, 'setuid'): | 1330 self.assertRaises(OSError, os.setuid, 0) |
1236 def test_setuid(self): | 1331 self.assertRaises(OverflowError, os.setuid, 1<<32) |
1237 if os.getuid() != 0: | 1332 |
1238 self.assertRaises(OSError, os.setuid, 0) | 1333 @unittest.skipUnless(hasattr(os, 'setgid'), 'test needs os.setgid()') |
1239 self.assertRaises(OverflowError, os.setuid, 1<<32) | 1334 def test_setgid(self): |
1240 | 1335 if os.getuid() != 0 and not HAVE_WHEEL_GROUP: |
1241 if hasattr(os, 'setgid'): | 1336 self.assertRaises(OSError, os.setgid, 0) |
1242 def test_setgid(self): | 1337 self.assertRaises(OverflowError, os.setgid, 1<<32) |
1243 if os.getuid() != 0: | 1338 |
1244 self.assertRaises(OSError, os.setgid, 0) | 1339 @unittest.skipUnless(hasattr(os, 'seteuid'), 'test needs os.seteuid()') |
1245 self.assertRaises(OverflowError, os.setgid, 1<<32) | 1340 def test_seteuid(self): |
1246 | 1341 if os.getuid() != 0: |
1247 if hasattr(os, 'seteuid'): | 1342 self.assertRaises(OSError, os.seteuid, 0) |
1248 def test_seteuid(self): | 1343 self.assertRaises(OverflowError, os.seteuid, 1<<32) |
1249 if os.getuid() != 0: | 1344 |
1250 self.assertRaises(OSError, os.seteuid, 0) | 1345 @unittest.skipUnless(hasattr(os, 'setegid'), 'test needs os.setegid()') |
1251 self.assertRaises(OverflowError, os.seteuid, 1<<32) | 1346 def test_setegid(self): |
1252 | 1347 if os.getuid() != 0 and not HAVE_WHEEL_GROUP: |
1253 if hasattr(os, 'setegid'): | 1348 self.assertRaises(OSError, os.setegid, 0) |
1254 def test_setegid(self): | 1349 self.assertRaises(OverflowError, os.setegid, 1<<32) |
1255 if os.getuid() != 0: | 1350 |
1256 self.assertRaises(OSError, os.setegid, 0) | 1351 @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()') |
1257 self.assertRaises(OverflowError, os.setegid, 1<<32) | 1352 def test_setreuid(self): |
1258 | 1353 if os.getuid() != 0: |
1259 if hasattr(os, 'setreuid'): | 1354 self.assertRaises(OSError, os.setreuid, 0, 0) |
1260 def test_setreuid(self): | 1355 self.assertRaises(OverflowError, os.setreuid, 1<<32, 0) |
1261 if os.getuid() != 0: | 1356 self.assertRaises(OverflowError, os.setreuid, 0, 1<<32) |
1262 self.assertRaises(OSError, os.setreuid, 0, 0) | 1357 |
1263 self.assertRaises(OverflowError, os.setreuid, 1<<32, 0) | 1358 @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()') |
1264 self.assertRaises(OverflowError, os.setreuid, 0, 1<<32) | 1359 def test_setreuid_neg1(self): |
1265 | 1360 # Needs to accept -1. We run this in a subprocess to avoid |
1266 def test_setreuid_neg1(self): | 1361 # altering the test runner's process state (issue8045). |
1267 # Needs to accept -1. We run this in a subprocess to avoid | 1362 subprocess.check_call([ |
1268 # altering the test runner's process state (issue8045). | 1363 sys.executable, '-c', |
1269 subprocess.check_call([ | 1364 'import os,sys;os.setreuid(-1,-1);sys.exit(0)']) |
1270 sys.executable, '-c', | 1365 |
1271 'import os,sys;os.setreuid(-1,-1);sys.exit(0)']) | 1366 @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()') |
1272 | 1367 def test_setregid(self): |
1273 if hasattr(os, 'setregid'): | 1368 if os.getuid() != 0 and not HAVE_WHEEL_GROUP: |
1274 def test_setregid(self): | 1369 self.assertRaises(OSError, os.setregid, 0, 0) |
1275 if os.getuid() != 0: | 1370 self.assertRaises(OverflowError, os.setregid, 1<<32, 0) |
1276 self.assertRaises(OSError, os.setregid, 0, 0) | 1371 self.assertRaises(OverflowError, os.setregid, 0, 1<<32) |
1277 self.assertRaises(OverflowError, os.setregid, 1<<32, 0) | 1372 |
1278 self.assertRaises(OverflowError, os.setregid, 0, 1<<32) | 1373 @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()') |
1279 | 1374 def test_setregid_neg1(self): |
1280 def test_setregid_neg1(self): | 1375 # Needs to accept -1. We run this in a subprocess to avoid |
1281 # Needs to accept -1. We run this in a subprocess to avoid | 1376 # altering the test runner's process state (issue8045). |
1282 # altering the test runner's process state (issue8045). | 1377 subprocess.check_call([ |
1283 subprocess.check_call([ | 1378 sys.executable, '-c', |
1284 sys.executable, '-c', | 1379 'import os,sys;os.setregid(-1,-1);sys.exit(0)']) |
1285 'import os,sys;os.setregid(-1,-1);sys.exit(0)']) | 1380 |
1286 | 1381 @unittest.skipIf(sys.platform == "win32", "Posix specific tests") |
1287 class Pep383Tests(unittest.TestCase): | 1382 class Pep383Tests(unittest.TestCase): |
1288 def setUp(self): | 1383 def setUp(self): |
1289 if support.TESTFN_UNENCODABLE: | 1384 if support.TESTFN_UNENCODABLE: |
1290 self.dir = support.TESTFN_UNENCODABLE | 1385 self.dir = support.TESTFN_UNENCODABLE |
1291 elif support.TESTFN_NONASCII: | 1386 elif support.TESTFN_NONASCII: |
1292 self.dir = support.TESTFN_NONASCII | 1387 self.dir = support.TESTFN_NONASCII |
1293 else: | 1388 else: |
1294 self.dir = support.TESTFN | 1389 self.dir = support.TESTFN |
1295 self.bdir = os.fsencode(self.dir) | 1390 self.bdir = os.fsencode(self.dir) |
1296 | 1391 |
1297 bytesfn = [] | 1392 bytesfn = [] |
1298 def add_filename(fn): | 1393 def add_filename(fn): |
1299 try: | |
1300 fn = os.fsencode(fn) | |
1301 except UnicodeEncodeError: | |
1302 return | |
1303 bytesfn.append(fn) | |
1304 add_filename(support.TESTFN_UNICODE) | |
1305 if support.TESTFN_UNENCODABLE: | |
1306 add_filename(support.TESTFN_UNENCODABLE) | |
1307 if support.TESTFN_NONASCII: | |
1308 add_filename(support.TESTFN_NONASCII) | |
1309 if not bytesfn: | |
1310 self.skipTest("couldn't create any non-ascii filename") | |
1311 | |
1312 self.unicodefn = set() | |
1313 os.mkdir(self.dir) | |
1314 try: | 1394 try: |
1315 for fn in bytesfn: | 1395 fn = os.fsencode(fn) |
1316 support.create_empty_file(os.path.join(self.bdir, fn)) | 1396 except UnicodeEncodeError: |
1317 fn = os.fsdecode(fn) | 1397 return |
1318 if fn in self.unicodefn: | 1398 bytesfn.append(fn) |
1319 raise ValueError("duplicate filename") | 1399 add_filename(support.TESTFN_UNICODE) |
1320 self.unicodefn.add(fn) | 1400 if support.TESTFN_UNENCODABLE: |
1321 except: | 1401 add_filename(support.TESTFN_UNENCODABLE) |
1322 shutil.rmtree(self.dir) | 1402 if support.TESTFN_NONASCII: |
1323 raise | 1403 add_filename(support.TESTFN_NONASCII) |
1324 | 1404 if not bytesfn: |
1325 def tearDown(self): | 1405 self.skipTest("couldn't create any non-ascii filename") |
| 1406 |
| 1407 self.unicodefn = set() |
| 1408 os.mkdir(self.dir) |
| 1409 try: |
| 1410 for fn in bytesfn: |
| 1411 support.create_empty_file(os.path.join(self.bdir, fn)) |
| 1412 fn = os.fsdecode(fn) |
| 1413 if fn in self.unicodefn: |
| 1414 raise ValueError("duplicate filename") |
| 1415 self.unicodefn.add(fn) |
| 1416 except: |
1326 shutil.rmtree(self.dir) | 1417 shutil.rmtree(self.dir) |
1327 | 1418 raise |
1328 def test_listdir(self): | 1419 |
1329 expected = self.unicodefn | 1420 def tearDown(self): |
1330 found = set(os.listdir(self.dir)) | 1421 shutil.rmtree(self.dir) |
1331 self.assertEqual(found, expected) | 1422 |
1332 # test listdir without arguments | 1423 def test_listdir(self): |
1333 current_directory = os.getcwd() | 1424 expected = self.unicodefn |
1334 try: | 1425 found = set(os.listdir(self.dir)) |
1335 os.chdir(os.sep) | 1426 self.assertEqual(found, expected) |
1336 self.assertEqual(set(os.listdir()), set(os.listdir(os.sep))) | 1427 # test listdir without arguments |
1337 finally: | 1428 current_directory = os.getcwd() |
1338 os.chdir(current_directory) | 1429 try: |
1339 | 1430 os.chdir(os.sep) |
1340 def test_open(self): | 1431 self.assertEqual(set(os.listdir()), set(os.listdir(os.sep))) |
1341 for fn in self.unicodefn: | 1432 finally: |
1342 f = open(os.path.join(self.dir, fn), 'rb') | 1433 os.chdir(current_directory) |
1343 f.close() | 1434 |
1344 | 1435 def test_open(self): |
1345 @unittest.skipUnless(hasattr(os, 'statvfs'), | 1436 for fn in self.unicodefn: |
1346 "need os.statvfs()") | 1437 f = open(os.path.join(self.dir, fn), 'rb') |
1347 def test_statvfs(self): | 1438 f.close() |
1348 # issue #9645 | 1439 |
1349 for fn in self.unicodefn: | 1440 @unittest.skipUnless(hasattr(os, 'statvfs'), |
1350 # should not fail with file not found error | 1441 "need os.statvfs()") |
1351 fullname = os.path.join(self.dir, fn) | 1442 def test_statvfs(self): |
1352 os.statvfs(fullname) | 1443 # issue #9645 |
1353 | 1444 for fn in self.unicodefn: |
1354 def test_stat(self): | 1445 # should not fail with file not found error |
1355 for fn in self.unicodefn: | 1446 fullname = os.path.join(self.dir, fn) |
1356 os.stat(os.path.join(self.dir, fn)) | 1447 os.statvfs(fullname) |
1357 else: | 1448 |
1358 class PosixUidGidTests(unittest.TestCase): | 1449 def test_stat(self): |
1359 pass | 1450 for fn in self.unicodefn: |
1360 class Pep383Tests(unittest.TestCase): | 1451 os.stat(os.path.join(self.dir, fn)) |
1361 pass | |
1362 | 1452 |
1363 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests") | 1453 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests") |
1364 class Win32KillTests(unittest.TestCase): | 1454 class Win32KillTests(unittest.TestCase): |
1365 def _kill(self, sig): | 1455 def _kill(self, sig): |
1366 # Start sys.executable as a subprocess and communicate from the | 1456 # Start sys.executable as a subprocess and communicate from the |
1367 # subprocess to the parent that the interpreter is ready. When it | 1457 # subprocess to the parent that the interpreter is ready. When it |
1368 # becomes ready, send *sig* via os.kill to the subprocess and check | 1458 # becomes ready, send *sig* via os.kill to the subprocess and check |
1369 # that the return code is equal to *sig*. | 1459 # that the return code is equal to *sig*. |
1370 import ctypes | 1460 import ctypes |
1371 from ctypes import wintypes | 1461 from ctypes import wintypes |
(...skipping 98 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1470 # by subprocesses. | 1560 # by subprocesses. |
1471 SetConsoleCtrlHandler(NULL, 0) | 1561 SetConsoleCtrlHandler(NULL, 0) |
1472 | 1562 |
1473 self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT") | 1563 self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT") |
1474 | 1564 |
1475 def test_CTRL_BREAK_EVENT(self): | 1565 def test_CTRL_BREAK_EVENT(self): |
1476 self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT") | 1566 self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT") |
1477 | 1567 |
1478 | 1568 |
1479 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests") | 1569 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests") |
| 1570 class Win32ListdirTests(unittest.TestCase): |
| 1571 """Test listdir on Windows.""" |
| 1572 |
| 1573 def setUp(self): |
| 1574 self.created_paths = [] |
| 1575 for i in range(2): |
| 1576 dir_name = 'SUB%d' % i |
| 1577 dir_path = os.path.join(support.TESTFN, dir_name) |
| 1578 file_name = 'FILE%d' % i |
| 1579 file_path = os.path.join(support.TESTFN, file_name) |
| 1580 os.makedirs(dir_path) |
| 1581 with open(file_path, 'w') as f: |
| 1582 f.write("I'm %s and proud of it. Blame test_os.\n" % file_path) |
| 1583 self.created_paths.extend([dir_name, file_name]) |
| 1584 self.created_paths.sort() |
| 1585 |
| 1586 def tearDown(self): |
| 1587 shutil.rmtree(support.TESTFN) |
| 1588 |
| 1589 def test_listdir_no_extended_path(self): |
| 1590 """Test when the path is not an "extended" path.""" |
| 1591 # unicode |
| 1592 self.assertEqual( |
| 1593 sorted(os.listdir(support.TESTFN)), |
| 1594 self.created_paths) |
| 1595 # bytes |
| 1596 self.assertEqual( |
| 1597 sorted(os.listdir(os.fsencode(support.TESTFN))), |
| 1598 [os.fsencode(path) for path in self.created_paths]) |
| 1599 |
| 1600 def test_listdir_extended_path(self): |
| 1601 """Test when the path starts with '\\\\?\\'.""" |
| 1602 # See: http://msdn.microsoft.com/en-us/library/windows/desktop/aa365247(
v=vs.85).aspx#maxpath |
| 1603 # unicode |
| 1604 path = '\\\\?\\' + os.path.abspath(support.TESTFN) |
| 1605 self.assertEqual( |
| 1606 sorted(os.listdir(path)), |
| 1607 self.created_paths) |
| 1608 # bytes |
| 1609 path = b'\\\\?\\' + os.fsencode(os.path.abspath(support.TESTFN)) |
| 1610 self.assertEqual( |
| 1611 sorted(os.listdir(path)), |
| 1612 [os.fsencode(path) for path in self.created_paths]) |
| 1613 |
| 1614 |
| 1615 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests") |
1480 @support.skip_unless_symlink | 1616 @support.skip_unless_symlink |
1481 class Win32SymlinkTests(unittest.TestCase): | 1617 class Win32SymlinkTests(unittest.TestCase): |
1482 filelink = 'filelinktest' | 1618 filelink = 'filelinktest' |
1483 filelink_target = os.path.abspath(__file__) | 1619 filelink_target = os.path.abspath(__file__) |
1484 dirlink = 'dirlinktest' | 1620 dirlink = 'dirlinktest' |
1485 dirlink_target = os.path.dirname(filelink_target) | 1621 dirlink_target = os.path.dirname(filelink_target) |
1486 missing_link = 'missing link' | 1622 missing_link = 'missing link' |
1487 | 1623 |
1488 def setUp(self): | 1624 def setUp(self): |
1489 assert os.path.exists(self.dirlink_target) | 1625 assert os.path.exists(self.dirlink_target) |
1490 assert os.path.exists(self.filelink_target) | 1626 assert os.path.exists(self.filelink_target) |
1491 assert not os.path.exists(self.dirlink) | 1627 assert not os.path.exists(self.dirlink) |
1492 assert not os.path.exists(self.filelink) | 1628 assert not os.path.exists(self.filelink) |
1493 assert not os.path.exists(self.missing_link) | 1629 assert not os.path.exists(self.missing_link) |
1494 | 1630 |
1495 def tearDown(self): | 1631 def tearDown(self): |
1496 if os.path.exists(self.filelink): | 1632 if os.path.exists(self.filelink): |
1497 os.remove(self.filelink) | 1633 os.remove(self.filelink) |
1498 if os.path.exists(self.dirlink): | 1634 if os.path.exists(self.dirlink): |
1499 os.rmdir(self.dirlink) | 1635 os.rmdir(self.dirlink) |
1500 if os.path.lexists(self.missing_link): | 1636 if os.path.lexists(self.missing_link): |
1501 os.remove(self.missing_link) | 1637 os.remove(self.missing_link) |
1502 | 1638 |
1503 def test_directory_link(self): | 1639 def test_directory_link(self): |
1504 os.symlink(self.dirlink_target, self.dirlink, True) | 1640 os.symlink(self.dirlink_target, self.dirlink) |
1505 self.assertTrue(os.path.exists(self.dirlink)) | 1641 self.assertTrue(os.path.exists(self.dirlink)) |
1506 self.assertTrue(os.path.isdir(self.dirlink)) | 1642 self.assertTrue(os.path.isdir(self.dirlink)) |
1507 self.assertTrue(os.path.islink(self.dirlink)) | 1643 self.assertTrue(os.path.islink(self.dirlink)) |
1508 self.check_stat(self.dirlink, self.dirlink_target) | 1644 self.check_stat(self.dirlink, self.dirlink_target) |
1509 | 1645 |
1510 def test_file_link(self): | 1646 def test_file_link(self): |
1511 os.symlink(self.filelink_target, self.filelink) | 1647 os.symlink(self.filelink_target, self.filelink) |
1512 self.assertTrue(os.path.exists(self.filelink)) | 1648 self.assertTrue(os.path.exists(self.filelink)) |
1513 self.assertTrue(os.path.isfile(self.filelink)) | 1649 self.assertTrue(os.path.isfile(self.filelink)) |
1514 self.assertTrue(os.path.islink(self.filelink)) | 1650 self.assertTrue(os.path.islink(self.filelink)) |
(...skipping 71 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1586 os.chdir(level3) | 1722 os.chdir(level3) |
1587 self.assertEqual(os.stat(file1), | 1723 self.assertEqual(os.stat(file1), |
1588 os.stat(os.path.relpath(link))) | 1724 os.stat(os.path.relpath(link))) |
1589 finally: | 1725 finally: |
1590 os.chdir(orig_dir) | 1726 os.chdir(orig_dir) |
1591 except OSError as err: | 1727 except OSError as err: |
1592 self.fail(err) | 1728 self.fail(err) |
1593 finally: | 1729 finally: |
1594 os.remove(file1) | 1730 os.remove(file1) |
1595 shutil.rmtree(level1) | 1731 shutil.rmtree(level1) |
| 1732 |
| 1733 |
| 1734 @support.skip_unless_symlink |
| 1735 class NonLocalSymlinkTests(unittest.TestCase): |
| 1736 |
| 1737 def setUp(self): |
| 1738 """ |
| 1739 Create this structure: |
| 1740 |
| 1741 base |
| 1742 \___ some_dir |
| 1743 """ |
| 1744 os.makedirs('base/some_dir') |
| 1745 |
| 1746 def tearDown(self): |
| 1747 shutil.rmtree('base') |
| 1748 |
| 1749 def test_directory_link_nonlocal(self): |
| 1750 """ |
| 1751 The symlink target should resolve relative to the link, not relative |
| 1752 to the current directory. |
| 1753 |
| 1754 Then, link base/some_link -> base/some_dir and ensure that some_link |
| 1755 is resolved as a directory. |
| 1756 |
| 1757 In issue13772, it was discovered that directory detection failed if |
| 1758 the symlink target was not specified relative to the current |
| 1759 directory, which was a defect in the implementation. |
| 1760 """ |
| 1761 src = os.path.join('base', 'some_link') |
| 1762 os.symlink('some_dir', src) |
| 1763 assert os.path.isdir(src) |
1596 | 1764 |
1597 | 1765 |
1598 class FSEncodingTests(unittest.TestCase): | 1766 class FSEncodingTests(unittest.TestCase): |
1599 def test_nop(self): | 1767 def test_nop(self): |
1600 self.assertEqual(os.fsencode(b'abc\xff'), b'abc\xff') | 1768 self.assertEqual(os.fsencode(b'abc\xff'), b'abc\xff') |
1601 self.assertEqual(os.fsdecode('abc\u0141'), 'abc\u0141') | 1769 self.assertEqual(os.fsdecode('abc\u0141'), 'abc\u0141') |
1602 | 1770 |
1603 def test_identity(self): | 1771 def test_identity(self): |
1604 # assert fsdecode(fsencode(x)) == x | 1772 # assert fsdecode(fsencode(x)) == x |
1605 for fn in ('unicode\u0141', 'latin\xe9', 'ascii'): | 1773 for fn in ('unicode\u0141', 'latin\xe9', 'ascii'): |
(...skipping 151 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1757 | 1925 |
1758 | 1926 |
1759 @unittest.skipUnless(threading is not None, "test needs threading module") | 1927 @unittest.skipUnless(threading is not None, "test needs threading module") |
1760 @unittest.skipUnless(hasattr(os, 'sendfile'), "test needs os.sendfile()") | 1928 @unittest.skipUnless(hasattr(os, 'sendfile'), "test needs os.sendfile()") |
1761 class TestSendfile(unittest.TestCase): | 1929 class TestSendfile(unittest.TestCase): |
1762 | 1930 |
1763 DATA = b"12345abcde" * 16 * 1024 # 160 KB | 1931 DATA = b"12345abcde" * 16 * 1024 # 160 KB |
1764 SUPPORT_HEADERS_TRAILERS = not sys.platform.startswith("linux") and \ | 1932 SUPPORT_HEADERS_TRAILERS = not sys.platform.startswith("linux") and \ |
1765 not sys.platform.startswith("solaris") and \ | 1933 not sys.platform.startswith("solaris") and \ |
1766 not sys.platform.startswith("sunos") | 1934 not sys.platform.startswith("sunos") |
| 1935 requires_headers_trailers = unittest.skipUnless(SUPPORT_HEADERS_TRAILERS, |
| 1936 'requires headers and trailers support') |
1767 | 1937 |
1768 @classmethod | 1938 @classmethod |
1769 def setUpClass(cls): | 1939 def setUpClass(cls): |
1770 with open(support.TESTFN, "wb") as f: | 1940 with open(support.TESTFN, "wb") as f: |
1771 f.write(cls.DATA) | 1941 f.write(cls.DATA) |
1772 | 1942 |
1773 @classmethod | 1943 @classmethod |
1774 def tearDownClass(cls): | 1944 def tearDownClass(cls): |
1775 support.unlink(support.TESTFN) | 1945 support.unlink(support.TESTFN) |
1776 | 1946 |
(...skipping 98 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1875 data = self.server.handler_instance.get_data() | 2045 data = self.server.handler_instance.get_data() |
1876 self.assertEqual(data, b'') | 2046 self.assertEqual(data, b'') |
1877 | 2047 |
1878 def test_invalid_offset(self): | 2048 def test_invalid_offset(self): |
1879 with self.assertRaises(OSError) as cm: | 2049 with self.assertRaises(OSError) as cm: |
1880 os.sendfile(self.sockno, self.fileno, -1, 4096) | 2050 os.sendfile(self.sockno, self.fileno, -1, 4096) |
1881 self.assertEqual(cm.exception.errno, errno.EINVAL) | 2051 self.assertEqual(cm.exception.errno, errno.EINVAL) |
1882 | 2052 |
1883 # --- headers / trailers tests | 2053 # --- headers / trailers tests |
1884 | 2054 |
1885 if SUPPORT_HEADERS_TRAILERS: | 2055 @requires_headers_trailers |
1886 | 2056 def test_headers(self): |
1887 def test_headers(self): | 2057 total_sent = 0 |
1888 total_sent = 0 | 2058 sent = os.sendfile(self.sockno, self.fileno, 0, 4096, |
1889 sent = os.sendfile(self.sockno, self.fileno, 0, 4096, | 2059 headers=[b"x" * 512]) |
1890 headers=[b"x" * 512]) | 2060 total_sent += sent |
| 2061 offset = 4096 |
| 2062 nbytes = 4096 |
| 2063 while 1: |
| 2064 sent = self.sendfile_wrapper(self.sockno, self.fileno, |
| 2065 offset, nbytes) |
| 2066 if sent == 0: |
| 2067 break |
1891 total_sent += sent | 2068 total_sent += sent |
1892 offset = 4096 | 2069 offset += sent |
1893 nbytes = 4096 | 2070 |
1894 while 1: | 2071 expected_data = b"x" * 512 + self.DATA |
1895 sent = self.sendfile_wrapper(self.sockno, self.fileno, | 2072 self.assertEqual(total_sent, len(expected_data)) |
1896 offset, nbytes) | 2073 self.client.close() |
1897 if sent == 0: | 2074 self.server.wait() |
1898 break | 2075 data = self.server.handler_instance.get_data() |
1899 total_sent += sent | 2076 self.assertEqual(hash(data), hash(expected_data)) |
1900 offset += sent | 2077 |
1901 | 2078 @requires_headers_trailers |
1902 expected_data = b"x" * 512 + self.DATA | 2079 def test_trailers(self): |
1903 self.assertEqual(total_sent, len(expected_data)) | 2080 TESTFN2 = support.TESTFN + "2" |
| 2081 file_data = b"abcdef" |
| 2082 with open(TESTFN2, 'wb') as f: |
| 2083 f.write(file_data) |
| 2084 with open(TESTFN2, 'rb')as f: |
| 2085 self.addCleanup(os.remove, TESTFN2) |
| 2086 os.sendfile(self.sockno, f.fileno(), 0, len(file_data), |
| 2087 trailers=[b"1234"]) |
1904 self.client.close() | 2088 self.client.close() |
1905 self.server.wait() | 2089 self.server.wait() |
1906 data = self.server.handler_instance.get_data() | 2090 data = self.server.handler_instance.get_data() |
1907 self.assertEqual(hash(data), hash(expected_data)) | 2091 self.assertEqual(data, b"abcdef1234") |
1908 | 2092 |
1909 def test_trailers(self): | 2093 @requires_headers_trailers |
1910 TESTFN2 = support.TESTFN + "2" | 2094 @unittest.skipUnless(hasattr(os, 'SF_NODISKIO'), |
1911 with open(TESTFN2, 'wb') as f: | 2095 'test needs os.SF_NODISKIO') |
1912 f.write(b"abcde") | 2096 def test_flags(self): |
1913 with open(TESTFN2, 'rb')as f: | 2097 try: |
1914 self.addCleanup(os.remove, TESTFN2) | 2098 os.sendfile(self.sockno, self.fileno, 0, 4096, |
1915 os.sendfile(self.sockno, f.fileno(), 0, 4096, | 2099 flags=os.SF_NODISKIO) |
1916 trailers=[b"12345"]) | 2100 except OSError as err: |
1917 self.client.close() | 2101 if err.errno not in (errno.EBUSY, errno.EAGAIN): |
1918 self.server.wait() | 2102 raise |
1919 data = self.server.handler_instance.get_data() | |
1920 self.assertEqual(data, b"abcde12345") | |
1921 | |
1922 if hasattr(os, "SF_NODISKIO"): | |
1923 def test_flags(self): | |
1924 try: | |
1925 os.sendfile(self.sockno, self.fileno, 0, 4096, | |
1926 flags=os.SF_NODISKIO) | |
1927 except OSError as err: | |
1928 if err.errno not in (errno.EBUSY, errno.EAGAIN): | |
1929 raise | |
1930 | 2103 |
1931 | 2104 |
1932 def supports_extended_attributes(): | 2105 def supports_extended_attributes(): |
1933 if not hasattr(os, "setxattr"): | 2106 if not hasattr(os, "setxattr"): |
1934 return False | 2107 return False |
1935 try: | 2108 try: |
1936 with open(support.TESTFN, "wb") as fp: | 2109 with open(support.TESTFN, "wb") as fp: |
1937 try: | 2110 try: |
1938 os.setxattr(fp.fileno(), b"user.test", b"") | 2111 os.setxattr(fp.fileno(), b"user.test", b"") |
1939 except OSError: | 2112 except OSError: |
(...skipping 254 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
2194 | 2367 |
2195 for filenames, func, *func_args in funcs: | 2368 for filenames, func, *func_args in funcs: |
2196 for name in filenames: | 2369 for name in filenames: |
2197 try: | 2370 try: |
2198 func(name, *func_args) | 2371 func(name, *func_args) |
2199 except OSError as err: | 2372 except OSError as err: |
2200 self.assertIs(err.filename, name) | 2373 self.assertIs(err.filename, name) |
2201 else: | 2374 else: |
2202 self.fail("No exception thrown by {}".format(func)) | 2375 self.fail("No exception thrown by {}".format(func)) |
2203 | 2376 |
| 2377 class CPUCountTests(unittest.TestCase): |
| 2378 def test_cpu_count(self): |
| 2379 cpus = os.cpu_count() |
| 2380 if cpus is not None: |
| 2381 self.assertIsInstance(cpus, int) |
| 2382 self.assertGreater(cpus, 0) |
| 2383 else: |
| 2384 self.skipTest("Could not determine the number of CPUs") |
| 2385 |
| 2386 |
| 2387 class FDInheritanceTests(unittest.TestCase): |
| 2388 def test_get_set_inheritable(self): |
| 2389 fd = os.open(__file__, os.O_RDONLY) |
| 2390 self.addCleanup(os.close, fd) |
| 2391 self.assertEqual(os.get_inheritable(fd), False) |
| 2392 |
| 2393 os.set_inheritable(fd, True) |
| 2394 self.assertEqual(os.get_inheritable(fd), True) |
| 2395 |
| 2396 @unittest.skipIf(fcntl is None, "need fcntl") |
| 2397 def test_get_inheritable_cloexec(self): |
| 2398 fd = os.open(__file__, os.O_RDONLY) |
| 2399 self.addCleanup(os.close, fd) |
| 2400 self.assertEqual(os.get_inheritable(fd), False) |
| 2401 |
| 2402 # clear FD_CLOEXEC flag |
| 2403 flags = fcntl.fcntl(fd, fcntl.F_GETFD) |
| 2404 flags &= ~fcntl.FD_CLOEXEC |
| 2405 fcntl.fcntl(fd, fcntl.F_SETFD, flags) |
| 2406 |
| 2407 self.assertEqual(os.get_inheritable(fd), True) |
| 2408 |
| 2409 @unittest.skipIf(fcntl is None, "need fcntl") |
| 2410 def test_set_inheritable_cloexec(self): |
| 2411 fd = os.open(__file__, os.O_RDONLY) |
| 2412 self.addCleanup(os.close, fd) |
| 2413 self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC, |
| 2414 fcntl.FD_CLOEXEC) |
| 2415 |
| 2416 os.set_inheritable(fd, True) |
| 2417 self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC, |
| 2418 0) |
| 2419 |
| 2420 def test_open(self): |
| 2421 fd = os.open(__file__, os.O_RDONLY) |
| 2422 self.addCleanup(os.close, fd) |
| 2423 self.assertEqual(os.get_inheritable(fd), False) |
| 2424 |
| 2425 @unittest.skipUnless(hasattr(os, 'pipe'), "need os.pipe()") |
| 2426 def test_pipe(self): |
| 2427 rfd, wfd = os.pipe() |
| 2428 self.addCleanup(os.close, rfd) |
| 2429 self.addCleanup(os.close, wfd) |
| 2430 self.assertEqual(os.get_inheritable(rfd), False) |
| 2431 self.assertEqual(os.get_inheritable(wfd), False) |
| 2432 |
| 2433 def test_dup(self): |
| 2434 fd1 = os.open(__file__, os.O_RDONLY) |
| 2435 self.addCleanup(os.close, fd1) |
| 2436 |
| 2437 fd2 = os.dup(fd1) |
| 2438 self.addCleanup(os.close, fd2) |
| 2439 self.assertEqual(os.get_inheritable(fd2), False) |
| 2440 |
| 2441 @unittest.skipUnless(hasattr(os, 'dup2'), "need os.dup2()") |
| 2442 def test_dup2(self): |
| 2443 fd = os.open(__file__, os.O_RDONLY) |
| 2444 self.addCleanup(os.close, fd) |
| 2445 |
| 2446 # inheritable by default |
| 2447 fd2 = os.open(__file__, os.O_RDONLY) |
| 2448 try: |
| 2449 os.dup2(fd, fd2) |
| 2450 self.assertEqual(os.get_inheritable(fd2), True) |
| 2451 finally: |
| 2452 os.close(fd2) |
| 2453 |
| 2454 # force non-inheritable |
| 2455 fd3 = os.open(__file__, os.O_RDONLY) |
| 2456 try: |
| 2457 os.dup2(fd, fd3, inheritable=False) |
| 2458 self.assertEqual(os.get_inheritable(fd3), False) |
| 2459 finally: |
| 2460 os.close(fd3) |
| 2461 |
| 2462 @unittest.skipUnless(hasattr(os, 'openpty'), "need os.openpty()") |
| 2463 def test_openpty(self): |
| 2464 master_fd, slave_fd = os.openpty() |
| 2465 self.addCleanup(os.close, master_fd) |
| 2466 self.addCleanup(os.close, slave_fd) |
| 2467 self.assertEqual(os.get_inheritable(master_fd), False) |
| 2468 self.assertEqual(os.get_inheritable(slave_fd), False) |
| 2469 |
| 2470 |
2204 @support.reap_threads | 2471 @support.reap_threads |
2205 def test_main(): | 2472 def test_main(): |
2206 support.run_unittest( | 2473 support.run_unittest( |
2207 FileTests, | 2474 FileTests, |
2208 StatAttributeTests, | 2475 StatAttributeTests, |
2209 EnvironTests, | 2476 EnvironTests, |
2210 WalkTests, | 2477 WalkTests, |
2211 FwalkTests, | 2478 FwalkTests, |
2212 MakedirTests, | 2479 MakedirTests, |
2213 DevNullTests, | 2480 DevNullTests, |
2214 URandomTests, | 2481 URandomTests, |
2215 ExecTests, | 2482 ExecTests, |
2216 Win32ErrorTests, | 2483 Win32ErrorTests, |
2217 TestInvalidFD, | 2484 TestInvalidFD, |
2218 PosixUidGidTests, | 2485 PosixUidGidTests, |
2219 Pep383Tests, | 2486 Pep383Tests, |
2220 Win32KillTests, | 2487 Win32KillTests, |
| 2488 Win32ListdirTests, |
2221 Win32SymlinkTests, | 2489 Win32SymlinkTests, |
| 2490 NonLocalSymlinkTests, |
2222 FSEncodingTests, | 2491 FSEncodingTests, |
2223 DeviceEncodingTests, | 2492 DeviceEncodingTests, |
2224 PidTests, | 2493 PidTests, |
2225 LoginTests, | 2494 LoginTests, |
2226 LinkTests, | 2495 LinkTests, |
2227 TestSendfile, | 2496 TestSendfile, |
2228 ProgramPriorityTests, | 2497 ProgramPriorityTests, |
2229 ExtendedAttributeTests, | 2498 ExtendedAttributeTests, |
2230 Win32DeprecatedBytesAPI, | 2499 Win32DeprecatedBytesAPI, |
2231 TermsizeTests, | 2500 TermsizeTests, |
2232 OSErrorTests, | 2501 OSErrorTests, |
2233 RemoveDirsTests, | 2502 RemoveDirsTests, |
| 2503 CPUCountTests, |
| 2504 FDInheritanceTests, |
2234 ) | 2505 ) |
2235 | 2506 |
2236 if __name__ == "__main__": | 2507 if __name__ == "__main__": |
2237 test_main() | 2508 test_main() |
LEFT | RIGHT |