Rietveld Code Review Tool
Help | Bug tracker | Discussion group | Source code | Sign in
(151284)

Delta Between Two Patch Sets: Lib/test/test_os.py

Issue 16510: Using appropriate checks in tests
Left Patch Set: Created 6 years, 3 months ago
Right Patch Set: Created 5 years, 10 months ago
Left:
Right:
Use n/p to move between diff chunks; N/P to move between comments. Please Sign in to add in-line comments.
Jump to:
Left: Side by side diff | Download
Right: Side by side diff | Download
« no previous file with change/comment | « Lib/test/_test_multiprocessing.py ('k') | Lib/test/test_ossaudiodev.py » ('j') | no next file with change/comment »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
LEFTRIGHT
1 # As a test suite for the os module, this is woefully inadequate, but this 1 # As a test suite for the os module, this is woefully inadequate, but this
2 # does add tests for a few functions which have been determined to be more 2 # does add tests for a few functions which have been determined to be more
3 # portable than they had been thought to be. 3 # portable than they had been thought to be.
4 4
5 import os 5 import os
6 import errno 6 import errno
7 import unittest 7 import unittest
8 import warnings 8 import warnings
9 import sys 9 import sys
10 import signal 10 import signal
11 import subprocess 11 import subprocess
12 import time 12 import time
13 import shutil 13 import shutil
14 from test import support 14 from test import support
15 import contextlib 15 import contextlib
16 import mmap 16 import mmap
17 import platform 17 import platform
18 import re 18 import re
19 import uuid 19 import uuid
20 import asyncore 20 import asyncore
21 import asynchat 21 import asynchat
22 import socket 22 import socket
23 import itertools 23 import itertools
24 import stat 24 import stat
25 import locale 25 import locale
26 import codecs 26 import codecs
27 import decimal
28 import fractions
29 import pickle
27 try: 30 try:
28 import threading 31 import threading
29 except ImportError: 32 except ImportError:
30 threading = None 33 threading = None
34 try:
35 import resource
36 except ImportError:
37 resource = None
38 try:
39 import fcntl
40 except ImportError:
41 fcntl = None
42
31 from test.script_helper import assert_python_ok 43 from test.script_helper import assert_python_ok
32 44
33 with warnings.catch_warnings(): 45 with warnings.catch_warnings():
34 warnings.simplefilter("ignore", DeprecationWarning) 46 warnings.simplefilter("ignore", DeprecationWarning)
35 os.stat_float_times(True) 47 os.stat_float_times(True)
36 st = os.stat(__file__) 48 st = os.stat(__file__)
37 stat_supports_subsecond = ( 49 stat_supports_subsecond = (
38 # check if float and int timestamps are different 50 # check if float and int timestamps are different
39 (st.st_atime != st[7]) 51 (st.st_atime != st[7])
40 or (st.st_mtime != st[8]) 52 or (st.st_mtime != st[8])
41 or (st.st_ctime != st[9])) 53 or (st.st_ctime != st[9]))
42 54
43 # Detect whether we're on a Linux system that uses the (now outdated 55 # Detect whether we're on a Linux system that uses the (now outdated
44 # and unmaintained) linuxthreads threading library. There's an issue 56 # and unmaintained) linuxthreads threading library. There's an issue
45 # when combining linuxthreads with a failed execv call: see 57 # when combining linuxthreads with a failed execv call: see
46 # http://bugs.python.org/issue4970. 58 # http://bugs.python.org/issue4970.
47 if hasattr(sys, 'thread_info') and sys.thread_info.version: 59 if hasattr(sys, 'thread_info') and sys.thread_info.version:
48 USING_LINUXTHREADS = sys.thread_info.version.startswith("linuxthreads") 60 USING_LINUXTHREADS = sys.thread_info.version.startswith("linuxthreads")
49 else: 61 else:
50 USING_LINUXTHREADS = False 62 USING_LINUXTHREADS = False
63
64 # Issue #14110: Some tests fail on FreeBSD if the user is in the wheel group.
65 HAVE_WHEEL_GROUP = sys.platform.startswith('freebsd') and os.getgid() == 0
51 66
52 # Tests creating TESTFN 67 # Tests creating TESTFN
53 class FileTests(unittest.TestCase): 68 class FileTests(unittest.TestCase):
54 def setUp(self): 69 def setUp(self):
55 if os.path.exists(support.TESTFN): 70 if os.path.exists(support.TESTFN):
56 os.unlink(support.TESTFN) 71 os.unlink(support.TESTFN)
57 tearDown = setUp 72 tearDown = setUp
58 73
59 def test_access(self): 74 def test_access(self):
60 f = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR) 75 f = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
(...skipping 102 matching lines...) Expand 10 before | Expand all | Expand 10 after
163 os.mkdir(support.TESTFN) 178 os.mkdir(support.TESTFN)
164 self.fname = os.path.join(support.TESTFN, "f1") 179 self.fname = os.path.join(support.TESTFN, "f1")
165 f = open(self.fname, 'wb') 180 f = open(self.fname, 'wb')
166 f.write(b"ABC") 181 f.write(b"ABC")
167 f.close() 182 f.close()
168 183
169 def tearDown(self): 184 def tearDown(self):
170 os.unlink(self.fname) 185 os.unlink(self.fname)
171 os.rmdir(support.TESTFN) 186 os.rmdir(support.TESTFN)
172 187
188 @unittest.skipUnless(hasattr(os, 'stat'), 'test needs os.stat()')
173 def check_stat_attributes(self, fname): 189 def check_stat_attributes(self, fname):
174 if not hasattr(os, "stat"):
175 return
176
177 result = os.stat(fname) 190 result = os.stat(fname)
178 191
179 # Make sure direct access works 192 # Make sure direct access works
180 self.assertEqual(result[stat.ST_SIZE], 3) 193 self.assertEqual(result[stat.ST_SIZE], 3)
181 self.assertEqual(result.st_size, 3) 194 self.assertEqual(result.st_size, 3)
182 195
183 # Make sure all the attributes are there 196 # Make sure all the attributes are there
184 members = dir(result) 197 members = dir(result)
185 for name in dir(stat): 198 for name in dir(stat):
186 if name[:3] == 'ST_': 199 if name[:3] == 'ST_':
(...skipping 56 matching lines...) Expand 10 before | Expand all | Expand 10 after
243 256
244 def test_stat_attributes_bytes(self): 257 def test_stat_attributes_bytes(self):
245 try: 258 try:
246 fname = self.fname.encode(sys.getfilesystemencoding()) 259 fname = self.fname.encode(sys.getfilesystemencoding())
247 except UnicodeEncodeError: 260 except UnicodeEncodeError:
248 self.skipTest("cannot encode %a for the filesystem" % self.fname) 261 self.skipTest("cannot encode %a for the filesystem" % self.fname)
249 with warnings.catch_warnings(): 262 with warnings.catch_warnings():
250 warnings.simplefilter("ignore", DeprecationWarning) 263 warnings.simplefilter("ignore", DeprecationWarning)
251 self.check_stat_attributes(fname) 264 self.check_stat_attributes(fname)
252 265
266 def test_stat_result_pickle(self):
267 result = os.stat(self.fname)
268 p = pickle.dumps(result)
269 self.assertIn(b'\x03cos\nstat_result\n', p)
270 unpickled = pickle.loads(p)
271 self.assertEqual(result, unpickled)
272
273 @unittest.skipUnless(hasattr(os, 'statvfs'), 'test needs os.statvfs()')
253 def test_statvfs_attributes(self): 274 def test_statvfs_attributes(self):
254 if not hasattr(os, "statvfs"):
255 return
256
257 try: 275 try:
258 result = os.statvfs(self.fname) 276 result = os.statvfs(self.fname)
259 except OSError as e: 277 except OSError as e:
260 # On AtheOS, glibc always returns ENOSYS 278 # On AtheOS, glibc always returns ENOSYS
261 if e.errno == errno.ENOSYS: 279 if e.errno == errno.ENOSYS:
262 return 280 self.skipTest('os.statvfs() failed with ENOSYS')
263 281
264 # Make sure direct access works 282 # Make sure direct access works
265 self.assertEqual(result.f_bfree, result[3]) 283 self.assertEqual(result.f_bfree, result[3])
266 284
267 # Make sure all the attributes are there. 285 # Make sure all the attributes are there.
268 members = ('bsize', 'frsize', 'blocks', 'bfree', 'bavail', 'files', 286 members = ('bsize', 'frsize', 'blocks', 'bfree', 'bavail', 'files',
269 'ffree', 'favail', 'flag', 'namemax') 287 'ffree', 'favail', 'flag', 'namemax')
270 for value, member in enumerate(members): 288 for value, member in enumerate(members):
271 self.assertEqual(getattr(result, 'f_' + member), result[value]) 289 self.assertEqual(getattr(result, 'f_' + member), result[value])
272 290
(...skipping 15 matching lines...) Expand all
288 result2 = os.statvfs_result((10,)) 306 result2 = os.statvfs_result((10,))
289 self.fail("No exception raised") 307 self.fail("No exception raised")
290 except TypeError: 308 except TypeError:
291 pass 309 pass
292 310
293 # Use the constructor with a too-long tuple. 311 # Use the constructor with a too-long tuple.
294 try: 312 try:
295 result2 = os.statvfs_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14)) 313 result2 = os.statvfs_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
296 except TypeError: 314 except TypeError:
297 pass 315 pass
316
317 @unittest.skipUnless(hasattr(os, 'statvfs'),
318 "need os.statvfs()")
319 def test_statvfs_result_pickle(self):
320 try:
321 result = os.statvfs(self.fname)
322 except OSError as e:
323 # On AtheOS, glibc always returns ENOSYS
324 if e.errno == errno.ENOSYS:
325 self.skipTest('os.statvfs() failed with ENOSYS')
326
327 p = pickle.dumps(result)
328 self.assertIn(b'\x03cos\nstatvfs_result\n', p)
329 unpickled = pickle.loads(p)
330 self.assertEqual(result, unpickled)
298 331
299 def test_utime_dir(self): 332 def test_utime_dir(self):
300 delta = 1000000 333 delta = 1000000
301 st = os.stat(support.TESTFN) 334 st = os.stat(support.TESTFN)
302 # round to int, because some systems may support sub-second 335 # round to int, because some systems may support sub-second
303 # time stamps in stat, but not in utime. 336 # time stamps in stat, but not in utime.
304 os.utime(support.TESTFN, (st.st_atime, int(st.st_mtime-delta))) 337 os.utime(support.TESTFN, (st.st_atime, int(st.st_mtime-delta)))
305 st2 = os.stat(support.TESTFN) 338 st2 = os.stat(support.TESTFN)
306 self.assertEqual(st2.st_mtime, int(st.st_mtime-delta)) 339 self.assertEqual(st2.st_mtime, int(st.st_mtime-delta))
307 340
(...skipping 127 matching lines...) Expand 10 before | Expand all | Expand 10 after
435 def set_time(filename, atime, mtime): 468 def set_time(filename, atime, mtime):
436 dirname = os.path.dirname(filename) 469 dirname = os.path.dirname(filename)
437 dirfd = os.open(dirname, os.O_RDONLY) 470 dirfd = os.open(dirname, os.O_RDONLY)
438 try: 471 try:
439 os.utime(os.path.basename(filename), dir_fd=dirfd, 472 os.utime(os.path.basename(filename), dir_fd=dirfd,
440 times=(atime, mtime)) 473 times=(atime, mtime))
441 finally: 474 finally:
442 os.close(dirfd) 475 os.close(dirfd)
443 self._test_utime_subsecond(set_time) 476 self._test_utime_subsecond(set_time)
444 477
445 # Restrict test to Win32, since there is no guarantee other 478 # Restrict tests to Win32, since there is no guarantee other
446 # systems support centiseconds 479 # systems support centiseconds
447 if sys.platform == 'win32': 480 def get_file_system(path):
448 def get_file_system(path): 481 if sys.platform == 'win32':
449 root = os.path.splitdrive(os.path.abspath(path))[0] + '\\' 482 root = os.path.splitdrive(os.path.abspath(path))[0] + '\\'
450 import ctypes 483 import ctypes
451 kernel32 = ctypes.windll.kernel32 484 kernel32 = ctypes.windll.kernel32
452 buf = ctypes.create_unicode_buffer("", 100) 485 buf = ctypes.create_unicode_buffer("", 100)
453 if kernel32.GetVolumeInformationW(root, None, 0, None, None, None, b uf, len(buf)): 486 if kernel32.GetVolumeInformationW(root, None, 0, None, None, None, b uf, len(buf)):
454 return buf.value 487 return buf.value
455 488
456 if get_file_system(support.TESTFN) == "NTFS": 489 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
457 def test_1565150(self): 490 @unittest.skipUnless(get_file_system(support.TESTFN) == "NTFS",
458 t1 = 1159195039.25 491 "requires NTFS")
459 os.utime(self.fname, (t1, t1)) 492 def test_1565150(self):
460 self.assertEqual(os.stat(self.fname).st_mtime, t1) 493 t1 = 1159195039.25
461 494 os.utime(self.fname, (t1, t1))
462 def test_large_time(self): 495 self.assertEqual(os.stat(self.fname).st_mtime, t1)
463 t1 = 5000000000 # some day in 2128 496
464 os.utime(self.fname, (t1, t1)) 497 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
465 self.assertEqual(os.stat(self.fname).st_mtime, t1) 498 @unittest.skipUnless(get_file_system(support.TESTFN) == "NTFS",
466 499 "requires NTFS")
467 def test_1686475(self): 500 def test_large_time(self):
468 # Verify that an open file can be stat'ed 501 t1 = 5000000000 # some day in 2128
469 try: 502 os.utime(self.fname, (t1, t1))
470 os.stat(r"c:\pagefile.sys") 503 self.assertEqual(os.stat(self.fname).st_mtime, t1)
471 except FileNotFoundError: 504
472 pass # file does not exist; cannot run test 505 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
473 except OSError as e: 506 def test_1686475(self):
474 self.fail("Could not stat pagefile.sys") 507 # Verify that an open file can be stat'ed
475 508 try:
476 @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()") 509 os.stat(r"c:\pagefile.sys")
477 def test_15261(self): 510 except FileNotFoundError:
478 # Verify that stat'ing a closed fd does not cause crash 511 self.skipTest(r'c:\pagefile.sys does not exist')
479 r, w = os.pipe() 512 except OSError as e:
480 try: 513 self.fail("Could not stat pagefile.sys")
481 os.stat(r) # should not raise error 514
482 finally: 515 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
483 os.close(r) 516 @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
484 os.close(w) 517 def test_15261(self):
485 with self.assertRaises(OSError) as ctx: 518 # Verify that stat'ing a closed fd does not cause crash
486 os.stat(r) 519 r, w = os.pipe()
487 self.assertEqual(ctx.exception.errno, errno.EBADF) 520 try:
521 os.stat(r) # should not raise error
522 finally:
523 os.close(r)
524 os.close(w)
525 with self.assertRaises(OSError) as ctx:
526 os.stat(r)
527 self.assertEqual(ctx.exception.errno, errno.EBADF)
488 528
489 from test import mapping_tests 529 from test import mapping_tests
490 530
491 class EnvironTests(mapping_tests.BasicTestMappingProtocol): 531 class EnvironTests(mapping_tests.BasicTestMappingProtocol):
492 """check that os.environ object conform to mapping protocol""" 532 """check that os.environ object conform to mapping protocol"""
493 type2test = None 533 type2test = None
494 534
495 def setUp(self): 535 def setUp(self):
496 self.__save = dict(os.environ) 536 self.__save = dict(os.environ)
497 if os.supports_bytes_environ: 537 if os.supports_bytes_environ:
(...skipping 123 matching lines...) Expand 10 before | Expand all | Expand 10 after
621 @support.requires_mac_ver(10, 6) 661 @support.requires_mac_ver(10, 6)
622 def test_unset_error(self): 662 def test_unset_error(self):
623 if sys.platform == "win32": 663 if sys.platform == "win32":
624 # an environment variable is limited to 32,767 characters 664 # an environment variable is limited to 32,767 characters
625 key = 'x' * 50000 665 key = 'x' * 50000
626 self.assertRaises(ValueError, os.environ.__delitem__, key) 666 self.assertRaises(ValueError, os.environ.__delitem__, key)
627 else: 667 else:
628 # "=" is not allowed in a variable name 668 # "=" is not allowed in a variable name
629 key = 'key=' 669 key = 'key='
630 self.assertRaises(OSError, os.environ.__delitem__, key) 670 self.assertRaises(OSError, os.environ.__delitem__, key)
671
672 def test_key_type(self):
673 missing = 'missingkey'
674 self.assertNotIn(missing, os.environ)
675
676 with self.assertRaises(KeyError) as cm:
677 os.environ[missing]
678 self.assertIs(cm.exception.args[0], missing)
679 self.assertTrue(cm.exception.__suppress_context__)
680
681 with self.assertRaises(KeyError) as cm:
682 del os.environ[missing]
683 self.assertIs(cm.exception.args[0], missing)
684 self.assertTrue(cm.exception.__suppress_context__)
685
631 686
632 class WalkTests(unittest.TestCase): 687 class WalkTests(unittest.TestCase):
633 """Tests for os.walk().""" 688 """Tests for os.walk()."""
634 689
635 def setUp(self): 690 def setUp(self):
636 import os 691 import os
637 from os.path import join 692 from os.path import join
638 693
639 # Build: 694 # Build:
640 # TESTFN/ 695 # TESTFN/
(...skipping 23 matching lines...) Expand all
664 719
665 # Create stuff. 720 # Create stuff.
666 os.makedirs(sub11_path) 721 os.makedirs(sub11_path)
667 os.makedirs(sub2_path) 722 os.makedirs(sub2_path)
668 os.makedirs(t2_path) 723 os.makedirs(t2_path)
669 for path in tmp1_path, tmp2_path, tmp3_path, tmp4_path: 724 for path in tmp1_path, tmp2_path, tmp3_path, tmp4_path:
670 f = open(path, "w") 725 f = open(path, "w")
671 f.write("I'm " + path + " and proud of it. Blame test_os.\n") 726 f.write("I'm " + path + " and proud of it. Blame test_os.\n")
672 f.close() 727 f.close()
673 if support.can_symlink(): 728 if support.can_symlink():
674 if os.name == 'nt': 729 os.symlink(os.path.abspath(t2_path), link_path)
675 def symlink_to_dir(src, dest): 730 os.symlink('broken', broken_link_path, True)
676 os.symlink(src, dest, True)
677 else:
678 symlink_to_dir = os.symlink
679 symlink_to_dir(os.path.abspath(t2_path), link_path)
680 symlink_to_dir('broken', broken_link_path)
681 sub2_tree = (sub2_path, ["link"], ["broken_link", "tmp3"]) 731 sub2_tree = (sub2_path, ["link"], ["broken_link", "tmp3"])
682 else: 732 else:
683 sub2_tree = (sub2_path, [], ["tmp3"]) 733 sub2_tree = (sub2_path, [], ["tmp3"])
684 734
685 # Walk top-down. 735 # Walk top-down.
686 all = list(os.walk(walk_path)) 736 all = list(os.walk(walk_path))
687 self.assertEqual(len(all), 4) 737 self.assertEqual(len(all), 4)
688 # We can't know which order SUB1 and SUB2 will appear in. 738 # We can't know which order SUB1 and SUB2 will appear in.
689 # Not flipped: TESTFN, SUB1, SUB11, SUB2 739 # Not flipped: TESTFN, SUB1, SUB11, SUB2
690 # flipped: TESTFN, SUB2, SUB1, SUB11 740 # flipped: TESTFN, SUB2, SUB1, SUB11
(...skipping 156 matching lines...) Expand 10 before | Expand all | Expand 10 after
847 def test_exist_ok_existing_directory(self): 897 def test_exist_ok_existing_directory(self):
848 path = os.path.join(support.TESTFN, 'dir1') 898 path = os.path.join(support.TESTFN, 'dir1')
849 mode = 0o777 899 mode = 0o777
850 old_mask = os.umask(0o022) 900 old_mask = os.umask(0o022)
851 os.makedirs(path, mode) 901 os.makedirs(path, mode)
852 self.assertRaises(OSError, os.makedirs, path, mode) 902 self.assertRaises(OSError, os.makedirs, path, mode)
853 self.assertRaises(OSError, os.makedirs, path, mode, exist_ok=False) 903 self.assertRaises(OSError, os.makedirs, path, mode, exist_ok=False)
854 self.assertRaises(OSError, os.makedirs, path, 0o776, exist_ok=True) 904 self.assertRaises(OSError, os.makedirs, path, 0o776, exist_ok=True)
855 os.makedirs(path, mode=mode, exist_ok=True) 905 os.makedirs(path, mode=mode, exist_ok=True)
856 os.umask(old_mask) 906 os.umask(old_mask)
907
908 @unittest.skipUnless(hasattr(os, 'chown'), 'test needs os.chown')
909 def test_chown_uid_gid_arguments_must_be_index(self):
910 stat = os.stat(support.TESTFN)
911 uid = stat.st_uid
912 gid = stat.st_gid
913 for value in (-1.0, -1j, decimal.Decimal(-1), fractions.Fraction(-2, 2)) :
914 self.assertRaises(TypeError, os.chown, support.TESTFN, value, gid)
915 self.assertRaises(TypeError, os.chown, support.TESTFN, uid, value)
916 self.assertIsNone(os.chown(support.TESTFN, uid, gid))
917 self.assertIsNone(os.chown(support.TESTFN, -1, -1))
857 918
858 def test_exist_ok_s_isgid_directory(self): 919 def test_exist_ok_s_isgid_directory(self):
859 path = os.path.join(support.TESTFN, 'dir1') 920 path = os.path.join(support.TESTFN, 'dir1')
860 S_ISGID = stat.S_ISGID 921 S_ISGID = stat.S_ISGID
861 mode = 0o777 922 mode = 0o777
862 old_mask = os.umask(0o022) 923 old_mask = os.umask(0o022)
863 try: 924 try:
864 existing_testfn_mode = stat.S_IMODE( 925 existing_testfn_mode = stat.S_IMODE(
865 os.lstat(support.TESTFN).st_mode) 926 os.lstat(support.TESTFN).st_mode)
866 try: 927 try:
(...skipping 113 matching lines...) Expand 10 before | Expand all | Expand 10 after
980 out = assert_python_ok('-c', code) 1041 out = assert_python_ok('-c', code)
981 stdout = out[1] 1042 stdout = out[1]
982 self.assertEqual(len(stdout), 16) 1043 self.assertEqual(len(stdout), 16)
983 return stdout 1044 return stdout
984 1045
985 def test_urandom_subprocess(self): 1046 def test_urandom_subprocess(self):
986 data1 = self.get_urandom_subprocess(16) 1047 data1 = self.get_urandom_subprocess(16)
987 data2 = self.get_urandom_subprocess(16) 1048 data2 = self.get_urandom_subprocess(16)
988 self.assertNotEqual(data1, data2) 1049 self.assertNotEqual(data1, data2)
989 1050
1051 @unittest.skipUnless(resource, "test requires the resource module")
1052 def test_urandom_failure(self):
1053 # Check urandom() failing when it is not able to open /dev/random.
1054 # We spawn a new process to make the test more robust (if getrlimit()
1055 # failed to restore the file descriptor limit after this, the whole
1056 # test suite would crash; this actually happened on the OS X Tiger
1057 # buildbot).
1058 code = """if 1:
1059 import errno
1060 import os
1061 import resource
1062
1063 soft_limit, hard_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
1064 resource.setrlimit(resource.RLIMIT_NOFILE, (1, hard_limit))
1065 try:
1066 os.urandom(16)
1067 except OSError as e:
1068 assert e.errno == errno.EMFILE, e.errno
1069 else:
1070 raise AssertionError("OSError not raised")
1071 """
1072 assert_python_ok('-c', code)
1073
1074
990 @contextlib.contextmanager 1075 @contextlib.contextmanager
991 def _execvpe_mockup(defpath=None): 1076 def _execvpe_mockup(defpath=None):
992 """ 1077 """
993 Stubs out execv and execve functions when used as context manager. 1078 Stubs out execv and execve functions when used as context manager.
994 Records exec calls. The mock execv and execve functions always raise an 1079 Records exec calls. The mock execv and execve functions always raise an
995 exception as they would normally never return. 1080 exception as they would normally never return.
996 """ 1081 """
997 # A list of tuples containing (function name, first arg, args) 1082 # A list of tuples containing (function name, first arg, args)
998 # of calls to execv or execve that have been made. 1083 # of calls to execv or execve that have been made.
999 calls = [] 1084 calls = []
(...skipping 78 matching lines...) Expand 10 before | Expand all | Expand 10 after
1078 self.assertEqual(len(calls), 1) 1163 self.assertEqual(len(calls), 1)
1079 self.assertSequenceEqual(calls[0], 1164 self.assertSequenceEqual(calls[0],
1080 ('execve', native_fullpath, (arguments, env_path))) 1165 ('execve', native_fullpath, (arguments, env_path)))
1081 1166
1082 def test_internal_execvpe_str(self): 1167 def test_internal_execvpe_str(self):
1083 self._test_internal_execvpe(str) 1168 self._test_internal_execvpe(str)
1084 if os.name != "nt": 1169 if os.name != "nt":
1085 self._test_internal_execvpe(bytes) 1170 self._test_internal_execvpe(bytes)
1086 1171
1087 1172
1173 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
1088 class Win32ErrorTests(unittest.TestCase): 1174 class Win32ErrorTests(unittest.TestCase):
1089 def test_rename(self): 1175 def test_rename(self):
1090 self.assertRaises(OSError, os.rename, support.TESTFN, support.TESTFN+".b ak") 1176 self.assertRaises(OSError, os.rename, support.TESTFN, support.TESTFN+".b ak")
1091 1177
1092 def test_remove(self): 1178 def test_remove(self):
1093 self.assertRaises(OSError, os.remove, support.TESTFN) 1179 self.assertRaises(OSError, os.remove, support.TESTFN)
1094 1180
1095 def test_chdir(self): 1181 def test_chdir(self):
1096 self.assertRaises(OSError, os.chdir, support.TESTFN) 1182 self.assertRaises(OSError, os.chdir, support.TESTFN)
1097 1183
(...skipping 26 matching lines...) Expand all
1124 1210
1125 def check(self, f, *args): 1211 def check(self, f, *args):
1126 try: 1212 try:
1127 f(support.make_bad_fd(), *args) 1213 f(support.make_bad_fd(), *args)
1128 except OSError as e: 1214 except OSError as e:
1129 self.assertEqual(e.errno, errno.EBADF) 1215 self.assertEqual(e.errno, errno.EBADF)
1130 else: 1216 else:
1131 self.fail("%r didn't raise a OSError with a bad file descriptor" 1217 self.fail("%r didn't raise a OSError with a bad file descriptor"
1132 % f) 1218 % f)
1133 1219
1220 @unittest.skipUnless(hasattr(os, 'isatty'), 'test needs os.isatty()')
1134 def test_isatty(self): 1221 def test_isatty(self):
1135 if hasattr(os, "isatty"): 1222 self.assertEqual(os.isatty(support.make_bad_fd()), False)
1136 self.assertEqual(os.isatty(support.make_bad_fd()), False) 1223
1137 1224 @unittest.skipUnless(hasattr(os, 'closerange'), 'test needs os.closerange()' )
1138 def test_closerange(self): 1225 def test_closerange(self):
1139 if hasattr(os, "closerange"): 1226 fd = support.make_bad_fd()
1140 fd = support.make_bad_fd() 1227 # Make sure none of the descriptors we are about to close are
1141 # Make sure none of the descriptors we are about to close are 1228 # currently valid (issue 6542).
1142 # currently valid (issue 6542). 1229 for i in range(10):
1143 for i in range(10): 1230 try: os.fstat(fd+i)
1144 try: os.fstat(fd+i) 1231 except OSError:
1145 except OSError: 1232 pass
1146 pass 1233 else:
1147 else: 1234 break
1148 break 1235 if i < 2:
1149 if i < 2: 1236 raise unittest.SkipTest(
1150 raise unittest.SkipTest( 1237 "Unable to acquire a range of invalid file descriptors")
1151 "Unable to acquire a range of invalid file descriptors") 1238 self.assertEqual(os.closerange(fd, fd + i-1), None)
1152 self.assertEqual(os.closerange(fd, fd + i-1), None) 1239
1153 1240 @unittest.skipUnless(hasattr(os, 'dup2'), 'test needs os.dup2()')
1154 def test_dup2(self): 1241 def test_dup2(self):
1155 if hasattr(os, "dup2"): 1242 self.check(os.dup2, 20)
1156 self.check(os.dup2, 20) 1243
1157 1244 @unittest.skipUnless(hasattr(os, 'fchmod'), 'test needs os.fchmod()')
1158 def test_fchmod(self): 1245 def test_fchmod(self):
1159 if hasattr(os, "fchmod"): 1246 self.check(os.fchmod, 0)
1160 self.check(os.fchmod, 0) 1247
1161 1248 @unittest.skipUnless(hasattr(os, 'fchown'), 'test needs os.fchown()')
1162 def test_fchown(self): 1249 def test_fchown(self):
1163 if hasattr(os, "fchown"): 1250 self.check(os.fchown, -1, -1)
1164 self.check(os.fchown, -1, -1) 1251
1165 1252 @unittest.skipUnless(hasattr(os, 'fpathconf'), 'test needs os.fpathconf()')
1166 def test_fpathconf(self): 1253 def test_fpathconf(self):
1167 if hasattr(os, "fpathconf"): 1254 self.check(os.pathconf, "PC_NAME_MAX")
1168 self.check(os.pathconf, "PC_NAME_MAX") 1255 self.check(os.fpathconf, "PC_NAME_MAX")
1169 self.check(os.fpathconf, "PC_NAME_MAX") 1256
1170 1257 @unittest.skipUnless(hasattr(os, 'ftruncate'), 'test needs os.ftruncate()')
1171 def test_ftruncate(self): 1258 def test_ftruncate(self):
1172 if hasattr(os, "ftruncate"): 1259 self.check(os.truncate, 0)
1173 self.check(os.truncate, 0) 1260 self.check(os.ftruncate, 0)
1174 self.check(os.ftruncate, 0) 1261
1175 1262 @unittest.skipUnless(hasattr(os, 'lseek'), 'test needs os.lseek()')
1176 def test_lseek(self): 1263 def test_lseek(self):
1177 if hasattr(os, "lseek"): 1264 self.check(os.lseek, 0, 0)
1178 self.check(os.lseek, 0, 0) 1265
1179 1266 @unittest.skipUnless(hasattr(os, 'read'), 'test needs os.read()')
1180 def test_read(self): 1267 def test_read(self):
1181 if hasattr(os, "read"): 1268 self.check(os.read, 1)
1182 self.check(os.read, 1) 1269
1183 1270 @unittest.skipUnless(hasattr(os, 'readv'), 'test needs os.readv()')
1271 def test_readv(self):
1272 buf = bytearray(10)
1273 self.check(os.readv, [buf])
1274
1275 @unittest.skipUnless(hasattr(os, 'tcsetpgrp'), 'test needs os.tcsetpgrp()')
1184 def test_tcsetpgrpt(self): 1276 def test_tcsetpgrpt(self):
1185 if hasattr(os, "tcsetpgrp"): 1277 self.check(os.tcsetpgrp, 0)
1186 self.check(os.tcsetpgrp, 0) 1278
1187 1279 @unittest.skipUnless(hasattr(os, 'write'), 'test needs os.write()')
1188 def test_write(self): 1280 def test_write(self):
1189 if hasattr(os, "write"): 1281 self.check(os.write, b" ")
1190 self.check(os.write, b" ") 1282
1283 @unittest.skipUnless(hasattr(os, 'writev'), 'test needs os.writev()')
1284 def test_writev(self):
1285 self.check(os.writev, [b'abc'])
1191 1286
1192 1287
1193 class LinkTests(unittest.TestCase): 1288 class LinkTests(unittest.TestCase):
1194 def setUp(self): 1289 def setUp(self):
1195 self.file1 = support.TESTFN 1290 self.file1 = support.TESTFN
1196 self.file2 = os.path.join(support.TESTFN + "2") 1291 self.file2 = os.path.join(support.TESTFN + "2")
1197 1292
1198 def tearDown(self): 1293 def tearDown(self):
1199 for file in (self.file1, self.file2): 1294 for file in (self.file1, self.file2):
1200 if os.path.exists(file): 1295 if os.path.exists(file):
(...skipping 19 matching lines...) Expand all
1220 def test_unicode_name(self): 1315 def test_unicode_name(self):
1221 try: 1316 try:
1222 os.fsencode("\xf1") 1317 os.fsencode("\xf1")
1223 except UnicodeError: 1318 except UnicodeError:
1224 raise unittest.SkipTest("Unable to encode for this platform.") 1319 raise unittest.SkipTest("Unable to encode for this platform.")
1225 1320
1226 self.file1 += "\xf1" 1321 self.file1 += "\xf1"
1227 self.file2 = self.file1 + "2" 1322 self.file2 = self.file1 + "2"
1228 self._test_link(self.file1, self.file2) 1323 self._test_link(self.file1, self.file2)
1229 1324
1230 if sys.platform != 'win32': 1325 @unittest.skipIf(sys.platform == "win32", "Posix specific tests")
1231 class Win32ErrorTests(unittest.TestCase): 1326 class PosixUidGidTests(unittest.TestCase):
1232 pass 1327 @unittest.skipUnless(hasattr(os, 'setuid'), 'test needs os.setuid()')
1233 1328 def test_setuid(self):
1234 class PosixUidGidTests(unittest.TestCase): 1329 if os.getuid() != 0:
1235 if hasattr(os, 'setuid'): 1330 self.assertRaises(OSError, os.setuid, 0)
1236 def test_setuid(self): 1331 self.assertRaises(OverflowError, os.setuid, 1<<32)
1237 if os.getuid() != 0: 1332
1238 self.assertRaises(OSError, os.setuid, 0) 1333 @unittest.skipUnless(hasattr(os, 'setgid'), 'test needs os.setgid()')
1239 self.assertRaises(OverflowError, os.setuid, 1<<32) 1334 def test_setgid(self):
1240 1335 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
1241 if hasattr(os, 'setgid'): 1336 self.assertRaises(OSError, os.setgid, 0)
1242 def test_setgid(self): 1337 self.assertRaises(OverflowError, os.setgid, 1<<32)
1243 if os.getuid() != 0: 1338
1244 self.assertRaises(OSError, os.setgid, 0) 1339 @unittest.skipUnless(hasattr(os, 'seteuid'), 'test needs os.seteuid()')
1245 self.assertRaises(OverflowError, os.setgid, 1<<32) 1340 def test_seteuid(self):
1246 1341 if os.getuid() != 0:
1247 if hasattr(os, 'seteuid'): 1342 self.assertRaises(OSError, os.seteuid, 0)
1248 def test_seteuid(self): 1343 self.assertRaises(OverflowError, os.seteuid, 1<<32)
1249 if os.getuid() != 0: 1344
1250 self.assertRaises(OSError, os.seteuid, 0) 1345 @unittest.skipUnless(hasattr(os, 'setegid'), 'test needs os.setegid()')
1251 self.assertRaises(OverflowError, os.seteuid, 1<<32) 1346 def test_setegid(self):
1252 1347 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
1253 if hasattr(os, 'setegid'): 1348 self.assertRaises(OSError, os.setegid, 0)
1254 def test_setegid(self): 1349 self.assertRaises(OverflowError, os.setegid, 1<<32)
1255 if os.getuid() != 0: 1350
1256 self.assertRaises(OSError, os.setegid, 0) 1351 @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
1257 self.assertRaises(OverflowError, os.setegid, 1<<32) 1352 def test_setreuid(self):
1258 1353 if os.getuid() != 0:
1259 if hasattr(os, 'setreuid'): 1354 self.assertRaises(OSError, os.setreuid, 0, 0)
1260 def test_setreuid(self): 1355 self.assertRaises(OverflowError, os.setreuid, 1<<32, 0)
1261 if os.getuid() != 0: 1356 self.assertRaises(OverflowError, os.setreuid, 0, 1<<32)
1262 self.assertRaises(OSError, os.setreuid, 0, 0) 1357
1263 self.assertRaises(OverflowError, os.setreuid, 1<<32, 0) 1358 @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
1264 self.assertRaises(OverflowError, os.setreuid, 0, 1<<32) 1359 def test_setreuid_neg1(self):
1265 1360 # Needs to accept -1. We run this in a subprocess to avoid
1266 def test_setreuid_neg1(self): 1361 # altering the test runner's process state (issue8045).
1267 # Needs to accept -1. We run this in a subprocess to avoid 1362 subprocess.check_call([
1268 # altering the test runner's process state (issue8045). 1363 sys.executable, '-c',
1269 subprocess.check_call([ 1364 'import os,sys;os.setreuid(-1,-1);sys.exit(0)'])
1270 sys.executable, '-c', 1365
1271 'import os,sys;os.setreuid(-1,-1);sys.exit(0)']) 1366 @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
1272 1367 def test_setregid(self):
1273 if hasattr(os, 'setregid'): 1368 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
1274 def test_setregid(self): 1369 self.assertRaises(OSError, os.setregid, 0, 0)
1275 if os.getuid() != 0: 1370 self.assertRaises(OverflowError, os.setregid, 1<<32, 0)
1276 self.assertRaises(OSError, os.setregid, 0, 0) 1371 self.assertRaises(OverflowError, os.setregid, 0, 1<<32)
1277 self.assertRaises(OverflowError, os.setregid, 1<<32, 0) 1372
1278 self.assertRaises(OverflowError, os.setregid, 0, 1<<32) 1373 @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
1279 1374 def test_setregid_neg1(self):
1280 def test_setregid_neg1(self): 1375 # Needs to accept -1. We run this in a subprocess to avoid
1281 # Needs to accept -1. We run this in a subprocess to avoid 1376 # altering the test runner's process state (issue8045).
1282 # altering the test runner's process state (issue8045). 1377 subprocess.check_call([
1283 subprocess.check_call([ 1378 sys.executable, '-c',
1284 sys.executable, '-c', 1379 'import os,sys;os.setregid(-1,-1);sys.exit(0)'])
1285 'import os,sys;os.setregid(-1,-1);sys.exit(0)']) 1380
1286 1381 @unittest.skipIf(sys.platform == "win32", "Posix specific tests")
1287 class Pep383Tests(unittest.TestCase): 1382 class Pep383Tests(unittest.TestCase):
1288 def setUp(self): 1383 def setUp(self):
1289 if support.TESTFN_UNENCODABLE: 1384 if support.TESTFN_UNENCODABLE:
1290 self.dir = support.TESTFN_UNENCODABLE 1385 self.dir = support.TESTFN_UNENCODABLE
1291 elif support.TESTFN_NONASCII: 1386 elif support.TESTFN_NONASCII:
1292 self.dir = support.TESTFN_NONASCII 1387 self.dir = support.TESTFN_NONASCII
1293 else: 1388 else:
1294 self.dir = support.TESTFN 1389 self.dir = support.TESTFN
1295 self.bdir = os.fsencode(self.dir) 1390 self.bdir = os.fsencode(self.dir)
1296 1391
1297 bytesfn = [] 1392 bytesfn = []
1298 def add_filename(fn): 1393 def add_filename(fn):
1299 try:
1300 fn = os.fsencode(fn)
1301 except UnicodeEncodeError:
1302 return
1303 bytesfn.append(fn)
1304 add_filename(support.TESTFN_UNICODE)
1305 if support.TESTFN_UNENCODABLE:
1306 add_filename(support.TESTFN_UNENCODABLE)
1307 if support.TESTFN_NONASCII:
1308 add_filename(support.TESTFN_NONASCII)
1309 if not bytesfn:
1310 self.skipTest("couldn't create any non-ascii filename")
1311
1312 self.unicodefn = set()
1313 os.mkdir(self.dir)
1314 try: 1394 try:
1315 for fn in bytesfn: 1395 fn = os.fsencode(fn)
1316 support.create_empty_file(os.path.join(self.bdir, fn)) 1396 except UnicodeEncodeError:
1317 fn = os.fsdecode(fn) 1397 return
1318 if fn in self.unicodefn: 1398 bytesfn.append(fn)
1319 raise ValueError("duplicate filename") 1399 add_filename(support.TESTFN_UNICODE)
1320 self.unicodefn.add(fn) 1400 if support.TESTFN_UNENCODABLE:
1321 except: 1401 add_filename(support.TESTFN_UNENCODABLE)
1322 shutil.rmtree(self.dir) 1402 if support.TESTFN_NONASCII:
1323 raise 1403 add_filename(support.TESTFN_NONASCII)
1324 1404 if not bytesfn:
1325 def tearDown(self): 1405 self.skipTest("couldn't create any non-ascii filename")
1406
1407 self.unicodefn = set()
1408 os.mkdir(self.dir)
1409 try:
1410 for fn in bytesfn:
1411 support.create_empty_file(os.path.join(self.bdir, fn))
1412 fn = os.fsdecode(fn)
1413 if fn in self.unicodefn:
1414 raise ValueError("duplicate filename")
1415 self.unicodefn.add(fn)
1416 except:
1326 shutil.rmtree(self.dir) 1417 shutil.rmtree(self.dir)
1327 1418 raise
1328 def test_listdir(self): 1419
1329 expected = self.unicodefn 1420 def tearDown(self):
1330 found = set(os.listdir(self.dir)) 1421 shutil.rmtree(self.dir)
1331 self.assertEqual(found, expected) 1422
1332 # test listdir without arguments 1423 def test_listdir(self):
1333 current_directory = os.getcwd() 1424 expected = self.unicodefn
1334 try: 1425 found = set(os.listdir(self.dir))
1335 os.chdir(os.sep) 1426 self.assertEqual(found, expected)
1336 self.assertEqual(set(os.listdir()), set(os.listdir(os.sep))) 1427 # test listdir without arguments
1337 finally: 1428 current_directory = os.getcwd()
1338 os.chdir(current_directory) 1429 try:
1339 1430 os.chdir(os.sep)
1340 def test_open(self): 1431 self.assertEqual(set(os.listdir()), set(os.listdir(os.sep)))
1341 for fn in self.unicodefn: 1432 finally:
1342 f = open(os.path.join(self.dir, fn), 'rb') 1433 os.chdir(current_directory)
1343 f.close() 1434
1344 1435 def test_open(self):
1345 @unittest.skipUnless(hasattr(os, 'statvfs'), 1436 for fn in self.unicodefn:
1346 "need os.statvfs()") 1437 f = open(os.path.join(self.dir, fn), 'rb')
1347 def test_statvfs(self): 1438 f.close()
1348 # issue #9645 1439
1349 for fn in self.unicodefn: 1440 @unittest.skipUnless(hasattr(os, 'statvfs'),
1350 # should not fail with file not found error 1441 "need os.statvfs()")
1351 fullname = os.path.join(self.dir, fn) 1442 def test_statvfs(self):
1352 os.statvfs(fullname) 1443 # issue #9645
1353 1444 for fn in self.unicodefn:
1354 def test_stat(self): 1445 # should not fail with file not found error
1355 for fn in self.unicodefn: 1446 fullname = os.path.join(self.dir, fn)
1356 os.stat(os.path.join(self.dir, fn)) 1447 os.statvfs(fullname)
1357 else: 1448
1358 class PosixUidGidTests(unittest.TestCase): 1449 def test_stat(self):
1359 pass 1450 for fn in self.unicodefn:
1360 class Pep383Tests(unittest.TestCase): 1451 os.stat(os.path.join(self.dir, fn))
1361 pass
1362 1452
1363 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests") 1453 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
1364 class Win32KillTests(unittest.TestCase): 1454 class Win32KillTests(unittest.TestCase):
1365 def _kill(self, sig): 1455 def _kill(self, sig):
1366 # Start sys.executable as a subprocess and communicate from the 1456 # Start sys.executable as a subprocess and communicate from the
1367 # subprocess to the parent that the interpreter is ready. When it 1457 # subprocess to the parent that the interpreter is ready. When it
1368 # becomes ready, send *sig* via os.kill to the subprocess and check 1458 # becomes ready, send *sig* via os.kill to the subprocess and check
1369 # that the return code is equal to *sig*. 1459 # that the return code is equal to *sig*.
1370 import ctypes 1460 import ctypes
1371 from ctypes import wintypes 1461 from ctypes import wintypes
(...skipping 98 matching lines...) Expand 10 before | Expand all | Expand 10 after
1470 # by subprocesses. 1560 # by subprocesses.
1471 SetConsoleCtrlHandler(NULL, 0) 1561 SetConsoleCtrlHandler(NULL, 0)
1472 1562
1473 self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT") 1563 self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT")
1474 1564
1475 def test_CTRL_BREAK_EVENT(self): 1565 def test_CTRL_BREAK_EVENT(self):
1476 self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT") 1566 self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT")
1477 1567
1478 1568
1479 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests") 1569 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
1570 class Win32ListdirTests(unittest.TestCase):
1571 """Test listdir on Windows."""
1572
1573 def setUp(self):
1574 self.created_paths = []
1575 for i in range(2):
1576 dir_name = 'SUB%d' % i
1577 dir_path = os.path.join(support.TESTFN, dir_name)
1578 file_name = 'FILE%d' % i
1579 file_path = os.path.join(support.TESTFN, file_name)
1580 os.makedirs(dir_path)
1581 with open(file_path, 'w') as f:
1582 f.write("I'm %s and proud of it. Blame test_os.\n" % file_path)
1583 self.created_paths.extend([dir_name, file_name])
1584 self.created_paths.sort()
1585
1586 def tearDown(self):
1587 shutil.rmtree(support.TESTFN)
1588
1589 def test_listdir_no_extended_path(self):
1590 """Test when the path is not an "extended" path."""
1591 # unicode
1592 self.assertEqual(
1593 sorted(os.listdir(support.TESTFN)),
1594 self.created_paths)
1595 # bytes
1596 self.assertEqual(
1597 sorted(os.listdir(os.fsencode(support.TESTFN))),
1598 [os.fsencode(path) for path in self.created_paths])
1599
1600 def test_listdir_extended_path(self):
1601 """Test when the path starts with '\\\\?\\'."""
1602 # See: http://msdn.microsoft.com/en-us/library/windows/desktop/aa365247( v=vs.85).aspx#maxpath
1603 # unicode
1604 path = '\\\\?\\' + os.path.abspath(support.TESTFN)
1605 self.assertEqual(
1606 sorted(os.listdir(path)),
1607 self.created_paths)
1608 # bytes
1609 path = b'\\\\?\\' + os.fsencode(os.path.abspath(support.TESTFN))
1610 self.assertEqual(
1611 sorted(os.listdir(path)),
1612 [os.fsencode(path) for path in self.created_paths])
1613
1614
1615 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
1480 @support.skip_unless_symlink 1616 @support.skip_unless_symlink
1481 class Win32SymlinkTests(unittest.TestCase): 1617 class Win32SymlinkTests(unittest.TestCase):
1482 filelink = 'filelinktest' 1618 filelink = 'filelinktest'
1483 filelink_target = os.path.abspath(__file__) 1619 filelink_target = os.path.abspath(__file__)
1484 dirlink = 'dirlinktest' 1620 dirlink = 'dirlinktest'
1485 dirlink_target = os.path.dirname(filelink_target) 1621 dirlink_target = os.path.dirname(filelink_target)
1486 missing_link = 'missing link' 1622 missing_link = 'missing link'
1487 1623
1488 def setUp(self): 1624 def setUp(self):
1489 assert os.path.exists(self.dirlink_target) 1625 assert os.path.exists(self.dirlink_target)
1490 assert os.path.exists(self.filelink_target) 1626 assert os.path.exists(self.filelink_target)
1491 assert not os.path.exists(self.dirlink) 1627 assert not os.path.exists(self.dirlink)
1492 assert not os.path.exists(self.filelink) 1628 assert not os.path.exists(self.filelink)
1493 assert not os.path.exists(self.missing_link) 1629 assert not os.path.exists(self.missing_link)
1494 1630
1495 def tearDown(self): 1631 def tearDown(self):
1496 if os.path.exists(self.filelink): 1632 if os.path.exists(self.filelink):
1497 os.remove(self.filelink) 1633 os.remove(self.filelink)
1498 if os.path.exists(self.dirlink): 1634 if os.path.exists(self.dirlink):
1499 os.rmdir(self.dirlink) 1635 os.rmdir(self.dirlink)
1500 if os.path.lexists(self.missing_link): 1636 if os.path.lexists(self.missing_link):
1501 os.remove(self.missing_link) 1637 os.remove(self.missing_link)
1502 1638
1503 def test_directory_link(self): 1639 def test_directory_link(self):
1504 os.symlink(self.dirlink_target, self.dirlink, True) 1640 os.symlink(self.dirlink_target, self.dirlink)
1505 self.assertTrue(os.path.exists(self.dirlink)) 1641 self.assertTrue(os.path.exists(self.dirlink))
1506 self.assertTrue(os.path.isdir(self.dirlink)) 1642 self.assertTrue(os.path.isdir(self.dirlink))
1507 self.assertTrue(os.path.islink(self.dirlink)) 1643 self.assertTrue(os.path.islink(self.dirlink))
1508 self.check_stat(self.dirlink, self.dirlink_target) 1644 self.check_stat(self.dirlink, self.dirlink_target)
1509 1645
1510 def test_file_link(self): 1646 def test_file_link(self):
1511 os.symlink(self.filelink_target, self.filelink) 1647 os.symlink(self.filelink_target, self.filelink)
1512 self.assertTrue(os.path.exists(self.filelink)) 1648 self.assertTrue(os.path.exists(self.filelink))
1513 self.assertTrue(os.path.isfile(self.filelink)) 1649 self.assertTrue(os.path.isfile(self.filelink))
1514 self.assertTrue(os.path.islink(self.filelink)) 1650 self.assertTrue(os.path.islink(self.filelink))
(...skipping 71 matching lines...) Expand 10 before | Expand all | Expand 10 after
1586 os.chdir(level3) 1722 os.chdir(level3)
1587 self.assertEqual(os.stat(file1), 1723 self.assertEqual(os.stat(file1),
1588 os.stat(os.path.relpath(link))) 1724 os.stat(os.path.relpath(link)))
1589 finally: 1725 finally:
1590 os.chdir(orig_dir) 1726 os.chdir(orig_dir)
1591 except OSError as err: 1727 except OSError as err:
1592 self.fail(err) 1728 self.fail(err)
1593 finally: 1729 finally:
1594 os.remove(file1) 1730 os.remove(file1)
1595 shutil.rmtree(level1) 1731 shutil.rmtree(level1)
1732
1733
1734 @support.skip_unless_symlink
1735 class NonLocalSymlinkTests(unittest.TestCase):
1736
1737 def setUp(self):
1738 """
1739 Create this structure:
1740
1741 base
1742 \___ some_dir
1743 """
1744 os.makedirs('base/some_dir')
1745
1746 def tearDown(self):
1747 shutil.rmtree('base')
1748
1749 def test_directory_link_nonlocal(self):
1750 """
1751 The symlink target should resolve relative to the link, not relative
1752 to the current directory.
1753
1754 Then, link base/some_link -> base/some_dir and ensure that some_link
1755 is resolved as a directory.
1756
1757 In issue13772, it was discovered that directory detection failed if
1758 the symlink target was not specified relative to the current
1759 directory, which was a defect in the implementation.
1760 """
1761 src = os.path.join('base', 'some_link')
1762 os.symlink('some_dir', src)
1763 assert os.path.isdir(src)
1596 1764
1597 1765
1598 class FSEncodingTests(unittest.TestCase): 1766 class FSEncodingTests(unittest.TestCase):
1599 def test_nop(self): 1767 def test_nop(self):
1600 self.assertEqual(os.fsencode(b'abc\xff'), b'abc\xff') 1768 self.assertEqual(os.fsencode(b'abc\xff'), b'abc\xff')
1601 self.assertEqual(os.fsdecode('abc\u0141'), 'abc\u0141') 1769 self.assertEqual(os.fsdecode('abc\u0141'), 'abc\u0141')
1602 1770
1603 def test_identity(self): 1771 def test_identity(self):
1604 # assert fsdecode(fsencode(x)) == x 1772 # assert fsdecode(fsencode(x)) == x
1605 for fn in ('unicode\u0141', 'latin\xe9', 'ascii'): 1773 for fn in ('unicode\u0141', 'latin\xe9', 'ascii'):
(...skipping 151 matching lines...) Expand 10 before | Expand all | Expand 10 after
1757 1925
1758 1926
1759 @unittest.skipUnless(threading is not None, "test needs threading module") 1927 @unittest.skipUnless(threading is not None, "test needs threading module")
1760 @unittest.skipUnless(hasattr(os, 'sendfile'), "test needs os.sendfile()") 1928 @unittest.skipUnless(hasattr(os, 'sendfile'), "test needs os.sendfile()")
1761 class TestSendfile(unittest.TestCase): 1929 class TestSendfile(unittest.TestCase):
1762 1930
1763 DATA = b"12345abcde" * 16 * 1024 # 160 KB 1931 DATA = b"12345abcde" * 16 * 1024 # 160 KB
1764 SUPPORT_HEADERS_TRAILERS = not sys.platform.startswith("linux") and \ 1932 SUPPORT_HEADERS_TRAILERS = not sys.platform.startswith("linux") and \
1765 not sys.platform.startswith("solaris") and \ 1933 not sys.platform.startswith("solaris") and \
1766 not sys.platform.startswith("sunos") 1934 not sys.platform.startswith("sunos")
1935 requires_headers_trailers = unittest.skipUnless(SUPPORT_HEADERS_TRAILERS,
1936 'requires headers and trailers support')
1767 1937
1768 @classmethod 1938 @classmethod
1769 def setUpClass(cls): 1939 def setUpClass(cls):
1770 with open(support.TESTFN, "wb") as f: 1940 with open(support.TESTFN, "wb") as f:
1771 f.write(cls.DATA) 1941 f.write(cls.DATA)
1772 1942
1773 @classmethod 1943 @classmethod
1774 def tearDownClass(cls): 1944 def tearDownClass(cls):
1775 support.unlink(support.TESTFN) 1945 support.unlink(support.TESTFN)
1776 1946
(...skipping 98 matching lines...) Expand 10 before | Expand all | Expand 10 after
1875 data = self.server.handler_instance.get_data() 2045 data = self.server.handler_instance.get_data()
1876 self.assertEqual(data, b'') 2046 self.assertEqual(data, b'')
1877 2047
1878 def test_invalid_offset(self): 2048 def test_invalid_offset(self):
1879 with self.assertRaises(OSError) as cm: 2049 with self.assertRaises(OSError) as cm:
1880 os.sendfile(self.sockno, self.fileno, -1, 4096) 2050 os.sendfile(self.sockno, self.fileno, -1, 4096)
1881 self.assertEqual(cm.exception.errno, errno.EINVAL) 2051 self.assertEqual(cm.exception.errno, errno.EINVAL)
1882 2052
1883 # --- headers / trailers tests 2053 # --- headers / trailers tests
1884 2054
1885 if SUPPORT_HEADERS_TRAILERS: 2055 @requires_headers_trailers
1886 2056 def test_headers(self):
1887 def test_headers(self): 2057 total_sent = 0
1888 total_sent = 0 2058 sent = os.sendfile(self.sockno, self.fileno, 0, 4096,
1889 sent = os.sendfile(self.sockno, self.fileno, 0, 4096, 2059 headers=[b"x" * 512])
1890 headers=[b"x" * 512]) 2060 total_sent += sent
2061 offset = 4096
2062 nbytes = 4096
2063 while 1:
2064 sent = self.sendfile_wrapper(self.sockno, self.fileno,
2065 offset, nbytes)
2066 if sent == 0:
2067 break
1891 total_sent += sent 2068 total_sent += sent
1892 offset = 4096 2069 offset += sent
1893 nbytes = 4096 2070
1894 while 1: 2071 expected_data = b"x" * 512 + self.DATA
1895 sent = self.sendfile_wrapper(self.sockno, self.fileno, 2072 self.assertEqual(total_sent, len(expected_data))
1896 offset, nbytes) 2073 self.client.close()
1897 if sent == 0: 2074 self.server.wait()
1898 break 2075 data = self.server.handler_instance.get_data()
1899 total_sent += sent 2076 self.assertEqual(hash(data), hash(expected_data))
1900 offset += sent 2077
1901 2078 @requires_headers_trailers
1902 expected_data = b"x" * 512 + self.DATA 2079 def test_trailers(self):
1903 self.assertEqual(total_sent, len(expected_data)) 2080 TESTFN2 = support.TESTFN + "2"
2081 file_data = b"abcdef"
2082 with open(TESTFN2, 'wb') as f:
2083 f.write(file_data)
2084 with open(TESTFN2, 'rb')as f:
2085 self.addCleanup(os.remove, TESTFN2)
2086 os.sendfile(self.sockno, f.fileno(), 0, len(file_data),
2087 trailers=[b"1234"])
1904 self.client.close() 2088 self.client.close()
1905 self.server.wait() 2089 self.server.wait()
1906 data = self.server.handler_instance.get_data() 2090 data = self.server.handler_instance.get_data()
1907 self.assertEqual(hash(data), hash(expected_data)) 2091 self.assertEqual(data, b"abcdef1234")
1908 2092
1909 def test_trailers(self): 2093 @requires_headers_trailers
1910 TESTFN2 = support.TESTFN + "2" 2094 @unittest.skipUnless(hasattr(os, 'SF_NODISKIO'),
1911 with open(TESTFN2, 'wb') as f: 2095 'test needs os.SF_NODISKIO')
1912 f.write(b"abcde") 2096 def test_flags(self):
1913 with open(TESTFN2, 'rb')as f: 2097 try:
1914 self.addCleanup(os.remove, TESTFN2) 2098 os.sendfile(self.sockno, self.fileno, 0, 4096,
1915 os.sendfile(self.sockno, f.fileno(), 0, 4096, 2099 flags=os.SF_NODISKIO)
1916 trailers=[b"12345"]) 2100 except OSError as err:
1917 self.client.close() 2101 if err.errno not in (errno.EBUSY, errno.EAGAIN):
1918 self.server.wait() 2102 raise
1919 data = self.server.handler_instance.get_data()
1920 self.assertEqual(data, b"abcde12345")
1921
1922 if hasattr(os, "SF_NODISKIO"):
1923 def test_flags(self):
1924 try:
1925 os.sendfile(self.sockno, self.fileno, 0, 4096,
1926 flags=os.SF_NODISKIO)
1927 except OSError as err:
1928 if err.errno not in (errno.EBUSY, errno.EAGAIN):
1929 raise
1930 2103
1931 2104
1932 def supports_extended_attributes(): 2105 def supports_extended_attributes():
1933 if not hasattr(os, "setxattr"): 2106 if not hasattr(os, "setxattr"):
1934 return False 2107 return False
1935 try: 2108 try:
1936 with open(support.TESTFN, "wb") as fp: 2109 with open(support.TESTFN, "wb") as fp:
1937 try: 2110 try:
1938 os.setxattr(fp.fileno(), b"user.test", b"") 2111 os.setxattr(fp.fileno(), b"user.test", b"")
1939 except OSError: 2112 except OSError:
(...skipping 254 matching lines...) Expand 10 before | Expand all | Expand 10 after
2194 2367
2195 for filenames, func, *func_args in funcs: 2368 for filenames, func, *func_args in funcs:
2196 for name in filenames: 2369 for name in filenames:
2197 try: 2370 try:
2198 func(name, *func_args) 2371 func(name, *func_args)
2199 except OSError as err: 2372 except OSError as err:
2200 self.assertIs(err.filename, name) 2373 self.assertIs(err.filename, name)
2201 else: 2374 else:
2202 self.fail("No exception thrown by {}".format(func)) 2375 self.fail("No exception thrown by {}".format(func))
2203 2376
2377 class CPUCountTests(unittest.TestCase):
2378 def test_cpu_count(self):
2379 cpus = os.cpu_count()
2380 if cpus is not None:
2381 self.assertIsInstance(cpus, int)
2382 self.assertGreater(cpus, 0)
2383 else:
2384 self.skipTest("Could not determine the number of CPUs")
2385
2386
2387 class FDInheritanceTests(unittest.TestCase):
2388 def test_get_set_inheritable(self):
2389 fd = os.open(__file__, os.O_RDONLY)
2390 self.addCleanup(os.close, fd)
2391 self.assertEqual(os.get_inheritable(fd), False)
2392
2393 os.set_inheritable(fd, True)
2394 self.assertEqual(os.get_inheritable(fd), True)
2395
2396 @unittest.skipIf(fcntl is None, "need fcntl")
2397 def test_get_inheritable_cloexec(self):
2398 fd = os.open(__file__, os.O_RDONLY)
2399 self.addCleanup(os.close, fd)
2400 self.assertEqual(os.get_inheritable(fd), False)
2401
2402 # clear FD_CLOEXEC flag
2403 flags = fcntl.fcntl(fd, fcntl.F_GETFD)
2404 flags &= ~fcntl.FD_CLOEXEC
2405 fcntl.fcntl(fd, fcntl.F_SETFD, flags)
2406
2407 self.assertEqual(os.get_inheritable(fd), True)
2408
2409 @unittest.skipIf(fcntl is None, "need fcntl")
2410 def test_set_inheritable_cloexec(self):
2411 fd = os.open(__file__, os.O_RDONLY)
2412 self.addCleanup(os.close, fd)
2413 self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC,
2414 fcntl.FD_CLOEXEC)
2415
2416 os.set_inheritable(fd, True)
2417 self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC,
2418 0)
2419
2420 def test_open(self):
2421 fd = os.open(__file__, os.O_RDONLY)
2422 self.addCleanup(os.close, fd)
2423 self.assertEqual(os.get_inheritable(fd), False)
2424
2425 @unittest.skipUnless(hasattr(os, 'pipe'), "need os.pipe()")
2426 def test_pipe(self):
2427 rfd, wfd = os.pipe()
2428 self.addCleanup(os.close, rfd)
2429 self.addCleanup(os.close, wfd)
2430 self.assertEqual(os.get_inheritable(rfd), False)
2431 self.assertEqual(os.get_inheritable(wfd), False)
2432
2433 def test_dup(self):
2434 fd1 = os.open(__file__, os.O_RDONLY)
2435 self.addCleanup(os.close, fd1)
2436
2437 fd2 = os.dup(fd1)
2438 self.addCleanup(os.close, fd2)
2439 self.assertEqual(os.get_inheritable(fd2), False)
2440
2441 @unittest.skipUnless(hasattr(os, 'dup2'), "need os.dup2()")
2442 def test_dup2(self):
2443 fd = os.open(__file__, os.O_RDONLY)
2444 self.addCleanup(os.close, fd)
2445
2446 # inheritable by default
2447 fd2 = os.open(__file__, os.O_RDONLY)
2448 try:
2449 os.dup2(fd, fd2)
2450 self.assertEqual(os.get_inheritable(fd2), True)
2451 finally:
2452 os.close(fd2)
2453
2454 # force non-inheritable
2455 fd3 = os.open(__file__, os.O_RDONLY)
2456 try:
2457 os.dup2(fd, fd3, inheritable=False)
2458 self.assertEqual(os.get_inheritable(fd3), False)
2459 finally:
2460 os.close(fd3)
2461
2462 @unittest.skipUnless(hasattr(os, 'openpty'), "need os.openpty()")
2463 def test_openpty(self):
2464 master_fd, slave_fd = os.openpty()
2465 self.addCleanup(os.close, master_fd)
2466 self.addCleanup(os.close, slave_fd)
2467 self.assertEqual(os.get_inheritable(master_fd), False)
2468 self.assertEqual(os.get_inheritable(slave_fd), False)
2469
2470
2204 @support.reap_threads 2471 @support.reap_threads
2205 def test_main(): 2472 def test_main():
2206 support.run_unittest( 2473 support.run_unittest(
2207 FileTests, 2474 FileTests,
2208 StatAttributeTests, 2475 StatAttributeTests,
2209 EnvironTests, 2476 EnvironTests,
2210 WalkTests, 2477 WalkTests,
2211 FwalkTests, 2478 FwalkTests,
2212 MakedirTests, 2479 MakedirTests,
2213 DevNullTests, 2480 DevNullTests,
2214 URandomTests, 2481 URandomTests,
2215 ExecTests, 2482 ExecTests,
2216 Win32ErrorTests, 2483 Win32ErrorTests,
2217 TestInvalidFD, 2484 TestInvalidFD,
2218 PosixUidGidTests, 2485 PosixUidGidTests,
2219 Pep383Tests, 2486 Pep383Tests,
2220 Win32KillTests, 2487 Win32KillTests,
2488 Win32ListdirTests,
2221 Win32SymlinkTests, 2489 Win32SymlinkTests,
2490 NonLocalSymlinkTests,
2222 FSEncodingTests, 2491 FSEncodingTests,
2223 DeviceEncodingTests, 2492 DeviceEncodingTests,
2224 PidTests, 2493 PidTests,
2225 LoginTests, 2494 LoginTests,
2226 LinkTests, 2495 LinkTests,
2227 TestSendfile, 2496 TestSendfile,
2228 ProgramPriorityTests, 2497 ProgramPriorityTests,
2229 ExtendedAttributeTests, 2498 ExtendedAttributeTests,
2230 Win32DeprecatedBytesAPI, 2499 Win32DeprecatedBytesAPI,
2231 TermsizeTests, 2500 TermsizeTests,
2232 OSErrorTests, 2501 OSErrorTests,
2233 RemoveDirsTests, 2502 RemoveDirsTests,
2503 CPUCountTests,
2504 FDInheritanceTests,
2234 ) 2505 )
2235 2506
2236 if __name__ == "__main__": 2507 if __name__ == "__main__":
2237 test_main() 2508 test_main()
LEFTRIGHT

RSS Feeds Recent Issues | This issue
This is Rietveld 894c83f36cb7+