Rietveld Code Review Tool
Help | Bug tracker | Discussion group | Source code | Sign in
(169352)

Delta Between Two Patch Sets: Lib/test/test_os.py

Issue 16510: Using appropriate checks in tests
Left Patch Set: Created 7 years ago
Right Patch Set: Created 5 years, 10 months ago
Left:
Right:
Use n/p to move between diff chunks; N/P to move between comments. Please Sign in to add in-line comments.
Jump to:
Left: Side by side diff | Download
Right: Side by side diff | Download
« no previous file with change/comment | « Lib/test/_test_multiprocessing.py ('k') | Lib/test/test_ossaudiodev.py » ('j') | no next file with change/comment »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
LEFT | RIGHT
1 # As a test suite for the os module, this is woefully inadequate, but this 1 # As a test suite for the os module, this is woefully inadequate, but this
2 # does add tests for a few functions which have been determined to be more 2 # does add tests for a few functions which have been determined to be more
3 # portable than they had been thought to be. 3 # portable than they had been thought to be.
4 4
5 import os 5 import os
6 import errno 6 import errno
7 import unittest 7 import unittest
8 import warnings 8 import warnings
9 import sys 9 import sys
10 import signal 10 import signal
11 import subprocess 11 import subprocess
12 import time 12 import time
13 import shutil 13 import shutil
14 from test import support 14 from test import support
15 import contextlib 15 import contextlib
16 import mmap 16 import mmap
17 import platform 17 import platform
18 import re 18 import re
19 import uuid 19 import uuid
20 import asyncore 20 import asyncore
21 import asynchat 21 import asynchat
22 import socket 22 import socket
23 import itertools 23 import itertools
24 import stat 24 import stat
25 import locale 25 import locale
26 import codecs 26 import codecs
27 import decimal
28 import fractions
29 import pickle
27 try: 30 try:
28 import threading 31 import threading
29 except ImportError: 32 except ImportError:
30 threading = None 33 threading = None
34 try:
35 import resource
36 except ImportError:
37 resource = None
38 try:
39 import fcntl
40 except ImportError:
41 fcntl = None
42
31 from test.script_helper import assert_python_ok 43 from test.script_helper import assert_python_ok
32 44
33 with warnings.catch_warnings(): 45 with warnings.catch_warnings():
34 warnings.simplefilter("ignore", DeprecationWarning) 46 warnings.simplefilter("ignore", DeprecationWarning)
35 os.stat_float_times(True) 47 os.stat_float_times(True)
36 st = os.stat(__file__) 48 st = os.stat(__file__)
37 stat_supports_subsecond = ( 49 stat_supports_subsecond = (
38 # check if float and int timestamps are different 50 # check if float and int timestamps are different
39 (st.st_atime != st[7]) 51 (st.st_atime != st[7])
40 or (st.st_mtime != st[8]) 52 or (st.st_mtime != st[8])
41 or (st.st_ctime != st[9])) 53 or (st.st_ctime != st[9]))
42 54
43 # Detect whether we're on a Linux system that uses the (now outdated 55 # Detect whether we're on a Linux system that uses the (now outdated
44 # and unmaintained) linuxthreads threading library. There's an issue 56 # and unmaintained) linuxthreads threading library. There's an issue
45 # when combining linuxthreads with a failed execv call: see 57 # when combining linuxthreads with a failed execv call: see
46 # http://bugs.python.org/issue4970. 58 # http://bugs.python.org/issue4970.
47 if hasattr(sys, 'thread_info') and sys.thread_info.version: 59 if hasattr(sys, 'thread_info') and sys.thread_info.version:
48 USING_LINUXTHREADS = sys.thread_info.version.startswith("linuxthreads") 60 USING_LINUXTHREADS = sys.thread_info.version.startswith("linuxthreads")
49 else: 61 else:
50 USING_LINUXTHREADS = False 62 USING_LINUXTHREADS = False
63
64 # Issue #14110: Some tests fail on FreeBSD if the user is in the wheel group.
65 HAVE_WHEEL_GROUP = sys.platform.startswith('freebsd') and os.getgid() == 0
51 66
52 # Tests creating TESTFN 67 # Tests creating TESTFN
53 class FileTests(unittest.TestCase): 68 class FileTests(unittest.TestCase):
54 def setUp(self): 69 def setUp(self):
55 if os.path.exists(support.TESTFN): 70 if os.path.exists(support.TESTFN):
56 os.unlink(support.TESTFN) 71 os.unlink(support.TESTFN)
57 tearDown = setUp 72 tearDown = setUp
58 73
59 def test_access(self): 74 def test_access(self):
60 f = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR) 75 f = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
(...skipping 102 matching lines...) Expand 10 before | Expand all | Expand 10 after
163 os.mkdir(support.TESTFN) 178 os.mkdir(support.TESTFN)
164 self.fname = os.path.join(support.TESTFN, "f1") 179 self.fname = os.path.join(support.TESTFN, "f1")
165 f = open(self.fname, 'wb') 180 f = open(self.fname, 'wb')
166 f.write(b"ABC") 181 f.write(b"ABC")
167 f.close() 182 f.close()
168 183
169 def tearDown(self): 184 def tearDown(self):
170 os.unlink(self.fname) 185 os.unlink(self.fname)
171 os.rmdir(support.TESTFN) 186 os.rmdir(support.TESTFN)
172 187
188 @unittest.skipUnless(hasattr(os, 'stat'), 'test needs os.stat()')
173 def check_stat_attributes(self, fname): 189 def check_stat_attributes(self, fname):
174 if not hasattr(os, "stat"):
175 return
176
177 result = os.stat(fname) 190 result = os.stat(fname)
178 191
179 # Make sure direct access works 192 # Make sure direct access works
180 self.assertEqual(result[stat.ST_SIZE], 3) 193 self.assertEqual(result[stat.ST_SIZE], 3)
181 self.assertEqual(result.st_size, 3) 194 self.assertEqual(result.st_size, 3)
182 195
183 # Make sure all the attributes are there 196 # Make sure all the attributes are there
184 members = dir(result) 197 members = dir(result)
185 for name in dir(stat): 198 for name in dir(stat):
186 if name[:3] == 'ST_': 199 if name[:3] == 'ST_':
187 attr = name.lower() 200 attr = name.lower()
188 if name.endswith("TIME"): 201 if name.endswith("TIME"):
189 def trunc(x): return int(x) 202 def trunc(x): return int(x)
190 else: 203 else:
191 def trunc(x): return x 204 def trunc(x): return x
192 self.assertEqual(trunc(getattr(result, attr)), 205 self.assertEqual(trunc(getattr(result, attr)),
193 result[getattr(stat, name)]) 206 result[getattr(stat, name)])
194 self.assertIn(attr, members) 207 self.assertIn(attr, members)
195 208
196 # Make sure that the st_?time and st_?time_ns fields roughly agree 209 # Make sure that the st_?time and st_?time_ns fields roughly agree
197 # (they should always agree up to around tens-of-microseconds) 210 # (they should always agree up to around tens-of-microseconds)
198 for name in 'st_atime st_mtime st_ctime'.split(): 211 for name in 'st_atime st_mtime st_ctime'.split():
199 floaty = int(getattr(result, name) * 100000) 212 floaty = int(getattr(result, name) * 100000)
200 nanosecondy = getattr(result, name + "_ns") // 10000 213 nanosecondy = getattr(result, name + "_ns") // 10000
201 self.assertAlmostEqual(floaty, nanosecondy, delta=2) 214 self.assertAlmostEqual(floaty, nanosecondy, delta=2)
202 215
203 try: 216 try:
204 result[200] 217 result[200]
205 self.fail("No exception thrown") 218 self.fail("No exception raised")
206 except IndexError: 219 except IndexError:
207 pass 220 pass
208 221
209 # Make sure that assignment fails 222 # Make sure that assignment fails
210 try: 223 try:
211 result.st_mode = 1 224 result.st_mode = 1
212 self.fail("No exception thrown") 225 self.fail("No exception raised")
213 except AttributeError: 226 except AttributeError:
214 pass 227 pass
215 228
216 try: 229 try:
217 result.st_rdev = 1 230 result.st_rdev = 1
218 self.fail("No exception thrown") 231 self.fail("No exception raised")
219 except (AttributeError, TypeError): 232 except (AttributeError, TypeError):
220 pass 233 pass
221 234
222 try: 235 try:
223 result.parrot = 1 236 result.parrot = 1
224 self.fail("No exception thrown") 237 self.fail("No exception raised")
225 except AttributeError: 238 except AttributeError:
226 pass 239 pass
227 240
228 # Use the stat_result constructor with a too-short tuple. 241 # Use the stat_result constructor with a too-short tuple.
229 try: 242 try:
230 result2 = os.stat_result((10,)) 243 result2 = os.stat_result((10,))
231 self.fail("No exception thrown") 244 self.fail("No exception raised")
232 except TypeError: 245 except TypeError:
233 pass 246 pass
234 247
235 # Use the constructor with a too-long tuple. 248 # Use the constructor with a too-long tuple.
236 try: 249 try:
237 result2 = os.stat_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14)) 250 result2 = os.stat_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
238 except TypeError: 251 except TypeError:
239 pass 252 pass
240 253
241 def test_stat_attributes(self): 254 def test_stat_attributes(self):
242 self.check_stat_attributes(self.fname) 255 self.check_stat_attributes(self.fname)
243 256
244 def test_stat_attributes_bytes(self): 257 def test_stat_attributes_bytes(self):
245 try: 258 try:
246 fname = self.fname.encode(sys.getfilesystemencoding()) 259 fname = self.fname.encode(sys.getfilesystemencoding())
247 except UnicodeEncodeError: 260 except UnicodeEncodeError:
248 self.skipTest("cannot encode %a for the filesystem" % self.fname) 261 self.skipTest("cannot encode %a for the filesystem" % self.fname)
249 with warnings.catch_warnings(): 262 with warnings.catch_warnings():
250 warnings.simplefilter("ignore", DeprecationWarning) 263 warnings.simplefilter("ignore", DeprecationWarning)
251 self.check_stat_attributes(fname) 264 self.check_stat_attributes(fname)
252 265
266 def test_stat_result_pickle(self):
267 result = os.stat(self.fname)
268 p = pickle.dumps(result)
269 self.assertIn(b'\x03cos\nstat_result\n', p)
270 unpickled = pickle.loads(p)
271 self.assertEqual(result, unpickled)
272
273 @unittest.skipUnless(hasattr(os, 'statvfs'), 'test needs os.statvfs()')
253 def test_statvfs_attributes(self): 274 def test_statvfs_attributes(self):
254 if not hasattr(os, "statvfs"):
255 return
256
257 try: 275 try:
258 result = os.statvfs(self.fname) 276 result = os.statvfs(self.fname)
259 except OSError as e: 277 except OSError as e:
260 # On AtheOS, glibc always returns ENOSYS 278 # On AtheOS, glibc always returns ENOSYS
261 if e.errno == errno.ENOSYS: 279 if e.errno == errno.ENOSYS:
262 return 280 self.skipTest('os.statvfs() failed with ENOSYS')
263 281
264 # Make sure direct access works 282 # Make sure direct access works
265 self.assertEqual(result.f_bfree, result[3]) 283 self.assertEqual(result.f_bfree, result[3])
266 284
267 # Make sure all the attributes are there. 285 # Make sure all the attributes are there.
268 members = ('bsize', 'frsize', 'blocks', 'bfree', 'bavail', 'files', 286 members = ('bsize', 'frsize', 'blocks', 'bfree', 'bavail', 'files',
269 'ffree', 'favail', 'flag', 'namemax') 287 'ffree', 'favail', 'flag', 'namemax')
270 for value, member in enumerate(members): 288 for value, member in enumerate(members):
271 self.assertEqual(getattr(result, 'f_' + member), result[value]) 289 self.assertEqual(getattr(result, 'f_' + member), result[value])
272 290
273 # Make sure that assignment really fails 291 # Make sure that assignment really fails
274 try: 292 try:
275 result.f_bfree = 1 293 result.f_bfree = 1
276 self.fail("No exception thrown") 294 self.fail("No exception raised")
277 except AttributeError: 295 except AttributeError:
278 pass 296 pass
279 297
280 try: 298 try:
281 result.parrot = 1 299 result.parrot = 1
282 self.fail("No exception thrown") 300 self.fail("No exception raised")
283 except AttributeError: 301 except AttributeError:
284 pass 302 pass
285 303
286 # Use the constructor with a too-short tuple. 304 # Use the constructor with a too-short tuple.
287 try: 305 try:
288 result2 = os.statvfs_result((10,)) 306 result2 = os.statvfs_result((10,))
289 self.fail("No exception thrown") 307 self.fail("No exception raised")
290 except TypeError: 308 except TypeError:
291 pass 309 pass
292 310
293 # Use the constructor with a too-long tuple. 311 # Use the constructor with a too-long tuple.
294 try: 312 try:
295 result2 = os.statvfs_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14)) 313 result2 = os.statvfs_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
296 except TypeError: 314 except TypeError:
297 pass 315 pass
316
317 @unittest.skipUnless(hasattr(os, 'statvfs'),
318 "need os.statvfs()")
319 def test_statvfs_result_pickle(self):
320 try:
321 result = os.statvfs(self.fname)
322 except OSError as e:
323 # On AtheOS, glibc always returns ENOSYS
324 if e.errno == errno.ENOSYS:
325 self.skipTest('os.statvfs() failed with ENOSYS')
326
327 p = pickle.dumps(result)
328 self.assertIn(b'\x03cos\nstatvfs_result\n', p)
329 unpickled = pickle.loads(p)
330 self.assertEqual(result, unpickled)
298 331
299 def test_utime_dir(self): 332 def test_utime_dir(self):
300 delta = 1000000 333 delta = 1000000
301 st = os.stat(support.TESTFN) 334 st = os.stat(support.TESTFN)
302 # round to int, because some systems may support sub-second 335 # round to int, because some systems may support sub-second
303 # time stamps in stat, but not in utime. 336 # time stamps in stat, but not in utime.
304 os.utime(support.TESTFN, (st.st_atime, int(st.st_mtime-delta))) 337 os.utime(support.TESTFN, (st.st_atime, int(st.st_mtime-delta)))
305 st2 = os.stat(support.TESTFN) 338 st2 = os.stat(support.TESTFN)
306 self.assertEqual(st2.st_mtime, int(st.st_mtime-delta)) 339 self.assertEqual(st2.st_mtime, int(st.st_mtime-delta))
307 340
(...skipping 127 matching lines...) Expand 10 before | Expand all | Expand 10 after
435 def set_time(filename, atime, mtime): 468 def set_time(filename, atime, mtime):
436 dirname = os.path.dirname(filename) 469 dirname = os.path.dirname(filename)
437 dirfd = os.open(dirname, os.O_RDONLY) 470 dirfd = os.open(dirname, os.O_RDONLY)
438 try: 471 try:
439 os.utime(os.path.basename(filename), dir_fd=dirfd, 472 os.utime(os.path.basename(filename), dir_fd=dirfd,
440 times=(atime, mtime)) 473 times=(atime, mtime))
441 finally: 474 finally:
442 os.close(dirfd) 475 os.close(dirfd)
443 self._test_utime_subsecond(set_time) 476 self._test_utime_subsecond(set_time)
444 477
445 # Restrict test to Win32, since there is no guarantee other 478 # Restrict tests to Win32, since there is no guarantee other
446 # systems support centiseconds 479 # systems support centiseconds
447 if sys.platform == 'win32': 480 def get_file_system(path):
448 def get_file_system(path): 481 if sys.platform == 'win32':
449 root = os.path.splitdrive(os.path.abspath(path))[0] + '\\' 482 root = os.path.splitdrive(os.path.abspath(path))[0] + '\\'
450 import ctypes 483 import ctypes
451 kernel32 = ctypes.windll.kernel32 484 kernel32 = ctypes.windll.kernel32
452 buf = ctypes.create_unicode_buffer("", 100) 485 buf = ctypes.create_unicode_buffer("", 100)
453 if kernel32.GetVolumeInformationW(root, None, 0, None, None, None, buf, len(buf)): 486 if kernel32.GetVolumeInformationW(root, None, 0, None, None, None, buf, len(buf)):
454 return buf.value 487 return buf.value
455 488
456 if get_file_system(support.TESTFN) == "NTFS": 489 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
457 def test_1565150(self): 490 @unittest.skipUnless(get_file_system(support.TESTFN) == "NTFS",
458 t1 = 1159195039.25 491 "requires NTFS")
459 os.utime(self.fname, (t1, t1)) 492 def test_1565150(self):
460 self.assertEqual(os.stat(self.fname).st_mtime, t1) 493 t1 = 1159195039.25
461 494 os.utime(self.fname, (t1, t1))
462 def test_large_time(self): 495 self.assertEqual(os.stat(self.fname).st_mtime, t1)
463 t1 = 5000000000 # some day in 2128 496
464 os.utime(self.fname, (t1, t1)) 497 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
465 self.assertEqual(os.stat(self.fname).st_mtime, t1) 498 @unittest.skipUnless(get_file_system(support.TESTFN) == "NTFS",
466 499 "requires NTFS")
467 def test_1686475(self): 500 def test_large_time(self):
468 # Verify that an open file can be stat'ed 501 t1 = 5000000000 # some day in 2128
469 try: 502 os.utime(self.fname, (t1, t1))
470 os.stat(r"c:\pagefile.sys") 503 self.assertEqual(os.stat(self.fname).st_mtime, t1)
471 except WindowsError as e: 504
472 if e.errno == 2: # file does not exist; cannot run test 505 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
473 return 506 def test_1686475(self):
474 self.fail("Could not stat pagefile.sys") 507 # Verify that an open file can be stat'ed
475 508 try:
476 @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()") 509 os.stat(r"c:\pagefile.sys")
477 def test_15261(self): 510 except FileNotFoundError:
478 # Verify that stat'ing a closed fd does not cause crash 511 self.skipTest(r'c:\pagefile.sys does not exist')
479 r, w = os.pipe() 512 except OSError as e:
480 try: 513 self.fail("Could not stat pagefile.sys")
481 os.stat(r) # should not raise error 514
482 finally: 515 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
483 os.close(r) 516 @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
484 os.close(w) 517 def test_15261(self):
485 with self.assertRaises(OSError) as ctx: 518 # Verify that stat'ing a closed fd does not cause crash
486 os.stat(r) 519 r, w = os.pipe()
487 self.assertEqual(ctx.exception.errno, errno.EBADF) 520 try:
521 os.stat(r) # should not raise error
522 finally:
523 os.close(r)
524 os.close(w)
525 with self.assertRaises(OSError) as ctx:
526 os.stat(r)
527 self.assertEqual(ctx.exception.errno, errno.EBADF)
488 528
489 from test import mapping_tests 529 from test import mapping_tests
490 530
491 class EnvironTests(mapping_tests.BasicTestMappingProtocol): 531 class EnvironTests(mapping_tests.BasicTestMappingProtocol):
492 """check that os.environ object conform to mapping protocol""" 532 """check that os.environ object conform to mapping protocol"""
493 type2test = None 533 type2test = None
494 534
495 def setUp(self): 535 def setUp(self):
496 self.__save = dict(os.environ) 536 self.__save = dict(os.environ)
497 if os.supports_bytes_environ: 537 if os.supports_bytes_environ:
(...skipping 123 matching lines...) Expand 10 before | Expand all | Expand 10 after
621 @support.requires_mac_ver(10, 6) 661 @support.requires_mac_ver(10, 6)
622 def test_unset_error(self): 662 def test_unset_error(self):
623 if sys.platform == "win32": 663 if sys.platform == "win32":
624 # an environment variable is limited to 32,767 characters 664 # an environment variable is limited to 32,767 characters
625 key = 'x' * 50000 665 key = 'x' * 50000
626 self.assertRaises(ValueError, os.environ.__delitem__, key) 666 self.assertRaises(ValueError, os.environ.__delitem__, key)
627 else: 667 else:
628 # "=" is not allowed in a variable name 668 # "=" is not allowed in a variable name
629 key = 'key=' 669 key = 'key='
630 self.assertRaises(OSError, os.environ.__delitem__, key) 670 self.assertRaises(OSError, os.environ.__delitem__, key)
671
672 def test_key_type(self):
673 missing = 'missingkey'
674 self.assertNotIn(missing, os.environ)
675
676 with self.assertRaises(KeyError) as cm:
677 os.environ[missing]
678 self.assertIs(cm.exception.args[0], missing)
679 self.assertTrue(cm.exception.__suppress_context__)
680
681 with self.assertRaises(KeyError) as cm:
682 del os.environ[missing]
683 self.assertIs(cm.exception.args[0], missing)
684 self.assertTrue(cm.exception.__suppress_context__)
685
631 686
632 class WalkTests(unittest.TestCase): 687 class WalkTests(unittest.TestCase):
633 """Tests for os.walk().""" 688 """Tests for os.walk()."""
634 689
635 def setUp(self): 690 def setUp(self):
636 import os 691 import os
637 from os.path import join 692 from os.path import join
638 693
639 # Build: 694 # Build:
640 # TESTFN/ 695 # TESTFN/
(...skipping 23 matching lines...) Expand all
664 719
665 # Create stuff. 720 # Create stuff.
666 os.makedirs(sub11_path) 721 os.makedirs(sub11_path)
667 os.makedirs(sub2_path) 722 os.makedirs(sub2_path)
668 os.makedirs(t2_path) 723 os.makedirs(t2_path)
669 for path in tmp1_path, tmp2_path, tmp3_path, tmp4_path: 724 for path in tmp1_path, tmp2_path, tmp3_path, tmp4_path:
670 f = open(path, "w") 725 f = open(path, "w")
671 f.write("I'm " + path + " and proud of it. Blame test_os.\n") 726 f.write("I'm " + path + " and proud of it. Blame test_os.\n")
672 f.close() 727 f.close()
673 if support.can_symlink(): 728 if support.can_symlink():
674 if os.name == 'nt': 729 os.symlink(os.path.abspath(t2_path), link_path)
675 def symlink_to_dir(src, dest): 730 os.symlink('broken', broken_link_path, True)
676 os.symlink(src, dest, True)
677 else:
678 symlink_to_dir = os.symlink
679 symlink_to_dir(os.path.abspath(t2_path), link_path)
680 symlink_to_dir('broken', broken_link_path)
681 sub2_tree = (sub2_path, ["link"], ["broken_link", "tmp3"]) 731 sub2_tree = (sub2_path, ["link"], ["broken_link", "tmp3"])
682 else: 732 else:
683 sub2_tree = (sub2_path, [], ["tmp3"]) 733 sub2_tree = (sub2_path, [], ["tmp3"])
684 734
685 # Walk top-down. 735 # Walk top-down.
686 all = list(os.walk(walk_path)) 736 all = list(os.walk(walk_path))
687 self.assertEqual(len(all), 4) 737 self.assertEqual(len(all), 4)
688 # We can't know which order SUB1 and SUB2 will appear in. 738 # We can't know which order SUB1 and SUB2 will appear in.
689 # Not flipped: TESTFN, SUB1, SUB11, SUB2 739 # Not flipped: TESTFN, SUB1, SUB11, SUB2
690 # flipped: TESTFN, SUB2, SUB1, SUB11 740 # flipped: TESTFN, SUB2, SUB1, SUB11
(...skipping 156 matching lines...) Expand 10 before | Expand all | Expand 10 after
847 def test_exist_ok_existing_directory(self): 897 def test_exist_ok_existing_directory(self):
848 path = os.path.join(support.TESTFN, 'dir1') 898 path = os.path.join(support.TESTFN, 'dir1')
849 mode = 0o777 899 mode = 0o777
850 old_mask = os.umask(0o022) 900 old_mask = os.umask(0o022)
851 os.makedirs(path, mode) 901 os.makedirs(path, mode)
852 self.assertRaises(OSError, os.makedirs, path, mode) 902 self.assertRaises(OSError, os.makedirs, path, mode)
853 self.assertRaises(OSError, os.makedirs, path, mode, exist_ok=False) 903 self.assertRaises(OSError, os.makedirs, path, mode, exist_ok=False)
854 self.assertRaises(OSError, os.makedirs, path, 0o776, exist_ok=True) 904 self.assertRaises(OSError, os.makedirs, path, 0o776, exist_ok=True)
855 os.makedirs(path, mode=mode, exist_ok=True) 905 os.makedirs(path, mode=mode, exist_ok=True)
856 os.umask(old_mask) 906 os.umask(old_mask)
907
908 @unittest.skipUnless(hasattr(os, 'chown'), 'test needs os.chown')
909 def test_chown_uid_gid_arguments_must_be_index(self):
910 stat = os.stat(support.TESTFN)
911 uid = stat.st_uid
912 gid = stat.st_gid
913 for value in (-1.0, -1j, decimal.Decimal(-1), fractions.Fraction(-2, 2)):
914 self.assertRaises(TypeError, os.chown, support.TESTFN, value, gid)
915 self.assertRaises(TypeError, os.chown, support.TESTFN, uid, value)
916 self.assertIsNone(os.chown(support.TESTFN, uid, gid))
917 self.assertIsNone(os.chown(support.TESTFN, -1, -1))
857 918
858 def test_exist_ok_s_isgid_directory(self): 919 def test_exist_ok_s_isgid_directory(self):
859 path = os.path.join(support.TESTFN, 'dir1') 920 path = os.path.join(support.TESTFN, 'dir1')
860 S_ISGID = stat.S_ISGID 921 S_ISGID = stat.S_ISGID
861 mode = 0o777 922 mode = 0o777
862 old_mask = os.umask(0o022) 923 old_mask = os.umask(0o022)
863 try: 924 try:
864 existing_testfn_mode = stat.S_IMODE( 925 existing_testfn_mode = stat.S_IMODE(
865 os.lstat(support.TESTFN).st_mode) 926 os.lstat(support.TESTFN).st_mode)
866 try: 927 try:
(...skipping 31 matching lines...) Expand 10 before | Expand all | Expand 10 after
898 path = os.path.join(support.TESTFN, 'dir1', 'dir2', 'dir3', 959 path = os.path.join(support.TESTFN, 'dir1', 'dir2', 'dir3',
899 'dir4', 'dir5', 'dir6') 960 'dir4', 'dir5', 'dir6')
900 # If the tests failed, the bottom-most directory ('../dir6') 961 # If the tests failed, the bottom-most directory ('../dir6')
901 # may not have been created, so we look for the outermost directory 962 # may not have been created, so we look for the outermost directory
902 # that exists. 963 # that exists.
903 while not os.path.exists(path) and path != support.TESTFN: 964 while not os.path.exists(path) and path != support.TESTFN:
904 path = os.path.dirname(path) 965 path = os.path.dirname(path)
905 966
906 os.removedirs(path) 967 os.removedirs(path)
907 968
969
970 class RemoveDirsTests(unittest.TestCase):
971 def setUp(self):
972 os.makedirs(support.TESTFN)
973
974 def tearDown(self):
975 support.rmtree(support.TESTFN)
976
977 def test_remove_all(self):
978 dira = os.path.join(support.TESTFN, 'dira')
979 os.mkdir(dira)
980 dirb = os.path.join(dira, 'dirb')
981 os.mkdir(dirb)
982 os.removedirs(dirb)
983 self.assertFalse(os.path.exists(dirb))
984 self.assertFalse(os.path.exists(dira))
985 self.assertFalse(os.path.exists(support.TESTFN))
986
987 def test_remove_partial(self):
988 dira = os.path.join(support.TESTFN, 'dira')
989 os.mkdir(dira)
990 dirb = os.path.join(dira, 'dirb')
991 os.mkdir(dirb)
992 with open(os.path.join(dira, 'file.txt'), 'w') as f:
993 f.write('text')
994 os.removedirs(dirb)
995 self.assertFalse(os.path.exists(dirb))
996 self.assertTrue(os.path.exists(dira))
997 self.assertTrue(os.path.exists(support.TESTFN))
998
999 def test_remove_nothing(self):
1000 dira = os.path.join(support.TESTFN, 'dira')
1001 os.mkdir(dira)
1002 dirb = os.path.join(dira, 'dirb')
1003 os.mkdir(dirb)
1004 with open(os.path.join(dirb, 'file.txt'), 'w') as f:
1005 f.write('text')
1006 with self.assertRaises(OSError):
1007 os.removedirs(dirb)
1008 self.assertTrue(os.path.exists(dirb))
1009 self.assertTrue(os.path.exists(dira))
1010 self.assertTrue(os.path.exists(support.TESTFN))
1011
1012
908 class DevNullTests(unittest.TestCase): 1013 class DevNullTests(unittest.TestCase):
909 def test_devnull(self): 1014 def test_devnull(self):
910 with open(os.devnull, 'wb') as f: 1015 with open(os.devnull, 'wb') as f:
911 f.write(b'hello') 1016 f.write(b'hello')
912 f.close() 1017 f.close()
913 with open(os.devnull, 'rb') as f: 1018 with open(os.devnull, 'rb') as f:
914 self.assertEqual(f.read(), b'') 1019 self.assertEqual(f.read(), b'')
1020
915 1021
916 class URandomTests(unittest.TestCase): 1022 class URandomTests(unittest.TestCase):
917 def test_urandom_length(self): 1023 def test_urandom_length(self):
918 self.assertEqual(len(os.urandom(0)), 0) 1024 self.assertEqual(len(os.urandom(0)), 0)
919 self.assertEqual(len(os.urandom(1)), 1) 1025 self.assertEqual(len(os.urandom(1)), 1)
920 self.assertEqual(len(os.urandom(10)), 10) 1026 self.assertEqual(len(os.urandom(10)), 10)
921 self.assertEqual(len(os.urandom(100)), 100) 1027 self.assertEqual(len(os.urandom(100)), 100)
922 self.assertEqual(len(os.urandom(1000)), 1000) 1028 self.assertEqual(len(os.urandom(1000)), 1000)
923 1029
924 def test_urandom_value(self): 1030 def test_urandom_value(self):
925 data1 = os.urandom(16) 1031 data1 = os.urandom(16)
926 data2 = os.urandom(16) 1032 data2 = os.urandom(16)
927 self.assertNotEqual(data1, data2) 1033 self.assertNotEqual(data1, data2)
928 1034
929 def get_urandom_subprocess(self, count): 1035 def get_urandom_subprocess(self, count):
930 code = '\n'.join(( 1036 code = '\n'.join((
931 'import os, sys', 1037 'import os, sys',
932 'data = os.urandom(%s)' % count, 1038 'data = os.urandom(%s)' % count,
933 'sys.stdout.buffer.write(data)', 1039 'sys.stdout.buffer.write(data)',
934 'sys.stdout.buffer.flush()')) 1040 'sys.stdout.buffer.flush()'))
935 out = assert_python_ok('-c', code) 1041 out = assert_python_ok('-c', code)
936 stdout = out[1] 1042 stdout = out[1]
937 self.assertEqual(len(stdout), 16) 1043 self.assertEqual(len(stdout), 16)
938 return stdout 1044 return stdout
939 1045
940 def test_urandom_subprocess(self): 1046 def test_urandom_subprocess(self):
941 data1 = self.get_urandom_subprocess(16) 1047 data1 = self.get_urandom_subprocess(16)
942 data2 = self.get_urandom_subprocess(16) 1048 data2 = self.get_urandom_subprocess(16)
943 self.assertNotEqual(data1, data2) 1049 self.assertNotEqual(data1, data2)
1050
1051 @unittest.skipUnless(resource, "test requires the resource module")
1052 def test_urandom_failure(self):
1053 # Check urandom() failing when it is not able to open /dev/random.
1054 # We spawn a new process to make the test more robust (if getrlimit()
1055 # failed to restore the file descriptor limit after this, the whole
1056 # test suite would crash; this actually happened on the OS X Tiger
1057 # buildbot).
1058 code = """if 1:
1059 import errno
1060 import os
1061 import resource
1062
1063 soft_limit, hard_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
1064 resource.setrlimit(resource.RLIMIT_NOFILE, (1, hard_limit))
1065 try:
1066 os.urandom(16)
1067 except OSError as e:
1068 assert e.errno == errno.EMFILE, e.errno
1069 else:
1070 raise AssertionError("OSError not raised")
1071 """
1072 assert_python_ok('-c', code)
1073
944 1074
945 @contextlib.contextmanager 1075 @contextlib.contextmanager
946 def _execvpe_mockup(defpath=None): 1076 def _execvpe_mockup(defpath=None):
947 """ 1077 """
948 Stubs out execv and execve functions when used as context manager. 1078 Stubs out execv and execve functions when used as context manager.
949 Records exec calls. The mock execv and execve functions always raise an 1079 Records exec calls. The mock execv and execve functions always raise an
950 exception as they would normally never return. 1080 exception as they would normally never return.
951 """ 1081 """
952 # A list of tuples containing (function name, first arg, args) 1082 # A list of tuples containing (function name, first arg, args)
953 # of calls to execv or execve that have been made. 1083 # of calls to execv or execve that have been made.
(...skipping 79 matching lines...) Expand 10 before | Expand all | Expand 10 after
1033 self.assertEqual(len(calls), 1) 1163 self.assertEqual(len(calls), 1)
1034 self.assertSequenceEqual(calls[0], 1164 self.assertSequenceEqual(calls[0],
1035 ('execve', native_fullpath, (arguments, env_path))) 1165 ('execve', native_fullpath, (arguments, env_path)))
1036 1166
1037 def test_internal_execvpe_str(self): 1167 def test_internal_execvpe_str(self):
1038 self._test_internal_execvpe(str) 1168 self._test_internal_execvpe(str)
1039 if os.name != "nt": 1169 if os.name != "nt":
1040 self._test_internal_execvpe(bytes) 1170 self._test_internal_execvpe(bytes)
1041 1171
1042 1172
1173 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
1043 class Win32ErrorTests(unittest.TestCase): 1174 class Win32ErrorTests(unittest.TestCase):
1044 def test_rename(self): 1175 def test_rename(self):
1045 self.assertRaises(WindowsError, os.rename, support.TESTFN, support.TESTFN+".bak") 1176 self.assertRaises(OSError, os.rename, support.TESTFN, support.TESTFN+".bak")
1046 1177
1047 def test_remove(self): 1178 def test_remove(self):
1048 self.assertRaises(WindowsError, os.remove, support.TESTFN) 1179 self.assertRaises(OSError, os.remove, support.TESTFN)
1049 1180
1050 def test_chdir(self): 1181 def test_chdir(self):
1051 self.assertRaises(WindowsError, os.chdir, support.TESTFN) 1182 self.assertRaises(OSError, os.chdir, support.TESTFN)
1052 1183
1053 def test_mkdir(self): 1184 def test_mkdir(self):
1054 f = open(support.TESTFN, "w") 1185 f = open(support.TESTFN, "w")
1055 try: 1186 try:
1056 self.assertRaises(WindowsError, os.mkdir, support.TESTFN) 1187 self.assertRaises(OSError, os.mkdir, support.TESTFN)
1057 finally: 1188 finally:
1058 f.close() 1189 f.close()
1059 os.unlink(support.TESTFN) 1190 os.unlink(support.TESTFN)
1060 1191
1061 def test_utime(self): 1192 def test_utime(self):
1062 self.assertRaises(WindowsError, os.utime, support.TESTFN, None) 1193 self.assertRaises(OSError, os.utime, support.TESTFN, None)
1063 1194
1064 def test_chmod(self): 1195 def test_chmod(self):
1065 self.assertRaises(WindowsError, os.chmod, support.TESTFN, 0) 1196 self.assertRaises(OSError, os.chmod, support.TESTFN, 0)
1066 1197
1067 class TestInvalidFD(unittest.TestCase): 1198 class TestInvalidFD(unittest.TestCase):
1068 singles = ["fchdir", "dup", "fdopen", "fdatasync", "fstat", 1199 singles = ["fchdir", "dup", "fdopen", "fdatasync", "fstat",
1069 "fstatvfs", "fsync", "tcgetpgrp", "ttyname"] 1200 "fstatvfs", "fsync", "tcgetpgrp", "ttyname"]
1070 #singles.append("close") 1201 #singles.append("close")
1071 #We omit close because it doesn'r raise an exception on some platforms 1202 #We omit close because it doesn'r raise an exception on some platforms
1072 def get_single(f): 1203 def get_single(f):
1073 def helper(self): 1204 def helper(self):
1074 if hasattr(os, f): 1205 if hasattr(os, f):
1075 self.check(getattr(os, f)) 1206 self.check(getattr(os, f))
1076 return helper 1207 return helper
1077 for f in singles: 1208 for f in singles:
1078 locals()["test_"+f] = get_single(f) 1209 locals()["test_"+f] = get_single(f)
1079 1210
1080 def check(self, f, *args): 1211 def check(self, f, *args):
1081 try: 1212 try:
1082 f(support.make_bad_fd(), *args) 1213 f(support.make_bad_fd(), *args)
1083 except OSError as e: 1214 except OSError as e:
1084 self.assertEqual(e.errno, errno.EBADF) 1215 self.assertEqual(e.errno, errno.EBADF)
1085 else: 1216 else:
1086 self.fail("%r didn't raise a OSError with a bad file descriptor" 1217 self.fail("%r didn't raise a OSError with a bad file descriptor"
1087 % f) 1218 % f)
1088 1219
1220 @unittest.skipUnless(hasattr(os, 'isatty'), 'test needs os.isatty()')
1089 def test_isatty(self): 1221 def test_isatty(self):
1090 if hasattr(os, "isatty"): 1222 self.assertEqual(os.isatty(support.make_bad_fd()), False)
1091 self.assertEqual(os.isatty(support.make_bad_fd()), False) 1223
1092 1224 @unittest.skipUnless(hasattr(os, 'closerange'), 'test needs os.closerange()')
1093 def test_closerange(self): 1225 def test_closerange(self):
1094 if hasattr(os, "closerange"): 1226 fd = support.make_bad_fd()
1095 fd = support.make_bad_fd() 1227 # Make sure none of the descriptors we are about to close are
1096 # Make sure none of the descriptors we are about to close are 1228 # currently valid (issue 6542).
1097 # currently valid (issue 6542). 1229 for i in range(10):
1098 for i in range(10): 1230 try: os.fstat(fd+i)
1099 try: os.fstat(fd+i) 1231 except OSError:
1100 except OSError: 1232 pass
1101 pass 1233 else:
1102 else: 1234 break
1103 break 1235 if i < 2:
1104 if i < 2: 1236 raise unittest.SkipTest(
1105 raise unittest.SkipTest( 1237 "Unable to acquire a range of invalid file descriptors")
1106 "Unable to acquire a range of invalid file descriptors") 1238 self.assertEqual(os.closerange(fd, fd + i-1), None)
1107 self.assertEqual(os.closerange(fd, fd + i-1), None) 1239
1108 1240 @unittest.skipUnless(hasattr(os, 'dup2'), 'test needs os.dup2()')
1109 def test_dup2(self): 1241 def test_dup2(self):
1110 if hasattr(os, "dup2"): 1242 self.check(os.dup2, 20)
1111 self.check(os.dup2, 20) 1243
1112 1244 @unittest.skipUnless(hasattr(os, 'fchmod'), 'test needs os.fchmod()')
1113 def test_fchmod(self): 1245 def test_fchmod(self):
1114 if hasattr(os, "fchmod"): 1246 self.check(os.fchmod, 0)
1115 self.check(os.fchmod, 0) 1247
1116 1248 @unittest.skipUnless(hasattr(os, 'fchown'), 'test needs os.fchown()')
1117 def test_fchown(self): 1249 def test_fchown(self):
1118 if hasattr(os, "fchown"): 1250 self.check(os.fchown, -1, -1)
1119 self.check(os.fchown, -1, -1) 1251
1120 1252 @unittest.skipUnless(hasattr(os, 'fpathconf'), 'test needs os.fpathconf()')
1121 def test_fpathconf(self): 1253 def test_fpathconf(self):
1122 if hasattr(os, "fpathconf"): 1254 self.check(os.pathconf, "PC_NAME_MAX")
1123 self.check(os.pathconf, "PC_NAME_MAX") 1255 self.check(os.fpathconf, "PC_NAME_MAX")
1124 self.check(os.fpathconf, "PC_NAME_MAX") 1256
1125 1257 @unittest.skipUnless(hasattr(os, 'ftruncate'), 'test needs os.ftruncate()')
1126 def test_ftruncate(self): 1258 def test_ftruncate(self):
1127 if hasattr(os, "ftruncate"): 1259 self.check(os.truncate, 0)
1128 self.check(os.truncate, 0) 1260 self.check(os.ftruncate, 0)
1129 self.check(os.ftruncate, 0) 1261
1130 1262 @unittest.skipUnless(hasattr(os, 'lseek'), 'test needs os.lseek()')
1131 def test_lseek(self): 1263 def test_lseek(self):
1132 if hasattr(os, "lseek"): 1264 self.check(os.lseek, 0, 0)
1133 self.check(os.lseek, 0, 0) 1265
1134 1266 @unittest.skipUnless(hasattr(os, 'read'), 'test needs os.read()')
1135 def test_read(self): 1267 def test_read(self):
1136 if hasattr(os, "read"): 1268 self.check(os.read, 1)
1137 self.check(os.read, 1) 1269
1138 1270 @unittest.skipUnless(hasattr(os, 'readv'), 'test needs os.readv()')
1271 def test_readv(self):
1272 buf = bytearray(10)
1273 self.check(os.readv, [buf])
1274
1275 @unittest.skipUnless(hasattr(os, 'tcsetpgrp'), 'test needs os.tcsetpgrp()')
1139 def test_tcsetpgrpt(self): 1276 def test_tcsetpgrpt(self):
1140 if hasattr(os, "tcsetpgrp"): 1277 self.check(os.tcsetpgrp, 0)
1141 self.check(os.tcsetpgrp, 0) 1278
1142 1279 @unittest.skipUnless(hasattr(os, 'write'), 'test needs os.write()')
1143 def test_write(self): 1280 def test_write(self):
1144 if hasattr(os, "write"): 1281 self.check(os.write, b" ")
1145 self.check(os.write, b" ") 1282
1283 @unittest.skipUnless(hasattr(os, 'writev'), 'test needs os.writev()')
1284 def test_writev(self):
1285 self.check(os.writev, [b'abc'])
1146 1286
1147 1287
1148 class LinkTests(unittest.TestCase): 1288 class LinkTests(unittest.TestCase):
1149 def setUp(self): 1289 def setUp(self):
1150 self.file1 = support.TESTFN 1290 self.file1 = support.TESTFN
1151 self.file2 = os.path.join(support.TESTFN + "2") 1291 self.file2 = os.path.join(support.TESTFN + "2")
1152 1292
1153 def tearDown(self): 1293 def tearDown(self):
1154 for file in (self.file1, self.file2): 1294 for file in (self.file1, self.file2):
1155 if os.path.exists(file): 1295 if os.path.exists(file):
(...skipping 19 matching lines...) Expand all
1175 def test_unicode_name(self): 1315 def test_unicode_name(self):
1176 try: 1316 try:
1177 os.fsencode("\xf1") 1317 os.fsencode("\xf1")
1178 except UnicodeError: 1318 except UnicodeError:
1179 raise unittest.SkipTest("Unable to encode for this platform.") 1319 raise unittest.SkipTest("Unable to encode for this platform.")
1180 1320
1181 self.file1 += "\xf1" 1321 self.file1 += "\xf1"
1182 self.file2 = self.file1 + "2" 1322 self.file2 = self.file1 + "2"
1183 self._test_link(self.file1, self.file2) 1323 self._test_link(self.file1, self.file2)
1184 1324
1185 if sys.platform != 'win32': 1325 @unittest.skipIf(sys.platform == "win32", "Posix specific tests")
1186 class Win32ErrorTests(unittest.TestCase): 1326 class PosixUidGidTests(unittest.TestCase):
1187 pass 1327 @unittest.skipUnless(hasattr(os, 'setuid'), 'test needs os.setuid()')
1188 1328 def test_setuid(self):
1189 class PosixUidGidTests(unittest.TestCase): 1329 if os.getuid() != 0:
1190 if hasattr(os, 'setuid'): 1330 self.assertRaises(OSError, os.setuid, 0)
1191 def test_setuid(self): 1331 self.assertRaises(OverflowError, os.setuid, 1<<32)
1192 if os.getuid() != 0: 1332
1193 self.assertRaises(os.error, os.setuid, 0) 1333 @unittest.skipUnless(hasattr(os, 'setgid'), 'test needs os.setgid()')
1194 self.assertRaises(OverflowError, os.setuid, 1<<32) 1334 def test_setgid(self):
1195 1335 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
1196 if hasattr(os, 'setgid'): 1336 self.assertRaises(OSError, os.setgid, 0)
1197 def test_setgid(self): 1337 self.assertRaises(OverflowError, os.setgid, 1<<32)
1198 if os.getuid() != 0: 1338
1199 self.assertRaises(os.error, os.setgid, 0) 1339 @unittest.skipUnless(hasattr(os, 'seteuid'), 'test needs os.seteuid()')
1200 self.assertRaises(OverflowError, os.setgid, 1<<32) 1340 def test_seteuid(self):
1201 1341 if os.getuid() != 0:
1202 if hasattr(os, 'seteuid'): 1342 self.assertRaises(OSError, os.seteuid, 0)
1203 def test_seteuid(self): 1343 self.assertRaises(OverflowError, os.seteuid, 1<<32)
1204 if os.getuid() != 0: 1344
1205 self.assertRaises(os.error, os.seteuid, 0) 1345 @unittest.skipUnless(hasattr(os, 'setegid'), 'test needs os.setegid()')
1206 self.assertRaises(OverflowError, os.seteuid, 1<<32) 1346 def test_setegid(self):
1207 1347 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
1208 if hasattr(os, 'setegid'): 1348 self.assertRaises(OSError, os.setegid, 0)
1209 def test_setegid(self): 1349 self.assertRaises(OverflowError, os.setegid, 1<<32)
1210 if os.getuid() != 0: 1350
1211 self.assertRaises(os.error, os.setegid, 0) 1351 @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
1212 self.assertRaises(OverflowError, os.setegid, 1<<32) 1352 def test_setreuid(self):
1213 1353 if os.getuid() != 0:
1214 if hasattr(os, 'setreuid'): 1354 self.assertRaises(OSError, os.setreuid, 0, 0)
1215 def test_setreuid(self): 1355 self.assertRaises(OverflowError, os.setreuid, 1<<32, 0)
1216 if os.getuid() != 0: 1356 self.assertRaises(OverflowError, os.setreuid, 0, 1<<32)
1217 self.assertRaises(os.error, os.setreuid, 0, 0) 1357
1218 self.assertRaises(OverflowError, os.setreuid, 1<<32, 0) 1358 @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
1219 self.assertRaises(OverflowError, os.setreuid, 0, 1<<32) 1359 def test_setreuid_neg1(self):
1220 1360 # Needs to accept -1. We run this in a subprocess to avoid
1221 def test_setreuid_neg1(self): 1361 # altering the test runner's process state (issue8045).
1222 # Needs to accept -1. We run this in a subprocess to avoid 1362 subprocess.check_call([
1223 # altering the test runner's process state (issue8045). 1363 sys.executable, '-c',
1224 subprocess.check_call([ 1364 'import os,sys;os.setreuid(-1,-1);sys.exit(0)'])
1225 sys.executable, '-c', 1365
1226 'import os,sys;os.setreuid(-1,-1);sys.exit(0)']) 1366 @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
1227 1367 def test_setregid(self):
1228 if hasattr(os, 'setregid'): 1368 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
1229 def test_setregid(self): 1369 self.assertRaises(OSError, os.setregid, 0, 0)
1230 if os.getuid() != 0: 1370 self.assertRaises(OverflowError, os.setregid, 1<<32, 0)
1231 self.assertRaises(os.error, os.setregid, 0, 0) 1371 self.assertRaises(OverflowError, os.setregid, 0, 1<<32)
1232 self.assertRaises(OverflowError, os.setregid, 1<<32, 0) 1372
1233 self.assertRaises(OverflowError, os.setregid, 0, 1<<32) 1373 @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
1234 1374 def test_setregid_neg1(self):
1235 def test_setregid_neg1(self): 1375 # Needs to accept -1. We run this in a subprocess to avoid
1236 # Needs to accept -1. We run this in a subprocess to avoid 1376 # altering the test runner's process state (issue8045).
1237 # altering the test runner's process state (issue8045). 1377 subprocess.check_call([
1238 subprocess.check_call([ 1378 sys.executable, '-c',
1239 sys.executable, '-c', 1379 'import os,sys;os.setregid(-1,-1);sys.exit(0)'])
1240 'import os,sys;os.setregid(-1,-1);sys.exit(0)']) 1380
1241 1381 @unittest.skipIf(sys.platform == "win32", "Posix specific tests")
1242 class Pep383Tests(unittest.TestCase): 1382 class Pep383Tests(unittest.TestCase):
1243 def setUp(self): 1383 def setUp(self):
1244 if support.TESTFN_UNENCODABLE: 1384 if support.TESTFN_UNENCODABLE:
1245 self.dir = support.TESTFN_UNENCODABLE 1385 self.dir = support.TESTFN_UNENCODABLE
1246 elif support.TESTFN_NONASCII: 1386 elif support.TESTFN_NONASCII:
1247 self.dir = support.TESTFN_NONASCII 1387 self.dir = support.TESTFN_NONASCII
1248 else: 1388 else:
1249 self.dir = support.TESTFN 1389 self.dir = support.TESTFN
1250 self.bdir = os.fsencode(self.dir) 1390 self.bdir = os.fsencode(self.dir)
1251 1391
1252 bytesfn = [] 1392 bytesfn = []
1253 def add_filename(fn): 1393 def add_filename(fn):
1254 try:
1255 fn = os.fsencode(fn)
1256 except UnicodeEncodeError:
1257 return
1258 bytesfn.append(fn)
1259 add_filename(support.TESTFN_UNICODE)
1260 if support.TESTFN_UNENCODABLE:
1261 add_filename(support.TESTFN_UNENCODABLE)
1262 if support.TESTFN_NONASCII:
1263 add_filename(support.TESTFN_NONASCII)
1264 if not bytesfn:
1265 self.skipTest("couldn't create any non-ascii filename")
1266
1267 self.unicodefn = set()
1268 os.mkdir(self.dir)
1269 try: 1394 try:
1270 for fn in bytesfn: 1395 fn = os.fsencode(fn)
1271 support.create_empty_file(os.path.join(self.bdir, fn)) 1396 except UnicodeEncodeError:
1272 fn = os.fsdecode(fn) 1397 return
1273 if fn in self.unicodefn: 1398 bytesfn.append(fn)
1274 raise ValueError("duplicate filename") 1399 add_filename(support.TESTFN_UNICODE)
1275 self.unicodefn.add(fn) 1400 if support.TESTFN_UNENCODABLE:
1276 except: 1401 add_filename(support.TESTFN_UNENCODABLE)
1277 shutil.rmtree(self.dir) 1402 if support.TESTFN_NONASCII:
1278 raise 1403 add_filename(support.TESTFN_NONASCII)
1279 1404 if not bytesfn:
1280 def tearDown(self): 1405 self.skipTest("couldn't create any non-ascii filename")
1406
1407 self.unicodefn = set()
1408 os.mkdir(self.dir)
1409 try:
1410 for fn in bytesfn:
1411 support.create_empty_file(os.path.join(self.bdir, fn))
1412 fn = os.fsdecode(fn)
1413 if fn in self.unicodefn:
1414 raise ValueError("duplicate filename")
1415 self.unicodefn.add(fn)
1416 except:
1281 shutil.rmtree(self.dir) 1417 shutil.rmtree(self.dir)
1282 1418 raise
1283 def test_listdir(self): 1419
1284 expected = self.unicodefn 1420 def tearDown(self):
1285 found = set(os.listdir(self.dir)) 1421 shutil.rmtree(self.dir)
1286 self.assertEqual(found, expected) 1422
1287 # test listdir without arguments 1423 def test_listdir(self):
1288 current_directory = os.getcwd() 1424 expected = self.unicodefn
1289 try: 1425 found = set(os.listdir(self.dir))
1290 os.chdir(os.sep) 1426 self.assertEqual(found, expected)
1291 self.assertEqual(set(os.listdir()), set(os.listdir(os.sep))) 1427 # test listdir without arguments
1292 finally: 1428 current_directory = os.getcwd()
1293 os.chdir(current_directory) 1429 try:
1294 1430 os.chdir(os.sep)
1295 def test_open(self): 1431 self.assertEqual(set(os.listdir()), set(os.listdir(os.sep)))
1296 for fn in self.unicodefn: 1432 finally:
1297 f = open(os.path.join(self.dir, fn), 'rb') 1433 os.chdir(current_directory)
1298 f.close() 1434
1299 1435 def test_open(self):
1300 def test_stat(self): 1436 for fn in self.unicodefn:
1301 for fn in self.unicodefn: 1437 f = open(os.path.join(self.dir, fn), 'rb')
1302 os.stat(os.path.join(self.dir, fn)) 1438 f.close()
1303 else: 1439
1304 class PosixUidGidTests(unittest.TestCase): 1440 @unittest.skipUnless(hasattr(os, 'statvfs'),
1305 pass 1441 "need os.statvfs()")
1306 class Pep383Tests(unittest.TestCase): 1442 def test_statvfs(self):
1307 pass 1443 # issue #9645
1444 for fn in self.unicodefn:
1445 # should not fail with file not found error
1446 fullname = os.path.join(self.dir, fn)
1447 os.statvfs(fullname)
1448
1449 def test_stat(self):
1450 for fn in self.unicodefn:
1451 os.stat(os.path.join(self.dir, fn))
1308 1452
1309 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests") 1453 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
1310 class Win32KillTests(unittest.TestCase): 1454 class Win32KillTests(unittest.TestCase):
1311 def _kill(self, sig): 1455 def _kill(self, sig):
1312 # Start sys.executable as a subprocess and communicate from the 1456 # Start sys.executable as a subprocess and communicate from the
1313 # subprocess to the parent that the interpreter is ready. When it 1457 # subprocess to the parent that the interpreter is ready. When it
1314 # becomes ready, send *sig* via os.kill to the subprocess and check 1458 # becomes ready, send *sig* via os.kill to the subprocess and check
1315 # that the return code is equal to *sig*. 1459 # that the return code is equal to *sig*.
1316 import ctypes 1460 import ctypes
1317 from ctypes import wintypes 1461 from ctypes import wintypes
(...skipping 98 matching lines...) Expand 10 before | Expand all | Expand 10 after
1416 # by subprocesses. 1560 # by subprocesses.
1417 SetConsoleCtrlHandler(NULL, 0) 1561 SetConsoleCtrlHandler(NULL, 0)
1418 1562
1419 self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT") 1563 self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT")
1420 1564
1421 def test_CTRL_BREAK_EVENT(self): 1565 def test_CTRL_BREAK_EVENT(self):
1422 self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT") 1566 self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT")
1423 1567
1424 1568
1425 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests") 1569 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
1570 class Win32ListdirTests(unittest.TestCase):
1571 """Test listdir on Windows."""
1572
1573 def setUp(self):
1574 self.created_paths = []
1575 for i in range(2):
1576 dir_name = 'SUB%d' % i
1577 dir_path = os.path.join(support.TESTFN, dir_name)
1578 file_name = 'FILE%d' % i
1579 file_path = os.path.join(support.TESTFN, file_name)
1580 os.makedirs(dir_path)
1581 with open(file_path, 'w') as f:
1582 f.write("I'm %s and proud of it. Blame test_os.\n" % file_path)
1583 self.created_paths.extend([dir_name, file_name])
1584 self.created_paths.sort()
1585
1586 def tearDown(self):
1587 shutil.rmtree(support.TESTFN)
1588
1589 def test_listdir_no_extended_path(self):
1590 """Test when the path is not an "extended" path."""
1591 # unicode
1592 self.assertEqual(
1593 sorted(os.listdir(support.TESTFN)),
1594 self.created_paths)
1595 # bytes
1596 self.assertEqual(
1597 sorted(os.listdir(os.fsencode(support.TESTFN))),
1598 [os.fsencode(path) for path in self.created_paths])
1599
1600 def test_listdir_extended_path(self):
1601 """Test when the path starts with '\\\\?\\'."""
1602 # See: http://msdn.microsoft.com/en-us/library/windows/desktop/aa365247(v=vs.85).aspx#maxpath
1603 # unicode
1604 path = '\\\\?\\' + os.path.abspath(support.TESTFN)
1605 self.assertEqual(
1606 sorted(os.listdir(path)),
1607 self.created_paths)
1608 # bytes
1609 path = b'\\\\?\\' + os.fsencode(os.path.abspath(support.TESTFN))
1610 self.assertEqual(
1611 sorted(os.listdir(path)),
1612 [os.fsencode(path) for path in self.created_paths])
1613
1614
1615 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
1426 @support.skip_unless_symlink 1616 @support.skip_unless_symlink
1427 class Win32SymlinkTests(unittest.TestCase): 1617 class Win32SymlinkTests(unittest.TestCase):
1428 filelink = 'filelinktest' 1618 filelink = 'filelinktest'
1429 filelink_target = os.path.abspath(__file__) 1619 filelink_target = os.path.abspath(__file__)
1430 dirlink = 'dirlinktest' 1620 dirlink = 'dirlinktest'
1431 dirlink_target = os.path.dirname(filelink_target) 1621 dirlink_target = os.path.dirname(filelink_target)
1432 missing_link = 'missing link' 1622 missing_link = 'missing link'
1433 1623
1434 def setUp(self): 1624 def setUp(self):
1435 assert os.path.exists(self.dirlink_target) 1625 assert os.path.exists(self.dirlink_target)
1436 assert os.path.exists(self.filelink_target) 1626 assert os.path.exists(self.filelink_target)
1437 assert not os.path.exists(self.dirlink) 1627 assert not os.path.exists(self.dirlink)
1438 assert not os.path.exists(self.filelink) 1628 assert not os.path.exists(self.filelink)
1439 assert not os.path.exists(self.missing_link) 1629 assert not os.path.exists(self.missing_link)
1440 1630
1441 def tearDown(self): 1631 def tearDown(self):
1442 if os.path.exists(self.filelink): 1632 if os.path.exists(self.filelink):
1443 os.remove(self.filelink) 1633 os.remove(self.filelink)
1444 if os.path.exists(self.dirlink): 1634 if os.path.exists(self.dirlink):
1445 os.rmdir(self.dirlink) 1635 os.rmdir(self.dirlink)
1446 if os.path.lexists(self.missing_link): 1636 if os.path.lexists(self.missing_link):
1447 os.remove(self.missing_link) 1637 os.remove(self.missing_link)
1448 1638
1449 def test_directory_link(self): 1639 def test_directory_link(self):
1450 os.symlink(self.dirlink_target, self.dirlink, True) 1640 os.symlink(self.dirlink_target, self.dirlink)
1451 self.assertTrue(os.path.exists(self.dirlink)) 1641 self.assertTrue(os.path.exists(self.dirlink))
1452 self.assertTrue(os.path.isdir(self.dirlink)) 1642 self.assertTrue(os.path.isdir(self.dirlink))
1453 self.assertTrue(os.path.islink(self.dirlink)) 1643 self.assertTrue(os.path.islink(self.dirlink))
1454 self.check_stat(self.dirlink, self.dirlink_target) 1644 self.check_stat(self.dirlink, self.dirlink_target)
1455 1645
1456 def test_file_link(self): 1646 def test_file_link(self):
1457 os.symlink(self.filelink_target, self.filelink) 1647 os.symlink(self.filelink_target, self.filelink)
1458 self.assertTrue(os.path.exists(self.filelink)) 1648 self.assertTrue(os.path.exists(self.filelink))
1459 self.assertTrue(os.path.isfile(self.filelink)) 1649 self.assertTrue(os.path.isfile(self.filelink))
1460 self.assertTrue(os.path.islink(self.filelink)) 1650 self.assertTrue(os.path.islink(self.filelink))
(...skipping 71 matching lines...) Expand 10 before | Expand all | Expand 10 after
1532 os.chdir(level3) 1722 os.chdir(level3)
1533 self.assertEqual(os.stat(file1), 1723 self.assertEqual(os.stat(file1),
1534 os.stat(os.path.relpath(link))) 1724 os.stat(os.path.relpath(link)))
1535 finally: 1725 finally:
1536 os.chdir(orig_dir) 1726 os.chdir(orig_dir)
1537 except OSError as err: 1727 except OSError as err:
1538 self.fail(err) 1728 self.fail(err)
1539 finally: 1729 finally:
1540 os.remove(file1) 1730 os.remove(file1)
1541 shutil.rmtree(level1) 1731 shutil.rmtree(level1)
1732
1733
1734 @support.skip_unless_symlink
1735 class NonLocalSymlinkTests(unittest.TestCase):
1736
1737 def setUp(self):
1738 """
1739 Create this structure:
1740
1741 base
1742 \___ some_dir
1743 """
1744 os.makedirs('base/some_dir')
1745
1746 def tearDown(self):
1747 shutil.rmtree('base')
1748
1749 def test_directory_link_nonlocal(self):
1750 """
1751 The symlink target should resolve relative to the link, not relative
1752 to the current directory.
1753
1754 Then, link base/some_link -> base/some_dir and ensure that some_link
1755 is resolved as a directory.
1756
1757 In issue13772, it was discovered that directory detection failed if
1758 the symlink target was not specified relative to the current
1759 directory, which was a defect in the implementation.
1760 """
1761 src = os.path.join('base', 'some_link')
1762 os.symlink('some_dir', src)
1763 assert os.path.isdir(src)
1542 1764
1543 1765
1544 class FSEncodingTests(unittest.TestCase): 1766 class FSEncodingTests(unittest.TestCase):
1545 def test_nop(self): 1767 def test_nop(self):
1546 self.assertEqual(os.fsencode(b'abc\xff'), b'abc\xff') 1768 self.assertEqual(os.fsencode(b'abc\xff'), b'abc\xff')
1547 self.assertEqual(os.fsdecode('abc\u0141'), 'abc\u0141') 1769 self.assertEqual(os.fsdecode('abc\u0141'), 'abc\u0141')
1548 1770
1549 def test_identity(self): 1771 def test_identity(self):
1550 # assert fsdecode(fsencode(x)) == x 1772 # assert fsdecode(fsencode(x)) == x
1551 for fn in ('unicode\u0141', 'latin\xe9', 'ascii'): 1773 for fn in ('unicode\u0141', 'latin\xe9', 'ascii'):
(...skipping 151 matching lines...) Expand 10 before | Expand all | Expand 10 after
1703 1925
1704 1926
1705 @unittest.skipUnless(threading is not None, "test needs threading module") 1927 @unittest.skipUnless(threading is not None, "test needs threading module")
1706 @unittest.skipUnless(hasattr(os, 'sendfile'), "test needs os.sendfile()") 1928 @unittest.skipUnless(hasattr(os, 'sendfile'), "test needs os.sendfile()")
1707 class TestSendfile(unittest.TestCase): 1929 class TestSendfile(unittest.TestCase):
1708 1930
1709 DATA = b"12345abcde" * 16 * 1024 # 160 KB 1931 DATA = b"12345abcde" * 16 * 1024 # 160 KB
1710 SUPPORT_HEADERS_TRAILERS = not sys.platform.startswith("linux") and \ 1932 SUPPORT_HEADERS_TRAILERS = not sys.platform.startswith("linux") and \
1711 not sys.platform.startswith("solaris") and \ 1933 not sys.platform.startswith("solaris") and \
1712 not sys.platform.startswith("sunos") 1934 not sys.platform.startswith("sunos")
1935 requires_headers_trailers = unittest.skipUnless(SUPPORT_HEADERS_TRAILERS,
1936 'requires headers and trailers support')
1713 1937
1714 @classmethod 1938 @classmethod
1715 def setUpClass(cls): 1939 def setUpClass(cls):
1716 with open(support.TESTFN, "wb") as f: 1940 with open(support.TESTFN, "wb") as f:
1717 f.write(cls.DATA) 1941 f.write(cls.DATA)
1718 1942
1719 @classmethod 1943 @classmethod
1720 def tearDownClass(cls): 1944 def tearDownClass(cls):
1721 support.unlink(support.TESTFN) 1945 support.unlink(support.TESTFN)
1722 1946
(...skipping 98 matching lines...) Expand 10 before | Expand all | Expand 10 after
1821 data = self.server.handler_instance.get_data() 2045 data = self.server.handler_instance.get_data()
1822 self.assertEqual(data, b'') 2046 self.assertEqual(data, b'')
1823 2047
1824 def test_invalid_offset(self): 2048 def test_invalid_offset(self):
1825 with self.assertRaises(OSError) as cm: 2049 with self.assertRaises(OSError) as cm:
1826 os.sendfile(self.sockno, self.fileno, -1, 4096) 2050 os.sendfile(self.sockno, self.fileno, -1, 4096)
1827 self.assertEqual(cm.exception.errno, errno.EINVAL) 2051 self.assertEqual(cm.exception.errno, errno.EINVAL)
1828 2052
1829 # --- headers / trailers tests 2053 # --- headers / trailers tests
1830 2054
1831 if SUPPORT_HEADERS_TRAILERS: 2055 @requires_headers_trailers
1832 2056 def test_headers(self):
1833 def test_headers(self): 2057 total_sent = 0
1834 total_sent = 0 2058 sent = os.sendfile(self.sockno, self.fileno, 0, 4096,
1835 sent = os.sendfile(self.sockno, self.fileno, 0, 4096, 2059 headers=[b"x" * 512])
1836 headers=[b"x" * 512]) 2060 total_sent += sent
2061 offset = 4096
2062 nbytes = 4096
2063 while 1:
2064 sent = self.sendfile_wrapper(self.sockno, self.fileno,
2065 offset, nbytes)
2066 if sent == 0:
2067 break
1837 total_sent += sent 2068 total_sent += sent
1838 offset = 4096 2069 offset += sent
1839 nbytes = 4096 2070
1840 while 1: 2071 expected_data = b"x" * 512 + self.DATA
1841 sent = self.sendfile_wrapper(self.sockno, self.fileno, 2072 self.assertEqual(total_sent, len(expected_data))
1842 offset, nbytes) 2073 self.client.close()
1843 if sent == 0: 2074 self.server.wait()
1844 break 2075 data = self.server.handler_instance.get_data()
1845 total_sent += sent 2076 self.assertEqual(hash(data), hash(expected_data))
1846 offset += sent 2077
1847 2078 @requires_headers_trailers
1848 expected_data = b"x" * 512 + self.DATA 2079 def test_trailers(self):
1849 self.assertEqual(total_sent, len(expected_data)) 2080 TESTFN2 = support.TESTFN + "2"
2081 file_data = b"abcdef"
2082 with open(TESTFN2, 'wb') as f:
2083 f.write(file_data)
2084 with open(TESTFN2, 'rb')as f:
2085 self.addCleanup(os.remove, TESTFN2)
2086 os.sendfile(self.sockno, f.fileno(), 0, len(file_data),
2087 trailers=[b"1234"])
1850 self.client.close() 2088 self.client.close()
1851 self.server.wait() 2089 self.server.wait()
1852 data = self.server.handler_instance.get_data() 2090 data = self.server.handler_instance.get_data()
1853 self.assertEqual(hash(data), hash(expected_data)) 2091 self.assertEqual(data, b"abcdef1234")
1854 2092
1855 def test_trailers(self): 2093 @requires_headers_trailers
1856 TESTFN2 = support.TESTFN + "2" 2094 @unittest.skipUnless(hasattr(os, 'SF_NODISKIO'),
1857 with open(TESTFN2, 'wb') as f: 2095 'test needs os.SF_NODISKIO')
1858 f.write(b"abcde") 2096 def test_flags(self):
1859 with open(TESTFN2, 'rb')as f: 2097 try:
1860 self.addCleanup(os.remove, TESTFN2) 2098 os.sendfile(self.sockno, self.fileno, 0, 4096,
1861 os.sendfile(self.sockno, f.fileno(), 0, 4096, 2099 flags=os.SF_NODISKIO)
1862 trailers=[b"12345"]) 2100 except OSError as err:
1863 self.client.close() 2101 if err.errno not in (errno.EBUSY, errno.EAGAIN):
1864 self.server.wait() 2102 raise
1865 data = self.server.handler_instance.get_data()
1866 self.assertEqual(data, b"abcde12345")
1867
1868 if hasattr(os, "SF_NODISKIO"):
1869 def test_flags(self):
1870 try:
1871 os.sendfile(self.sockno, self.fileno, 0, 4096,
1872 flags=os.SF_NODISKIO)
1873 except OSError as err:
1874 if err.errno not in (errno.EBUSY, errno.EAGAIN):
1875 raise
1876 2103
1877 2104
1878 def supports_extended_attributes(): 2105 def supports_extended_attributes():
1879 if not hasattr(os, "setxattr"): 2106 if not hasattr(os, "setxattr"):
1880 return False 2107 return False
1881 try: 2108 try:
1882 with open(support.TESTFN, "wb") as fp: 2109 with open(support.TESTFN, "wb") as fp:
1883 try: 2110 try:
1884 os.setxattr(fp.fileno(), b"user.test", b"") 2111 os.setxattr(fp.fileno(), b"user.test", b"")
1885 except OSError: 2112 except OSError:
(...skipping 254 matching lines...) Expand 10 before | Expand all | Expand 10 after
2140 2367
2141 for filenames, func, *func_args in funcs: 2368 for filenames, func, *func_args in funcs:
2142 for name in filenames: 2369 for name in filenames:
2143 try: 2370 try:
2144 func(name, *func_args) 2371 func(name, *func_args)
2145 except OSError as err: 2372 except OSError as err:
2146 self.assertIs(err.filename, name) 2373 self.assertIs(err.filename, name)
2147 else: 2374 else:
2148 self.fail("No exception thrown by {}".format(func)) 2375 self.fail("No exception thrown by {}".format(func))
2149 2376
2377 class CPUCountTests(unittest.TestCase):
2378 def test_cpu_count(self):
2379 cpus = os.cpu_count()
2380 if cpus is not None:
2381 self.assertIsInstance(cpus, int)
2382 self.assertGreater(cpus, 0)
2383 else:
2384 self.skipTest("Could not determine the number of CPUs")
2385
2386
2387 class FDInheritanceTests(unittest.TestCase):
2388 def test_get_set_inheritable(self):
2389 fd = os.open(__file__, os.O_RDONLY)
2390 self.addCleanup(os.close, fd)
2391 self.assertEqual(os.get_inheritable(fd), False)
2392
2393 os.set_inheritable(fd, True)
2394 self.assertEqual(os.get_inheritable(fd), True)
2395
2396 @unittest.skipIf(fcntl is None, "need fcntl")
2397 def test_get_inheritable_cloexec(self):
2398 fd = os.open(__file__, os.O_RDONLY)
2399 self.addCleanup(os.close, fd)
2400 self.assertEqual(os.get_inheritable(fd), False)
2401
2402 # clear FD_CLOEXEC flag
2403 flags = fcntl.fcntl(fd, fcntl.F_GETFD)
2404 flags &= ~fcntl.FD_CLOEXEC
2405 fcntl.fcntl(fd, fcntl.F_SETFD, flags)
2406
2407 self.assertEqual(os.get_inheritable(fd), True)
2408
2409 @unittest.skipIf(fcntl is None, "need fcntl")
2410 def test_set_inheritable_cloexec(self):
2411 fd = os.open(__file__, os.O_RDONLY)
2412 self.addCleanup(os.close, fd)
2413 self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC,
2414 fcntl.FD_CLOEXEC)
2415
2416 os.set_inheritable(fd, True)
2417 self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC,
2418 0)
2419
2420 def test_open(self):
2421 fd = os.open(__file__, os.O_RDONLY)
2422 self.addCleanup(os.close, fd)
2423 self.assertEqual(os.get_inheritable(fd), False)
2424
2425 @unittest.skipUnless(hasattr(os, 'pipe'), "need os.pipe()")
2426 def test_pipe(self):
2427 rfd, wfd = os.pipe()
2428 self.addCleanup(os.close, rfd)
2429 self.addCleanup(os.close, wfd)
2430 self.assertEqual(os.get_inheritable(rfd), False)
2431 self.assertEqual(os.get_inheritable(wfd), False)
2432
2433 def test_dup(self):
2434 fd1 = os.open(__file__, os.O_RDONLY)
2435 self.addCleanup(os.close, fd1)
2436
2437 fd2 = os.dup(fd1)
2438 self.addCleanup(os.close, fd2)
2439 self.assertEqual(os.get_inheritable(fd2), False)
2440
2441 @unittest.skipUnless(hasattr(os, 'dup2'), "need os.dup2()")
2442 def test_dup2(self):
2443 fd = os.open(__file__, os.O_RDONLY)
2444 self.addCleanup(os.close, fd)
2445
2446 # inheritable by default
2447 fd2 = os.open(__file__, os.O_RDONLY)
2448 try:
2449 os.dup2(fd, fd2)
2450 self.assertEqual(os.get_inheritable(fd2), True)
2451 finally:
2452 os.close(fd2)
2453
2454 # force non-inheritable
2455 fd3 = os.open(__file__, os.O_RDONLY)
2456 try:
2457 os.dup2(fd, fd3, inheritable=False)
2458 self.assertEqual(os.get_inheritable(fd3), False)
2459 finally:
2460 os.close(fd3)
2461
2462 @unittest.skipUnless(hasattr(os, 'openpty'), "need os.openpty()")
2463 def test_openpty(self):
2464 master_fd, slave_fd = os.openpty()
2465 self.addCleanup(os.close, master_fd)
2466 self.addCleanup(os.close, slave_fd)
2467 self.assertEqual(os.get_inheritable(master_fd), False)
2468 self.assertEqual(os.get_inheritable(slave_fd), False)
2469
2470
2150 @support.reap_threads 2471 @support.reap_threads
2151 def test_main(): 2472 def test_main():
2152 support.run_unittest( 2473 support.run_unittest(
2153 FileTests, 2474 FileTests,
2154 StatAttributeTests, 2475 StatAttributeTests,
2155 EnvironTests, 2476 EnvironTests,
2156 WalkTests, 2477 WalkTests,
2157 FwalkTests, 2478 FwalkTests,
2158 MakedirTests, 2479 MakedirTests,
2159 DevNullTests, 2480 DevNullTests,
2160 URandomTests, 2481 URandomTests,
2161 ExecTests, 2482 ExecTests,
2162 Win32ErrorTests, 2483 Win32ErrorTests,
2163 TestInvalidFD, 2484 TestInvalidFD,
2164 PosixUidGidTests, 2485 PosixUidGidTests,
2165 Pep383Tests, 2486 Pep383Tests,
2166 Win32KillTests, 2487 Win32KillTests,
2488 Win32ListdirTests,
2167 Win32SymlinkTests, 2489 Win32SymlinkTests,
2490 NonLocalSymlinkTests,
2168 FSEncodingTests, 2491 FSEncodingTests,
2169 DeviceEncodingTests, 2492 DeviceEncodingTests,
2170 PidTests, 2493 PidTests,
2171 LoginTests, 2494 LoginTests,
2172 LinkTests, 2495 LinkTests,
2173 TestSendfile, 2496 TestSendfile,
2174 ProgramPriorityTests, 2497 ProgramPriorityTests,
2175 ExtendedAttributeTests, 2498 ExtendedAttributeTests,
2176 Win32DeprecatedBytesAPI, 2499 Win32DeprecatedBytesAPI,
2177 TermsizeTests, 2500 TermsizeTests,
2178 OSErrorTests, 2501 OSErrorTests,
2502 RemoveDirsTests,
2503 CPUCountTests,
2504 FDInheritanceTests,
2179 ) 2505 )
2180 2506
2181 if __name__ == "__main__": 2507 if __name__ == "__main__":
2182 test_main() 2508 test_main()
LEFTRIGHT

RSS Feeds Recent Issues | This issue
This is Rietveld 894c83f36cb7+