Rietveld Code Review Tool
Help | Bug tracker | Discussion group | Source code | Sign in
(18)

Delta Between Two Patch Sets: Lib/test/test_os.py

Issue 26826: Expose new copy_file_range() syscal in os module and use it to improve shutils.copy()
Left Patch Set: Created 3 years, 7 months ago
Right Patch Set: Created 3 years, 7 months ago
Left:
Right:
Use n/p to move between diff chunks; N/P to move between comments. Please Sign in to add in-line comments.
Jump to:
Left: Side by side diff | Download
Right: Side by side diff | Download
« no previous file with change/comment | « Doc/library/os.rst ('k') | Modules/posixmodule.c » ('j') | no next file with change/comment »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
LEFTRIGHT
1 # As a test suite for the os module, this is woefully inadequate, but this 1 # As a test suite for the os module, this is woefully inadequate, but this
2 # does add tests for a few functions which have been determined to be more 2 # does add tests for a few functions which have been determined to be more
3 # portable than they had been thought to be. 3 # portable than they had been thought to be.
4 4
5 import asynchat 5 import asynchat
6 import asyncore 6 import asyncore
7 import codecs 7 import codecs
8 import contextlib 8 import contextlib
9 import decimal 9 import decimal
10 import errno 10 import errno
11 import fractions 11 import fractions
12 import getpass
12 import itertools 13 import itertools
13 import locale 14 import locale
14 import mmap 15 import mmap
15 import os 16 import os
16 import pickle 17 import pickle
17 import platform
18 import re 18 import re
19 import shutil 19 import shutil
20 import signal 20 import signal
21 import socket 21 import socket
22 import stat 22 import stat
23 import subprocess 23 import subprocess
24 import sys 24 import sys
25 import sysconfig 25 import sysconfig
26 import time 26 import time
27 import unittest 27 import unittest
28 import uuid 28 import uuid
29 import warnings 29 import warnings
30 from test import support 30 from test import support
31 try: 31 try:
32 import threading 32 import threading
33 except ImportError: 33 except ImportError:
34 threading = None 34 threading = None
35 try: 35 try:
36 import resource 36 import resource
37 except ImportError: 37 except ImportError:
38 resource = None 38 resource = None
39 try: 39 try:
40 import fcntl 40 import fcntl
41 except ImportError: 41 except ImportError:
42 fcntl = None 42 fcntl = None
43 43 try:
44 from test.script_helper import assert_python_ok 44 import _winapi
45 except ImportError:
46 _winapi = None
47 try:
48 import grp
49 groups = [g.gr_gid for g in grp.getgrall() if getpass.getuser() in g.gr_mem]
50 if hasattr(os, 'getgid'):
51 process_gid = os.getgid()
52 if process_gid not in groups:
53 groups.append(process_gid)
54 except ImportError:
55 groups = []
56 try:
57 import pwd
58 all_users = [u.pw_uid for u in pwd.getpwall()]
59 except ImportError:
60 all_users = []
61 try:
62 from _testcapi import INT_MAX, PY_SSIZE_T_MAX
63 except ImportError:
64 INT_MAX = PY_SSIZE_T_MAX = sys.maxsize
65
66 from test.support.script_helper import assert_python_ok
67
68
69 root_in_posix = False
70 if hasattr(os, 'geteuid'):
71 root_in_posix = (os.geteuid() == 0)
45 72
46 # Detect whether we're on a Linux system that uses the (now outdated 73 # Detect whether we're on a Linux system that uses the (now outdated
47 # and unmaintained) linuxthreads threading library. There's an issue 74 # and unmaintained) linuxthreads threading library. There's an issue
48 # when combining linuxthreads with a failed execv call: see 75 # when combining linuxthreads with a failed execv call: see
49 # http://bugs.python.org/issue4970. 76 # http://bugs.python.org/issue4970.
50 if hasattr(sys, 'thread_info') and sys.thread_info.version: 77 if hasattr(sys, 'thread_info') and sys.thread_info.version:
51 USING_LINUXTHREADS = sys.thread_info.version.startswith("linuxthreads") 78 USING_LINUXTHREADS = sys.thread_info.version.startswith("linuxthreads")
52 else: 79 else:
53 USING_LINUXTHREADS = False 80 USING_LINUXTHREADS = False
54 81
55 # Issue #14110: Some tests fail on FreeBSD if the user is in the wheel group. 82 # Issue #14110: Some tests fail on FreeBSD if the user is in the wheel group.
56 HAVE_WHEEL_GROUP = sys.platform.startswith('freebsd') and os.getgid() == 0 83 HAVE_WHEEL_GROUP = sys.platform.startswith('freebsd') and os.getgid() == 0
84
85
86 @contextlib.contextmanager
87 def ignore_deprecation_warnings(msg_regex, quiet=False):
88 with support.check_warnings((msg_regex, DeprecationWarning), quiet=quiet):
89 yield
90
91
92 @contextlib.contextmanager
93 def bytes_filename_warn(expected):
94 msg = 'The Windows bytes API has been deprecated'
95 if os.name == 'nt':
96 with ignore_deprecation_warnings(msg, quiet=not expected):
97 yield
98 else:
99 yield
100
101
102 def create_file(filename, content=b'content'):
103 with open(filename, "xb", 0) as fp:
104 fp.write(content)
105
57 106
58 # Tests creating TESTFN 107 # Tests creating TESTFN
59 class FileTests(unittest.TestCase): 108 class FileTests(unittest.TestCase):
60 def setUp(self): 109 def setUp(self):
61 if os.path.lexists(support.TESTFN): 110 if os.path.lexists(support.TESTFN):
62 os.unlink(support.TESTFN) 111 os.unlink(support.TESTFN)
63 tearDown = setUp 112 tearDown = setUp
64 113
65 def test_access(self): 114 def test_access(self):
66 f = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR) 115 f = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
(...skipping 32 matching lines...) Expand 10 before | Expand all | Expand 10 after
99 def test_read(self): 148 def test_read(self):
100 with open(support.TESTFN, "w+b") as fobj: 149 with open(support.TESTFN, "w+b") as fobj:
101 fobj.write(b"spam") 150 fobj.write(b"spam")
102 fobj.flush() 151 fobj.flush()
103 fd = fobj.fileno() 152 fd = fobj.fileno()
104 os.lseek(fd, 0, 0) 153 os.lseek(fd, 0, 0)
105 s = os.read(fd, 4) 154 s = os.read(fd, 4)
106 self.assertEqual(type(s), bytes) 155 self.assertEqual(type(s), bytes)
107 self.assertEqual(s, b"spam") 156 self.assertEqual(s, b"spam")
108 157
158 @support.cpython_only
159 # Skip the test on 32-bit platforms: the number of bytes must fit in a
160 # Py_ssize_t type
161 @unittest.skipUnless(INT_MAX < PY_SSIZE_T_MAX,
162 "needs INT_MAX < PY_SSIZE_T_MAX")
163 @support.bigmemtest(size=INT_MAX + 10, memuse=1, dry_run=False)
164 def test_large_read(self, size):
165 self.addCleanup(support.unlink, support.TESTFN)
166 create_file(support.TESTFN, b'test')
167
168 # Issue #21932: Make sure that os.read() does not raise an
169 # OverflowError for size larger than INT_MAX
170 with open(support.TESTFN, "rb") as fp:
171 data = os.read(fp.fileno(), size)
172
173 # The test does not try to read more than 2 GB at once because the
174 # operating system is free to return less bytes than requested.
175 self.assertEqual(data, b'test')
176
109 def test_write(self): 177 def test_write(self):
110 # os.write() accepts bytes- and buffer-like objects but not strings 178 # os.write() accepts bytes- and buffer-like objects but not strings
111 fd = os.open(support.TESTFN, os.O_CREAT | os.O_WRONLY) 179 fd = os.open(support.TESTFN, os.O_CREAT | os.O_WRONLY)
112 self.assertRaises(TypeError, os.write, fd, "beans") 180 self.assertRaises(TypeError, os.write, fd, "beans")
113 os.write(fd, b"bacon\n") 181 os.write(fd, b"bacon\n")
114 os.write(fd, bytearray(b"eggs\n")) 182 os.write(fd, bytearray(b"eggs\n"))
115 os.write(fd, memoryview(b"spam\n")) 183 os.write(fd, memoryview(b"spam\n"))
116 os.close(fd) 184 os.close(fd)
117 with open(support.TESTFN, "rb") as fobj: 185 with open(support.TESTFN, "rb") as fobj:
118 self.assertEqual(fobj.read().splitlines(), 186 self.assertEqual(fobj.read().splitlines(),
(...skipping 26 matching lines...) Expand all
145 def test_fdopen(self): 213 def test_fdopen(self):
146 fd = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR) 214 fd = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
147 os.close(fd) 215 os.close(fd)
148 216
149 self.fdopen_helper() 217 self.fdopen_helper()
150 self.fdopen_helper('r') 218 self.fdopen_helper('r')
151 self.fdopen_helper('r', 100) 219 self.fdopen_helper('r', 100)
152 220
153 def test_replace(self): 221 def test_replace(self):
154 TESTFN2 = support.TESTFN + ".2" 222 TESTFN2 = support.TESTFN + ".2"
155 with open(support.TESTFN, 'w') as f: 223 self.addCleanup(support.unlink, support.TESTFN)
156 f.write("1") 224 self.addCleanup(support.unlink, TESTFN2)
157 with open(TESTFN2, 'w') as f: 225
158 f.write("2") 226 create_file(support.TESTFN, b"1")
159 self.addCleanup(os.unlink, TESTFN2) 227 create_file(TESTFN2, b"2")
228
160 os.replace(support.TESTFN, TESTFN2) 229 os.replace(support.TESTFN, TESTFN2)
161 self.assertRaises(FileNotFoundError, os.stat, support.TESTFN) 230 self.assertRaises(FileNotFoundError, os.stat, support.TESTFN)
162 with open(TESTFN2, 'r') as f: 231 with open(TESTFN2, 'r') as f:
163 self.assertEqual(f.read(), "1") 232 self.assertEqual(f.read(), "1")
164 233
165 def test_open_keywords(self): 234 def test_open_keywords(self):
166 f = os.open(path=__file__, flags=os.O_RDONLY, mode=0o777, 235 f = os.open(path=__file__, flags=os.O_RDONLY, mode=0o777,
167 dir_fd=None) 236 dir_fd=None)
168 os.close(f) 237 os.close(f)
169 238
(...skipping 10 matching lines...) Expand all
180 TESTFN2 = support.TESTFN + ".3" 249 TESTFN2 = support.TESTFN + ".3"
181 data = b'0123456789' 250 data = b'0123456789'
182 251
183 create_file(support.TESTFN, data) 252 create_file(support.TESTFN, data)
184 self.addCleanup(support.unlink, support.TESTFN) 253 self.addCleanup(support.unlink, support.TESTFN)
185 254
186 in_file = open(support.TESTFN, 'rb') 255 in_file = open(support.TESTFN, 'rb')
187 self.addCleanup(in_file.close) 256 self.addCleanup(in_file.close)
188 in_fd = in_file.fileno() 257 in_fd = in_file.fileno()
189 258
190 out_file = open (TESTFN2, 'w+b') 259 out_file = open(TESTFN2, 'w+b')
191 self.addCleanup(support.unlink, TESTFN2) 260 self.addCleanup(support.unlink, TESTFN2)
192 self.addCleanup(out_file.close) 261 self.addCleanup(out_file.close)
193 out_fd = out_file.fileno() 262 out_fd = out_file.fileno()
194 263
195 try: 264 try:
196 i = os.copy_file_range(in_fd, out_fd, 5); 265 i = os.copy_file_range(in_fd, out_fd, 5)
197 except OSError as e: 266 except OSError as e:
267 # this is needed in case Python was compiled
268 # in a system with the syscall
269 # but the system in which is run does not
270 # which is kernel dependant, for the moment
198 if e.errno != errno.ENOSYS: 271 if e.errno != errno.ENOSYS:
199 raise 272 raise
200 self.skipTest(e) 273 self.skipTest(e)
201 else: 274 else:
202 self.assertIn(i, range(0, 6)); 275 self.assertIn(i, range(0, 6));
203 276
204 in_file = open(TESTFN2, 'rb') 277 in_file = open(TESTFN2, 'rb')
205 self.assertEqual(in_file.read(), data[:i]) 278 self.assertEqual(in_file.read(), data[:i])
206 279
280 @unittest.skipUnless(hasattr(os, 'copy_file_range'), 'test needs os.copy_fil e_range()')
281 def test_copy_file_range_off(self):
282 TESTFN4 = support.TESTFN + ".4"
283 # ,-- in_skip
284 # v v-- in_skip+bytes_to_copy
285 data = b'0123456789'
286 bytes_to_copy = 6
287 in_skip = 3
288 out_seek = 5
289
290 create_file(support.TESTFN, data)
291 self.addCleanup(support.unlink, support.TESTFN)
292
293 in_file = open(support.TESTFN, 'rb')
294 self.addCleanup(in_file.close)
295 in_fd = in_file.fileno()
296
297 out_file = open (TESTFN4, 'w+b')
298 self.addCleanup(support.unlink, TESTFN4)
299 self.addCleanup(out_file.close)
300 out_fd = out_file.fileno()
301
302 try:
303 i = os.copy_file_range(in_fd, out_fd, bytes_to_copy,
304 in_skip, out_seek);
305 except OSError as e:
306 # this is needed in case Python was compiled
307 # in a system with the syscall
308 # but the system in which is run does not
309 # which is kernel dependant, for the moment
310 if e.errno != errno.ENOSYS:
311 raise
312 self.skipTest(e)
313 else:
314 self.assertIn(i, range(0, bytes_to_copy+1));
315
316 in_file = open(TESTFN4, 'rb')
317 read = in_file.read()
318 # seeked bytes (5) are zero'ed
319 self.assertEqual(read[:out_seek], b'\x00'*out_seek)
320 # see diagram at the begining of this test
321 self.assertEqual(read[out_seek:],
322 data[in_skip:in_skip+bytes_to_copy])
323
207 324
208 # Test attributes on return values from os.*stat* family. 325 # Test attributes on return values from os.*stat* family.
209 class StatAttributeTests(unittest.TestCase): 326 class StatAttributeTests(unittest.TestCase):
210 def setUp(self): 327 def setUp(self):
211 os.mkdir(support.TESTFN) 328 self.fname = support.TESTFN
212 self.fname = os.path.join(support.TESTFN, "f1") 329 self.addCleanup(support.unlink, self.fname)
213 f = open(self.fname, 'wb') 330 create_file(self.fname, b"ABC")
214 f.write(b"ABC")
215 f.close()
216
217 def tearDown(self):
218 os.unlink(self.fname)
219 os.rmdir(support.TESTFN)
220 331
221 @unittest.skipUnless(hasattr(os, 'stat'), 'test needs os.stat()') 332 @unittest.skipUnless(hasattr(os, 'stat'), 'test needs os.stat()')
222 def check_stat_attributes(self, fname): 333 def check_stat_attributes(self, fname):
223 result = os.stat(fname) 334 result = os.stat(fname)
224 335
225 # Make sure direct access works 336 # Make sure direct access works
226 self.assertEqual(result[stat.ST_SIZE], 3) 337 self.assertEqual(result[stat.ST_SIZE], 3)
227 self.assertEqual(result.st_size, 3) 338 self.assertEqual(result.st_size, 3)
228 339
229 # Make sure all the attributes are there 340 # Make sure all the attributes are there
(...skipping 55 matching lines...) Expand 10 before | Expand all | Expand 10 after
285 pass 396 pass
286 397
287 def test_stat_attributes(self): 398 def test_stat_attributes(self):
288 self.check_stat_attributes(self.fname) 399 self.check_stat_attributes(self.fname)
289 400
290 def test_stat_attributes_bytes(self): 401 def test_stat_attributes_bytes(self):
291 try: 402 try:
292 fname = self.fname.encode(sys.getfilesystemencoding()) 403 fname = self.fname.encode(sys.getfilesystemencoding())
293 except UnicodeEncodeError: 404 except UnicodeEncodeError:
294 self.skipTest("cannot encode %a for the filesystem" % self.fname) 405 self.skipTest("cannot encode %a for the filesystem" % self.fname)
295 with warnings.catch_warnings(): 406 with bytes_filename_warn(True):
296 warnings.simplefilter("ignore", DeprecationWarning)
297 self.check_stat_attributes(fname) 407 self.check_stat_attributes(fname)
298 408
299 def test_stat_result_pickle(self): 409 def test_stat_result_pickle(self):
300 result = os.stat(self.fname) 410 result = os.stat(self.fname)
301 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 411 for proto in range(pickle.HIGHEST_PROTOCOL + 1):
302 p = pickle.dumps(result, proto) 412 p = pickle.dumps(result, proto)
303 self.assertIn(b'stat_result', p) 413 self.assertIn(b'stat_result', p)
304 if proto < 4: 414 if proto < 4:
305 self.assertIn(b'cos\nstat_result\n', p) 415 self.assertIn(b'cos\nstat_result\n', p)
306 unpickled = pickle.loads(p) 416 unpickled = pickle.loads(p)
(...skipping 78 matching lines...) Expand 10 before | Expand all | Expand 10 after
385 r, w = os.pipe() 495 r, w = os.pipe()
386 try: 496 try:
387 os.stat(r) # should not raise error 497 os.stat(r) # should not raise error
388 finally: 498 finally:
389 os.close(r) 499 os.close(r)
390 os.close(w) 500 os.close(w)
391 with self.assertRaises(OSError) as ctx: 501 with self.assertRaises(OSError) as ctx:
392 os.stat(r) 502 os.stat(r)
393 self.assertEqual(ctx.exception.errno, errno.EBADF) 503 self.assertEqual(ctx.exception.errno, errno.EBADF)
394 504
505 def check_file_attributes(self, result):
506 self.assertTrue(hasattr(result, 'st_file_attributes'))
507 self.assertTrue(isinstance(result.st_file_attributes, int))
508 self.assertTrue(0 <= result.st_file_attributes <= 0xFFFFFFFF)
509
510 @unittest.skipUnless(sys.platform == "win32",
511 "st_file_attributes is Win32 specific")
512 def test_file_attributes(self):
513 # test file st_file_attributes (FILE_ATTRIBUTE_DIRECTORY not set)
514 result = os.stat(self.fname)
515 self.check_file_attributes(result)
516 self.assertEqual(
517 result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
518 0)
519
520 # test directory st_file_attributes (FILE_ATTRIBUTE_DIRECTORY set)
521 dirname = support.TESTFN + "dir"
522 os.mkdir(dirname)
523 self.addCleanup(os.rmdir, dirname)
524
525 result = os.stat(dirname)
526 self.check_file_attributes(result)
527 self.assertEqual(
528 result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
529 stat.FILE_ATTRIBUTE_DIRECTORY)
530
395 531
396 class UtimeTests(unittest.TestCase): 532 class UtimeTests(unittest.TestCase):
397 def setUp(self): 533 def setUp(self):
398 self.dirname = support.TESTFN 534 self.dirname = support.TESTFN
399 self.fname = os.path.join(self.dirname, "f1") 535 self.fname = os.path.join(self.dirname, "f1")
400 536
401 self.addCleanup(support.rmtree, self.dirname) 537 self.addCleanup(support.rmtree, self.dirname)
402 os.mkdir(self.dirname) 538 os.mkdir(self.dirname)
403 with open(self.fname, 'wb') as fp: 539 create_file(self.fname)
404 fp.write(b"ABC")
405 540
406 def restore_float_times(state): 541 def restore_float_times(state):
407 with warnings.catch_warnings(): 542 with ignore_deprecation_warnings('stat_float_times'):
408 warnings.simplefilter("ignore", DeprecationWarning)
409
410 os.stat_float_times(state) 543 os.stat_float_times(state)
411 544
412 # ensure that st_atime and st_mtime are float 545 # ensure that st_atime and st_mtime are float
413 with warnings.catch_warnings(): 546 with ignore_deprecation_warnings('stat_float_times'):
414 warnings.simplefilter("ignore", DeprecationWarning)
415
416 old_float_times = os.stat_float_times(-1) 547 old_float_times = os.stat_float_times(-1)
417 self.addCleanup(restore_float_times, old_float_times) 548 self.addCleanup(restore_float_times, old_float_times)
418 549
419 os.stat_float_times(True) 550 os.stat_float_times(True)
420 551
421 def support_subsecond(self, filename): 552 def support_subsecond(self, filename):
422 # Heuristic to check if the filesystem supports timestamp with 553 # Heuristic to check if the filesystem supports timestamp with
423 # subsecond resolution: check if float and int timestamps are different 554 # subsecond resolution: check if float and int timestamps are different
424 st = os.stat(filename) 555 st = os.stat(filename)
425 return ((st.st_atime != st[7]) 556 return ((st.st_atime != st[7])
(...skipping 71 matching lines...) Expand 10 before | Expand all | Expand 10 after
497 def set_time(filename, ns): 628 def set_time(filename, ns):
498 # use follow_symlinks=False to test utimensat(timespec) 629 # use follow_symlinks=False to test utimensat(timespec)
499 # or lutimes(timeval) 630 # or lutimes(timeval)
500 os.utime(filename, ns=ns, follow_symlinks=False) 631 os.utime(filename, ns=ns, follow_symlinks=False)
501 self._test_utime(set_time) 632 self._test_utime(set_time)
502 633
503 @unittest.skipUnless(os.utime in os.supports_fd, 634 @unittest.skipUnless(os.utime in os.supports_fd,
504 "fd support for utime required for this test.") 635 "fd support for utime required for this test.")
505 def test_utime_fd(self): 636 def test_utime_fd(self):
506 def set_time(filename, ns): 637 def set_time(filename, ns):
507 with open(filename, 'wb') as fp: 638 with open(filename, 'wb', 0) as fp:
508 # use a file descriptor to test futimens(timespec) 639 # use a file descriptor to test futimens(timespec)
509 # or futimes(timeval) 640 # or futimes(timeval)
510 os.utime(fp.fileno(), ns=ns) 641 os.utime(fp.fileno(), ns=ns)
511 self._test_utime(set_time) 642 self._test_utime(set_time)
512 643
513 @unittest.skipUnless(os.utime in os.supports_dir_fd, 644 @unittest.skipUnless(os.utime in os.supports_dir_fd,
514 "dir_fd support for utime required for this test.") 645 "dir_fd support for utime required for this test.")
515 def test_utime_dir_fd(self): 646 def test_utime_dir_fd(self):
516 def set_time(filename, ns): 647 def set_time(filename, ns):
517 dirname, name = os.path.split(filename) 648 dirname, name = os.path.split(filename)
(...skipping 226 matching lines...) Expand 10 before | Expand all | Expand 10 after
744 del os.environ[missing] 875 del os.environ[missing]
745 self.assertIs(cm.exception.args[0], missing) 876 self.assertIs(cm.exception.args[0], missing)
746 self.assertTrue(cm.exception.__suppress_context__) 877 self.assertTrue(cm.exception.__suppress_context__)
747 878
748 879
749 class WalkTests(unittest.TestCase): 880 class WalkTests(unittest.TestCase):
750 """Tests for os.walk().""" 881 """Tests for os.walk()."""
751 882
752 # Wrapper to hide minor differences between os.walk and os.fwalk 883 # Wrapper to hide minor differences between os.walk and os.fwalk
753 # to tests both functions with the same code base 884 # to tests both functions with the same code base
754 def walk(self, directory, topdown=True, follow_symlinks=False): 885 def walk(self, top, **kwargs):
755 walk_it = os.walk(directory, 886 if 'follow_symlinks' in kwargs:
756 topdown=topdown, 887 kwargs['followlinks'] = kwargs.pop('follow_symlinks')
757 followlinks=follow_symlinks) 888 return os.walk(top, **kwargs)
758 for root, dirs, files in walk_it:
759 yield (root, dirs, files)
760 889
761 def setUp(self): 890 def setUp(self):
762 join = os.path.join 891 join = os.path.join
892 self.addCleanup(support.rmtree, support.TESTFN)
763 893
764 # Build: 894 # Build:
765 # TESTFN/ 895 # TESTFN/
766 # TEST1/ a file kid and two directory kids 896 # TEST1/ a file kid and two directory kids
767 # tmp1 897 # tmp1
768 # SUB1/ a file kid and a directory kid 898 # SUB1/ a file kid and a directory kid
769 # tmp2 899 # tmp2
770 # SUB11/ no kids 900 # SUB11/ no kids
771 # SUB2/ a file kid and a dirsymlink kid 901 # SUB2/ a file kid and a dirsymlink kid
772 # tmp3 902 # tmp3
(...skipping 12 matching lines...) Expand all
785 t2_path = join(support.TESTFN, "TEST2") 915 t2_path = join(support.TESTFN, "TEST2")
786 tmp4_path = join(support.TESTFN, "TEST2", "tmp4") 916 tmp4_path = join(support.TESTFN, "TEST2", "tmp4")
787 broken_link_path = join(sub2_path, "broken_link") 917 broken_link_path = join(sub2_path, "broken_link")
788 918
789 # Create stuff. 919 # Create stuff.
790 os.makedirs(self.sub11_path) 920 os.makedirs(self.sub11_path)
791 os.makedirs(sub2_path) 921 os.makedirs(sub2_path)
792 os.makedirs(t2_path) 922 os.makedirs(t2_path)
793 923
794 for path in tmp1_path, tmp2_path, tmp3_path, tmp4_path: 924 for path in tmp1_path, tmp2_path, tmp3_path, tmp4_path:
795 f = open(path, "w") 925 with open(path, "x") as f:
796 f.write("I'm " + path + " and proud of it. Blame test_os.\n") 926 f.write("I'm " + path + " and proud of it. Blame test_os.\n")
797 f.close()
798 927
799 if support.can_symlink(): 928 if support.can_symlink():
800 os.symlink(os.path.abspath(t2_path), self.link_path) 929 os.symlink(os.path.abspath(t2_path), self.link_path)
801 os.symlink('broken', broken_link_path, True) 930 os.symlink('broken', broken_link_path, True)
802 self.sub2_tree = (sub2_path, ["link"], ["broken_link", "tmp3"]) 931 self.sub2_tree = (sub2_path, ["link"], ["broken_link", "tmp3"])
803 else: 932 else:
804 self.sub2_tree = (sub2_path, [], ["tmp3"]) 933 self.sub2_tree = (sub2_path, [], ["tmp3"])
805 934
806 def test_walk_topdown(self): 935 def test_walk_topdown(self):
807 # Walk top-down. 936 # Walk top-down.
808 all = list(os.walk(self.walk_path)) 937 all = list(self.walk(self.walk_path))
809 938
810 self.assertEqual(len(all), 4) 939 self.assertEqual(len(all), 4)
811 # We can't know which order SUB1 and SUB2 will appear in. 940 # We can't know which order SUB1 and SUB2 will appear in.
812 # Not flipped: TESTFN, SUB1, SUB11, SUB2 941 # Not flipped: TESTFN, SUB1, SUB11, SUB2
813 # flipped: TESTFN, SUB2, SUB1, SUB11 942 # flipped: TESTFN, SUB2, SUB1, SUB11
814 flipped = all[0][1][0] != "SUB1" 943 flipped = all[0][1][0] != "SUB1"
815 all[0][1].sort() 944 all[0][1].sort()
816 all[3 - 2 * flipped][-1].sort() 945 all[3 - 2 * flipped][-1].sort()
817 self.assertEqual(all[0], (self.walk_path, ["SUB1", "SUB2"], ["tmp1"])) 946 self.assertEqual(all[0], (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
818 self.assertEqual(all[1 + flipped], (self.sub1_path, ["SUB11"], ["tmp2"]) ) 947 self.assertEqual(all[1 + flipped], (self.sub1_path, ["SUB11"], ["tmp2"]) )
(...skipping 14 matching lines...) Expand all
833 self.assertEqual(all[0], 962 self.assertEqual(all[0],
834 (self.walk_path, ["SUB2"], ["tmp1"])) 963 (self.walk_path, ["SUB2"], ["tmp1"]))
835 964
836 all[1][-1].sort() 965 all[1][-1].sort()
837 self.assertEqual(all[1], self.sub2_tree) 966 self.assertEqual(all[1], self.sub2_tree)
838 967
839 def test_walk_bottom_up(self): 968 def test_walk_bottom_up(self):
840 # Walk bottom-up. 969 # Walk bottom-up.
841 all = list(self.walk(self.walk_path, topdown=False)) 970 all = list(self.walk(self.walk_path, topdown=False))
842 971
843 self.assertEqual(len(all), 4) 972 self.assertEqual(len(all), 4, all)
844 # We can't know which order SUB1 and SUB2 will appear in. 973 # We can't know which order SUB1 and SUB2 will appear in.
845 # Not flipped: SUB11, SUB1, SUB2, TESTFN 974 # Not flipped: SUB11, SUB1, SUB2, TESTFN
846 # flipped: SUB2, SUB11, SUB1, TESTFN 975 # flipped: SUB2, SUB11, SUB1, TESTFN
847 flipped = all[3][1][0] != "SUB1" 976 flipped = all[3][1][0] != "SUB1"
848 all[3][1].sort() 977 all[3][1].sort()
849 all[2 - 2 * flipped][-1].sort() 978 all[2 - 2 * flipped][-1].sort()
850 self.assertEqual(all[3], 979 self.assertEqual(all[3],
851 (self.walk_path, ["SUB1", "SUB2"], ["tmp1"])) 980 (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
852 self.assertEqual(all[flipped], 981 self.assertEqual(all[flipped],
853 (self.sub11_path, [], [])) 982 (self.sub11_path, [], []))
854 self.assertEqual(all[flipped + 1], 983 self.assertEqual(all[flipped + 1],
855 (self.sub1_path, ["SUB11"], ["tmp2"])) 984 (self.sub1_path, ["SUB11"], ["tmp2"]))
856 self.assertEqual(all[2 - 2 * flipped], 985 self.assertEqual(all[2 - 2 * flipped],
857 self.sub2_tree) 986 self.sub2_tree)
858 987
859 def test_walk_symlink(self): 988 def test_walk_symlink(self):
860 if not support.can_symlink(): 989 if not support.can_symlink():
861 self.skipTest("need symlink support") 990 self.skipTest("need symlink support")
862 991
863 # Walk, following symlinks. 992 # Walk, following symlinks.
864 walk_it = self.walk(self.walk_path, follow_symlinks=True) 993 walk_it = self.walk(self.walk_path, follow_symlinks=True)
865 for root, dirs, files in walk_it: 994 for root, dirs, files in walk_it:
866 if root == self.link_path: 995 if root == self.link_path:
867 self.assertEqual(dirs, []) 996 self.assertEqual(dirs, [])
868 self.assertEqual(files, ["tmp4"]) 997 self.assertEqual(files, ["tmp4"])
869 break 998 break
870 else: 999 else:
871 self.fail("Didn't follow symlink with followlinks=True") 1000 self.fail("Didn't follow symlink with followlinks=True")
872 1001
873 def tearDown(self): 1002 def test_walk_bad_dir(self):
874 # Tear everything down. This is a decent use for bottom-up on 1003 # Walk top-down.
875 # Windows, which doesn't have a recursive delete command. The 1004 errors = []
876 # (not so) subtlety is that rmdir will fail unless the dir's 1005 walk_it = self.walk(self.walk_path, onerror=errors.append)
877 # kids are removed first, so bottom up is essential. 1006 root, dirs, files = next(walk_it)
878 for root, dirs, files in os.walk(support.TESTFN, topdown=False): 1007 self.assertFalse(errors)
879 for name in files: 1008 dir1 = dirs[0]
880 os.remove(os.path.join(root, name)) 1009 dir1new = dir1 + '.new'
881 for name in dirs: 1010 os.rename(os.path.join(root, dir1), os.path.join(root, dir1new))
882 dirname = os.path.join(root, name) 1011 roots = [r for r, d, f in walk_it]
883 if not os.path.islink(dirname): 1012 self.assertTrue(errors)
884 os.rmdir(dirname) 1013 self.assertNotIn(os.path.join(root, dir1), roots)
885 else: 1014 self.assertNotIn(os.path.join(root, dir1new), roots)
886 os.remove(dirname) 1015 for dir2 in dirs[1:]:
887 os.rmdir(support.TESTFN) 1016 self.assertIn(os.path.join(root, dir2), roots)
888 1017
889 1018
890 @unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()") 1019 @unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
891 class FwalkTests(WalkTests): 1020 class FwalkTests(WalkTests):
892 """Tests for os.fwalk().""" 1021 """Tests for os.fwalk()."""
893 1022
894 def walk(self, directory, topdown=True, follow_symlinks=False): 1023 def walk(self, top, **kwargs):
895 walk_it = os.fwalk(directory, 1024 for root, dirs, files, root_fd in os.fwalk(top, **kwargs):
896 topdown=topdown,
897 follow_symlinks=follow_symlinks)
898 for root, dirs, files, root_fd in walk_it:
899 yield (root, dirs, files) 1025 yield (root, dirs, files)
900
901 1026
902 def _compare_to_walk(self, walk_kwargs, fwalk_kwargs): 1027 def _compare_to_walk(self, walk_kwargs, fwalk_kwargs):
903 """ 1028 """
904 compare with walk() results. 1029 compare with walk() results.
905 """ 1030 """
906 walk_kwargs = walk_kwargs.copy() 1031 walk_kwargs = walk_kwargs.copy()
907 fwalk_kwargs = fwalk_kwargs.copy() 1032 fwalk_kwargs = fwalk_kwargs.copy()
908 for topdown, follow_symlinks in itertools.product((True, False), repeat= 2): 1033 for topdown, follow_symlinks in itertools.product((True, False), repeat= 2):
909 walk_kwargs.update(topdown=topdown, followlinks=follow_symlinks) 1034 walk_kwargs.update(topdown=topdown, followlinks=follow_symlinks)
910 fwalk_kwargs.update(topdown=topdown, follow_symlinks=follow_symlinks ) 1035 fwalk_kwargs.update(topdown=topdown, follow_symlinks=follow_symlinks )
(...skipping 38 matching lines...) Expand 10 before | Expand all | Expand 10 after
949 # yield EMFILE, and that the minimum allocated FD hasn't changed. 1074 # yield EMFILE, and that the minimum allocated FD hasn't changed.
950 minfd = os.dup(1) 1075 minfd = os.dup(1)
951 os.close(minfd) 1076 os.close(minfd)
952 for i in range(256): 1077 for i in range(256):
953 for x in os.fwalk(support.TESTFN): 1078 for x in os.fwalk(support.TESTFN):
954 pass 1079 pass
955 newfd = os.dup(1) 1080 newfd = os.dup(1)
956 self.addCleanup(os.close, newfd) 1081 self.addCleanup(os.close, newfd)
957 self.assertEqual(newfd, minfd) 1082 self.assertEqual(newfd, minfd)
958 1083
1084 class BytesWalkTests(WalkTests):
1085 """Tests for os.walk() with bytes."""
1086 def setUp(self):
1087 super().setUp()
1088 self.stack = contextlib.ExitStack()
1089 if os.name == 'nt':
1090 self.stack.enter_context(bytes_filename_warn(False))
1091
959 def tearDown(self): 1092 def tearDown(self):
960 # cleanup 1093 self.stack.close()
961 for root, dirs, files, rootfd in os.fwalk(support.TESTFN, topdown=False) : 1094 super().tearDown()
962 for name in files: 1095
963 os.unlink(name, dir_fd=rootfd) 1096 def walk(self, top, **kwargs):
964 for name in dirs: 1097 if 'follow_symlinks' in kwargs:
965 st = os.stat(name, dir_fd=rootfd, follow_symlinks=False) 1098 kwargs['followlinks'] = kwargs.pop('follow_symlinks')
966 if stat.S_ISDIR(st.st_mode): 1099 for broot, bdirs, bfiles in os.walk(os.fsencode(top), **kwargs):
967 os.rmdir(name, dir_fd=rootfd) 1100 root = os.fsdecode(broot)
968 else: 1101 dirs = list(map(os.fsdecode, bdirs))
969 os.unlink(name, dir_fd=rootfd) 1102 files = list(map(os.fsdecode, bfiles))
970 os.rmdir(support.TESTFN) 1103 yield (root, dirs, files)
1104 bdirs[:] = list(map(os.fsencode, dirs))
1105 bfiles[:] = list(map(os.fsencode, files))
971 1106
972 1107
973 class MakedirTests(unittest.TestCase): 1108 class MakedirTests(unittest.TestCase):
974 def setUp(self): 1109 def setUp(self):
975 os.mkdir(support.TESTFN) 1110 os.mkdir(support.TESTFN)
976 1111
977 def test_makedir(self): 1112 def test_makedir(self):
978 base = support.TESTFN 1113 base = support.TESTFN
979 path = os.path.join(base, 'dir1', 'dir2', 'dir3') 1114 path = os.path.join(base, 'dir1', 'dir2', 'dir3')
980 os.makedirs(path) # Should work 1115 os.makedirs(path) # Should work
(...skipping 14 matching lines...) Expand all
995 old_mask = os.umask(0o022) 1130 old_mask = os.umask(0o022)
996 os.makedirs(path, mode) 1131 os.makedirs(path, mode)
997 self.assertRaises(OSError, os.makedirs, path, mode) 1132 self.assertRaises(OSError, os.makedirs, path, mode)
998 self.assertRaises(OSError, os.makedirs, path, mode, exist_ok=False) 1133 self.assertRaises(OSError, os.makedirs, path, mode, exist_ok=False)
999 os.makedirs(path, 0o776, exist_ok=True) 1134 os.makedirs(path, 0o776, exist_ok=True)
1000 os.makedirs(path, mode=mode, exist_ok=True) 1135 os.makedirs(path, mode=mode, exist_ok=True)
1001 os.umask(old_mask) 1136 os.umask(old_mask)
1002 1137
1003 # Issue #25583: A drive root could raise PermissionError on Windows 1138 # Issue #25583: A drive root could raise PermissionError on Windows
1004 os.makedirs(os.path.abspath('/'), exist_ok=True) 1139 os.makedirs(os.path.abspath('/'), exist_ok=True)
1005
1006 @unittest.skipUnless(hasattr(os, 'chown'), 'test needs os.chown')
1007 def test_chown_uid_gid_arguments_must_be_index(self):
1008 stat = os.stat(support.TESTFN)
1009 uid = stat.st_uid
1010 gid = stat.st_gid
1011 for value in (-1.0, -1j, decimal.Decimal(-1), fractions.Fraction(-2, 2)) :
1012 self.assertRaises(TypeError, os.chown, support.TESTFN, value, gid)
1013 self.assertRaises(TypeError, os.chown, support.TESTFN, uid, value)
1014 self.assertIsNone(os.chown(support.TESTFN, uid, gid))
1015 self.assertIsNone(os.chown(support.TESTFN, -1, -1))
1016 1140
1017 def test_exist_ok_s_isgid_directory(self): 1141 def test_exist_ok_s_isgid_directory(self):
1018 path = os.path.join(support.TESTFN, 'dir1') 1142 path = os.path.join(support.TESTFN, 'dir1')
1019 S_ISGID = stat.S_ISGID 1143 S_ISGID = stat.S_ISGID
1020 mode = 0o777 1144 mode = 0o777
1021 old_mask = os.umask(0o022) 1145 old_mask = os.umask(0o022)
1022 try: 1146 try:
1023 existing_testfn_mode = stat.S_IMODE( 1147 existing_testfn_mode = stat.S_IMODE(
1024 os.lstat(support.TESTFN).st_mode) 1148 os.lstat(support.TESTFN).st_mode)
1025 try: 1149 try:
(...skipping 31 matching lines...) Expand 10 before | Expand all | Expand 10 after
1057 'dir4', 'dir5', 'dir6') 1181 'dir4', 'dir5', 'dir6')
1058 # If the tests failed, the bottom-most directory ('../dir6') 1182 # If the tests failed, the bottom-most directory ('../dir6')
1059 # may not have been created, so we look for the outermost directory 1183 # may not have been created, so we look for the outermost directory
1060 # that exists. 1184 # that exists.
1061 while not os.path.exists(path) and path != support.TESTFN: 1185 while not os.path.exists(path) and path != support.TESTFN:
1062 path = os.path.dirname(path) 1186 path = os.path.dirname(path)
1063 1187
1064 os.removedirs(path) 1188 os.removedirs(path)
1065 1189
1066 1190
1191 @unittest.skipUnless(hasattr(os, 'chown'), "Test needs chown")
1192 class ChownFileTests(unittest.TestCase):
1193
1194 @classmethod
1195 def setUpClass(cls):
1196 os.mkdir(support.TESTFN)
1197
1198 def test_chown_uid_gid_arguments_must_be_index(self):
1199 stat = os.stat(support.TESTFN)
1200 uid = stat.st_uid
1201 gid = stat.st_gid
1202 for value in (-1.0, -1j, decimal.Decimal(-1), fractions.Fraction(-2, 2)) :
1203 self.assertRaises(TypeError, os.chown, support.TESTFN, value, gid)
1204 self.assertRaises(TypeError, os.chown, support.TESTFN, uid, value)
1205 self.assertIsNone(os.chown(support.TESTFN, uid, gid))
1206 self.assertIsNone(os.chown(support.TESTFN, -1, -1))
1207
1208 @unittest.skipUnless(len(groups) > 1, "test needs more than one group")
1209 def test_chown(self):
1210 gid_1, gid_2 = groups[:2]
1211 uid = os.stat(support.TESTFN).st_uid
1212 os.chown(support.TESTFN, uid, gid_1)
1213 gid = os.stat(support.TESTFN).st_gid
1214 self.assertEqual(gid, gid_1)
1215 os.chown(support.TESTFN, uid, gid_2)
1216 gid = os.stat(support.TESTFN).st_gid
1217 self.assertEqual(gid, gid_2)
1218
1219 @unittest.skipUnless(root_in_posix and len(all_users) > 1,
1220 "test needs root privilege and more than one user")
1221 def test_chown_with_root(self):
1222 uid_1, uid_2 = all_users[:2]
1223 gid = os.stat(support.TESTFN).st_gid
1224 os.chown(support.TESTFN, uid_1, gid)
1225 uid = os.stat(support.TESTFN).st_uid
1226 self.assertEqual(uid, uid_1)
1227 os.chown(support.TESTFN, uid_2, gid)
1228 uid = os.stat(support.TESTFN).st_uid
1229 self.assertEqual(uid, uid_2)
1230
1231 @unittest.skipUnless(not root_in_posix and len(all_users) > 1,
1232 "test needs non-root account and more than one user")
1233 def test_chown_without_permission(self):
1234 uid_1, uid_2 = all_users[:2]
1235 gid = os.stat(support.TESTFN).st_gid
1236 with self.assertRaises(PermissionError):
1237 os.chown(support.TESTFN, uid_1, gid)
1238 os.chown(support.TESTFN, uid_2, gid)
1239
1240 @classmethod
1241 def tearDownClass(cls):
1242 os.rmdir(support.TESTFN)
1243
1244
1067 class RemoveDirsTests(unittest.TestCase): 1245 class RemoveDirsTests(unittest.TestCase):
1068 def setUp(self): 1246 def setUp(self):
1069 os.makedirs(support.TESTFN) 1247 os.makedirs(support.TESTFN)
1070 1248
1071 def tearDown(self): 1249 def tearDown(self):
1072 support.rmtree(support.TESTFN) 1250 support.rmtree(support.TESTFN)
1073 1251
1074 def test_remove_all(self): 1252 def test_remove_all(self):
1075 dira = os.path.join(support.TESTFN, 'dira') 1253 dira = os.path.join(support.TESTFN, 'dira')
1076 os.mkdir(dira) 1254 os.mkdir(dira)
1077 dirb = os.path.join(dira, 'dirb') 1255 dirb = os.path.join(dira, 'dirb')
1078 os.mkdir(dirb) 1256 os.mkdir(dirb)
1079 os.removedirs(dirb) 1257 os.removedirs(dirb)
1080 self.assertFalse(os.path.exists(dirb)) 1258 self.assertFalse(os.path.exists(dirb))
1081 self.assertFalse(os.path.exists(dira)) 1259 self.assertFalse(os.path.exists(dira))
1082 self.assertFalse(os.path.exists(support.TESTFN)) 1260 self.assertFalse(os.path.exists(support.TESTFN))
1083 1261
1084 def test_remove_partial(self): 1262 def test_remove_partial(self):
1085 dira = os.path.join(support.TESTFN, 'dira') 1263 dira = os.path.join(support.TESTFN, 'dira')
1086 os.mkdir(dira) 1264 os.mkdir(dira)
1087 dirb = os.path.join(dira, 'dirb') 1265 dirb = os.path.join(dira, 'dirb')
1088 os.mkdir(dirb) 1266 os.mkdir(dirb)
1089 with open(os.path.join(dira, 'file.txt'), 'w') as f: 1267 create_file(os.path.join(dira, 'file.txt'))
1090 f.write('text')
1091 os.removedirs(dirb) 1268 os.removedirs(dirb)
1092 self.assertFalse(os.path.exists(dirb)) 1269 self.assertFalse(os.path.exists(dirb))
1093 self.assertTrue(os.path.exists(dira)) 1270 self.assertTrue(os.path.exists(dira))
1094 self.assertTrue(os.path.exists(support.TESTFN)) 1271 self.assertTrue(os.path.exists(support.TESTFN))
1095 1272
1096 def test_remove_nothing(self): 1273 def test_remove_nothing(self):
1097 dira = os.path.join(support.TESTFN, 'dira') 1274 dira = os.path.join(support.TESTFN, 'dira')
1098 os.mkdir(dira) 1275 os.mkdir(dira)
1099 dirb = os.path.join(dira, 'dirb') 1276 dirb = os.path.join(dira, 'dirb')
1100 os.mkdir(dirb) 1277 os.mkdir(dirb)
1101 with open(os.path.join(dirb, 'file.txt'), 'w') as f: 1278 create_file(os.path.join(dirb, 'file.txt'))
1102 f.write('text')
1103 with self.assertRaises(OSError): 1279 with self.assertRaises(OSError):
1104 os.removedirs(dirb) 1280 os.removedirs(dirb)
1105 self.assertTrue(os.path.exists(dirb)) 1281 self.assertTrue(os.path.exists(dirb))
1106 self.assertTrue(os.path.exists(dira)) 1282 self.assertTrue(os.path.exists(dira))
1107 self.assertTrue(os.path.exists(support.TESTFN)) 1283 self.assertTrue(os.path.exists(support.TESTFN))
1108 1284
1109 1285
1110 class DevNullTests(unittest.TestCase): 1286 class DevNullTests(unittest.TestCase):
1111 def test_devnull(self): 1287 def test_devnull(self):
1112 with open(os.devnull, 'wb') as f: 1288 with open(os.devnull, 'wb', 0) as f:
1113 f.write(b'hello') 1289 f.write(b'hello')
1114 f.close() 1290 f.close()
1115 with open(os.devnull, 'rb') as f: 1291 with open(os.devnull, 'rb') as f:
1116 self.assertEqual(f.read(), b'') 1292 self.assertEqual(f.read(), b'')
1117 1293
1118 1294
1119 class URandomTests(unittest.TestCase): 1295 class URandomTests(unittest.TestCase):
1120 def test_urandom_length(self): 1296 def test_urandom_length(self):
1121 self.assertEqual(len(os.urandom(0)), 0) 1297 self.assertEqual(len(os.urandom(0)), 0)
1122 self.assertEqual(len(os.urandom(1)), 1) 1298 self.assertEqual(len(os.urandom(1)), 1)
(...skipping 16 matching lines...) Expand all
1139 stdout = out[1] 1315 stdout = out[1]
1140 self.assertEqual(len(stdout), 16) 1316 self.assertEqual(len(stdout), 16)
1141 return stdout 1317 return stdout
1142 1318
1143 def test_urandom_subprocess(self): 1319 def test_urandom_subprocess(self):
1144 data1 = self.get_urandom_subprocess(16) 1320 data1 = self.get_urandom_subprocess(16)
1145 data2 = self.get_urandom_subprocess(16) 1321 data2 = self.get_urandom_subprocess(16)
1146 self.assertNotEqual(data1, data2) 1322 self.assertNotEqual(data1, data2)
1147 1323
1148 1324
1149 HAVE_GETENTROPY = (sysconfig.get_config_var('HAVE_GETENTROPY') == 1) 1325 # os.urandom() doesn't use a file descriptor when it is implemented with the
1150 1326 # getentropy() function, the getrandom() function or the getrandom() syscall
1151 @unittest.skipIf(HAVE_GETENTROPY, 1327 OS_URANDOM_DONT_USE_FD = (
1152 "getentropy() does not use a file descriptor") 1328 sysconfig.get_config_var('HAVE_GETENTROPY') == 1
1329 or sysconfig.get_config_var('HAVE_GETRANDOM') == 1
1330 or sysconfig.get_config_var('HAVE_GETRANDOM_SYSCALL') == 1)
1331
1332 @unittest.skipIf(OS_URANDOM_DONT_USE_FD ,
1333 "os.random() does not use a file descriptor")
1153 class URandomFDTests(unittest.TestCase): 1334 class URandomFDTests(unittest.TestCase):
1154 @unittest.skipUnless(resource, "test requires the resource module") 1335 @unittest.skipUnless(resource, "test requires the resource module")
1155 def test_urandom_failure(self): 1336 def test_urandom_failure(self):
1156 # Check urandom() failing when it is not able to open /dev/random. 1337 # Check urandom() failing when it is not able to open /dev/random.
1157 # We spawn a new process to make the test more robust (if getrlimit() 1338 # We spawn a new process to make the test more robust (if getrlimit()
1158 # failed to restore the file descriptor limit after this, the whole 1339 # failed to restore the file descriptor limit after this, the whole
1159 # test suite would crash; this actually happened on the OS X Tiger 1340 # test suite would crash; this actually happened on the OS X Tiger
1160 # buildbot). 1341 # buildbot).
1161 code = """if 1: 1342 code = """if 1:
1162 import errno 1343 import errno
(...skipping 10 matching lines...) Expand all
1173 raise AssertionError("OSError not raised") 1354 raise AssertionError("OSError not raised")
1174 """ 1355 """
1175 assert_python_ok('-c', code) 1356 assert_python_ok('-c', code)
1176 1357
1177 def test_urandom_fd_closed(self): 1358 def test_urandom_fd_closed(self):
1178 # Issue #21207: urandom() should reopen its fd to /dev/urandom if 1359 # Issue #21207: urandom() should reopen its fd to /dev/urandom if
1179 # closed. 1360 # closed.
1180 code = """if 1: 1361 code = """if 1:
1181 import os 1362 import os
1182 import sys 1363 import sys
1364 import test.support
1183 os.urandom(4) 1365 os.urandom(4)
1184 os.closerange(3, 256) 1366 with test.support.SuppressCrashReport():
1367 os.closerange(3, 256)
1185 sys.stdout.buffer.write(os.urandom(4)) 1368 sys.stdout.buffer.write(os.urandom(4))
1186 """ 1369 """
1187 rc, out, err = assert_python_ok('-Sc', code) 1370 rc, out, err = assert_python_ok('-Sc', code)
1188 1371
1189 def test_urandom_fd_reopened(self): 1372 def test_urandom_fd_reopened(self):
1190 # Issue #21207: urandom() should detect its fd to /dev/urandom 1373 # Issue #21207: urandom() should detect its fd to /dev/urandom
1191 # changed to something else, and reopen it. 1374 # changed to something else, and reopen it.
1192 with open(support.TESTFN, 'wb') as f: 1375 self.addCleanup(support.unlink, support.TESTFN)
1193 f.write(b"x" * 256) 1376 create_file(support.TESTFN, b"x" * 256)
1194 self.addCleanup(os.unlink, support.TESTFN) 1377
1195 code = """if 1: 1378 code = """if 1:
1196 import os 1379 import os
1197 import sys 1380 import sys
1381 import test.support
1198 os.urandom(4) 1382 os.urandom(4)
1199 for fd in range(3, 256): 1383 with test.support.SuppressCrashReport():
1200 try: 1384 for fd in range(3, 256):
1201 os.close(fd) 1385 try:
1202 except OSError: 1386 os.close(fd)
1203 pass 1387 except OSError:
1204 else: 1388 pass
1205 # Found the urandom fd (XXX hopefully) 1389 else:
1206 break 1390 # Found the urandom fd (XXX hopefully)
1207 os.closerange(3, 256) 1391 break
1392 os.closerange(3, 256)
1208 with open({TESTFN!r}, 'rb') as f: 1393 with open({TESTFN!r}, 'rb') as f:
1209 os.dup2(f.fileno(), fd) 1394 os.dup2(f.fileno(), fd)
1210 sys.stdout.buffer.write(os.urandom(4)) 1395 sys.stdout.buffer.write(os.urandom(4))
1211 sys.stdout.buffer.write(os.urandom(4)) 1396 sys.stdout.buffer.write(os.urandom(4))
1212 """.format(TESTFN=support.TESTFN) 1397 """.format(TESTFN=support.TESTFN)
1213 rc, out, err = assert_python_ok('-Sc', code) 1398 rc, out, err = assert_python_ok('-Sc', code)
1214 self.assertEqual(len(out), 8) 1399 self.assertEqual(len(out), 8)
1215 self.assertNotEqual(out[0:4], out[4:8]) 1400 self.assertNotEqual(out[0:4], out[4:8])
1216 rc, out2, err2 = assert_python_ok('-Sc', code) 1401 rc, out2, err2 = assert_python_ok('-Sc', code)
1217 self.assertEqual(len(out2), 8) 1402 self.assertEqual(len(out2), 8)
(...skipping 93 matching lines...) Expand 10 before | Expand all | Expand 10 after
1311 ('execve', native_fullpath, (arguments, env_path))) 1496 ('execve', native_fullpath, (arguments, env_path)))
1312 1497
1313 def test_internal_execvpe_str(self): 1498 def test_internal_execvpe_str(self):
1314 self._test_internal_execvpe(str) 1499 self._test_internal_execvpe(str)
1315 if os.name != "nt": 1500 if os.name != "nt":
1316 self._test_internal_execvpe(bytes) 1501 self._test_internal_execvpe(bytes)
1317 1502
1318 1503
1319 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests") 1504 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
1320 class Win32ErrorTests(unittest.TestCase): 1505 class Win32ErrorTests(unittest.TestCase):
1506 def setUp(self):
1507 try:
1508 os.stat(support.TESTFN)
1509 except FileNotFoundError:
1510 exists = False
1511 except OSError as exc:
1512 exists = True
1513 self.fail("file %s must not exist; os.stat failed with %s"
1514 % (support.TESTFN, exc))
1515 else:
1516 self.fail("file %s must not exist" % support.TESTFN)
1517
1321 def test_rename(self): 1518 def test_rename(self):
1322 self.assertRaises(OSError, os.rename, support.TESTFN, support.TESTFN+".b ak") 1519 self.assertRaises(OSError, os.rename, support.TESTFN, support.TESTFN+".b ak")
1323 1520
1324 def test_remove(self): 1521 def test_remove(self):
1325 self.assertRaises(OSError, os.remove, support.TESTFN) 1522 self.assertRaises(OSError, os.remove, support.TESTFN)
1326 1523
1327 def test_chdir(self): 1524 def test_chdir(self):
1328 self.assertRaises(OSError, os.chdir, support.TESTFN) 1525 self.assertRaises(OSError, os.chdir, support.TESTFN)
1329 1526
1330 def test_mkdir(self): 1527 def test_mkdir(self):
1331 f = open(support.TESTFN, "w") 1528 self.addCleanup(support.unlink, support.TESTFN)
1332 try: 1529
1530 with open(support.TESTFN, "x") as f:
1333 self.assertRaises(OSError, os.mkdir, support.TESTFN) 1531 self.assertRaises(OSError, os.mkdir, support.TESTFN)
1334 finally:
1335 f.close()
1336 os.unlink(support.TESTFN)
1337 1532
1338 def test_utime(self): 1533 def test_utime(self):
1339 self.assertRaises(OSError, os.utime, support.TESTFN, None) 1534 self.assertRaises(OSError, os.utime, support.TESTFN, None)
1340 1535
1341 def test_chmod(self): 1536 def test_chmod(self):
1342 self.assertRaises(OSError, os.chmod, support.TESTFN, 0) 1537 self.assertRaises(OSError, os.chmod, support.TESTFN, 0)
1538
1343 1539
1344 class TestInvalidFD(unittest.TestCase): 1540 class TestInvalidFD(unittest.TestCase):
1345 singles = ["fchdir", "dup", "fdopen", "fdatasync", "fstat", 1541 singles = ["fchdir", "dup", "fdopen", "fdatasync", "fstat",
1346 "fstatvfs", "fsync", "tcgetpgrp", "ttyname"] 1542 "fstatvfs", "fsync", "tcgetpgrp", "ttyname"]
1347 #singles.append("close") 1543 #singles.append("close")
1348 #We omit close because it doesn'r raise an exception on some platforms 1544 #We omit close because it doesn'r raise an exception on some platforms
1349 def get_single(f): 1545 def get_single(f):
1350 def helper(self): 1546 def helper(self):
1351 if hasattr(os, f): 1547 if hasattr(os, f):
1352 self.check(getattr(os, f)) 1548 self.check(getattr(os, f))
(...skipping 70 matching lines...) Expand 10 before | Expand all | Expand 10 after
1423 self.check(os.tcsetpgrp, 0) 1619 self.check(os.tcsetpgrp, 0)
1424 1620
1425 @unittest.skipUnless(hasattr(os, 'write'), 'test needs os.write()') 1621 @unittest.skipUnless(hasattr(os, 'write'), 'test needs os.write()')
1426 def test_write(self): 1622 def test_write(self):
1427 self.check(os.write, b" ") 1623 self.check(os.write, b" ")
1428 1624
1429 @unittest.skipUnless(hasattr(os, 'writev'), 'test needs os.writev()') 1625 @unittest.skipUnless(hasattr(os, 'writev'), 'test needs os.writev()')
1430 def test_writev(self): 1626 def test_writev(self):
1431 self.check(os.writev, [b'abc']) 1627 self.check(os.writev, [b'abc'])
1432 1628
1433 @unittest.skipUnless(hasattr(os, 'copy_file_range'), 'test needs os.copy_fil e_range()') 1629 def test_inheritable(self):
1434 def test_copy_file_range_in(self): 1630 self.check(os.get_inheritable)
1435 try: 1631 self.check(os.set_inheritable, True)
1436 self.check(os.copy_file_range, 0, 0) 1632
1437 except OSError as e: 1633 @unittest.skipUnless(hasattr(os, 'get_blocking'),
1438 if e.errno != errno.ENOSYS: 1634 'needs os.get_blocking() and os.set_blocking()')
1439 raise 1635 def test_blocking(self):
1440 self.skipTest(e) 1636 self.check(os.get_blocking)
1637 self.check(os.set_blocking, True)
1638
1441 1639
1442 class LinkTests(unittest.TestCase): 1640 class LinkTests(unittest.TestCase):
1443 def setUp(self): 1641 def setUp(self):
1444 self.file1 = support.TESTFN 1642 self.file1 = support.TESTFN
1445 self.file2 = os.path.join(support.TESTFN + "2") 1643 self.file2 = os.path.join(support.TESTFN + "2")
1446 1644
1447 def tearDown(self): 1645 def tearDown(self):
1448 for file in (self.file1, self.file2): 1646 for file in (self.file1, self.file2):
1449 if os.path.exists(file): 1647 if os.path.exists(file):
1450 os.unlink(file) 1648 os.unlink(file)
1451 1649
1452 def _test_link(self, file1, file2): 1650 def _test_link(self, file1, file2):
1453 with open(file1, "w") as f1: 1651 create_file(file1)
1454 f1.write("test") 1652
1455 1653 with bytes_filename_warn(False):
1456 with warnings.catch_warnings():
1457 warnings.simplefilter("ignore", DeprecationWarning)
1458 os.link(file1, file2) 1654 os.link(file1, file2)
1459 with open(file1, "r") as f1, open(file2, "r") as f2: 1655 with open(file1, "r") as f1, open(file2, "r") as f2:
1460 self.assertTrue(os.path.sameopenfile(f1.fileno(), f2.fileno())) 1656 self.assertTrue(os.path.sameopenfile(f1.fileno(), f2.fileno()))
1461 1657
1462 def test_link(self): 1658 def test_link(self):
1463 self._test_link(self.file1, self.file2) 1659 self._test_link(self.file1, self.file2)
1464 1660
1465 def test_link_bytes(self): 1661 def test_link_bytes(self):
1466 self._test_link(bytes(self.file1, sys.getfilesystemencoding()), 1662 self._test_link(bytes(self.file1, sys.getfilesystemencoding()),
1467 bytes(self.file2, sys.getfilesystemencoding())) 1663 bytes(self.file2, sys.getfilesystemencoding()))
(...skipping 271 matching lines...) Expand 10 before | Expand all | Expand 10 after
1739 1935
1740 def tearDown(self): 1936 def tearDown(self):
1741 shutil.rmtree(support.TESTFN) 1937 shutil.rmtree(support.TESTFN)
1742 1938
1743 def test_listdir_no_extended_path(self): 1939 def test_listdir_no_extended_path(self):
1744 """Test when the path is not an "extended" path.""" 1940 """Test when the path is not an "extended" path."""
1745 # unicode 1941 # unicode
1746 self.assertEqual( 1942 self.assertEqual(
1747 sorted(os.listdir(support.TESTFN)), 1943 sorted(os.listdir(support.TESTFN)),
1748 self.created_paths) 1944 self.created_paths)
1945
1749 # bytes 1946 # bytes
1750 self.assertEqual( 1947 with bytes_filename_warn(False):
1751 sorted(os.listdir(os.fsencode(support.TESTFN))), 1948 self.assertEqual(
1752 [os.fsencode(path) for path in self.created_paths]) 1949 sorted(os.listdir(os.fsencode(support.TESTFN))),
1950 [os.fsencode(path) for path in self.created_paths])
1753 1951
1754 def test_listdir_extended_path(self): 1952 def test_listdir_extended_path(self):
1755 """Test when the path starts with '\\\\?\\'.""" 1953 """Test when the path starts with '\\\\?\\'."""
1756 # See: http://msdn.microsoft.com/en-us/library/windows/desktop/aa365247( v=vs.85).aspx#maxpath 1954 # See: http://msdn.microsoft.com/en-us/library/windows/desktop/aa365247( v=vs.85).aspx#maxpath
1757 # unicode 1955 # unicode
1758 path = '\\\\?\\' + os.path.abspath(support.TESTFN) 1956 path = '\\\\?\\' + os.path.abspath(support.TESTFN)
1759 self.assertEqual( 1957 self.assertEqual(
1760 sorted(os.listdir(path)), 1958 sorted(os.listdir(path)),
1761 self.created_paths) 1959 self.created_paths)
1960
1762 # bytes 1961 # bytes
1763 path = b'\\\\?\\' + os.fsencode(os.path.abspath(support.TESTFN)) 1962 with bytes_filename_warn(False):
1764 self.assertEqual( 1963 path = b'\\\\?\\' + os.fsencode(os.path.abspath(support.TESTFN))
1765 sorted(os.listdir(path)), 1964 self.assertEqual(
1766 [os.fsencode(path) for path in self.created_paths]) 1965 sorted(os.listdir(path)),
1966 [os.fsencode(path) for path in self.created_paths])
1767 1967
1768 1968
1769 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests") 1969 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
1770 @support.skip_unless_symlink 1970 @support.skip_unless_symlink
1771 class Win32SymlinkTests(unittest.TestCase): 1971 class Win32SymlinkTests(unittest.TestCase):
1772 filelink = 'filelinktest' 1972 filelink = 'filelinktest'
1773 filelink_target = os.path.abspath(__file__) 1973 filelink_target = os.path.abspath(__file__)
1774 dirlink = 'dirlinktest' 1974 dirlink = 'dirlinktest'
1775 dirlink_target = os.path.dirname(filelink_target) 1975 dirlink_target = os.path.dirname(filelink_target)
1776 missing_link = 'missing link' 1976 missing_link = 'missing link'
(...skipping 54 matching lines...) Expand 10 before | Expand all | Expand 10 after
1831 def test_rmdir_on_directory_link_to_missing_target(self): 2031 def test_rmdir_on_directory_link_to_missing_target(self):
1832 self._create_missing_dir_link() 2032 self._create_missing_dir_link()
1833 # consider allowing rmdir to remove directory links 2033 # consider allowing rmdir to remove directory links
1834 os.rmdir(self.missing_link) 2034 os.rmdir(self.missing_link)
1835 2035
1836 def check_stat(self, link, target): 2036 def check_stat(self, link, target):
1837 self.assertEqual(os.stat(link), os.stat(target)) 2037 self.assertEqual(os.stat(link), os.stat(target))
1838 self.assertNotEqual(os.lstat(link), os.stat(link)) 2038 self.assertNotEqual(os.lstat(link), os.stat(link))
1839 2039
1840 bytes_link = os.fsencode(link) 2040 bytes_link = os.fsencode(link)
1841 with warnings.catch_warnings(): 2041 with bytes_filename_warn(True):
1842 warnings.simplefilter("ignore", DeprecationWarning)
1843 self.assertEqual(os.stat(bytes_link), os.stat(target)) 2042 self.assertEqual(os.stat(bytes_link), os.stat(target))
2043 with bytes_filename_warn(True):
1844 self.assertNotEqual(os.lstat(bytes_link), os.stat(bytes_link)) 2044 self.assertNotEqual(os.lstat(bytes_link), os.stat(bytes_link))
1845 2045
1846 def test_12084(self): 2046 def test_12084(self):
1847 level1 = os.path.abspath(support.TESTFN) 2047 level1 = os.path.abspath(support.TESTFN)
1848 level2 = os.path.join(level1, "level2") 2048 level2 = os.path.join(level1, "level2")
1849 level3 = os.path.join(level2, "level3") 2049 level3 = os.path.join(level2, "level3")
1850 try: 2050 self.addCleanup(support.rmtree, level1)
1851 os.mkdir(level1) 2051
1852 os.mkdir(level2) 2052 os.mkdir(level1)
1853 os.mkdir(level3) 2053 os.mkdir(level2)
1854 2054 os.mkdir(level3)
1855 file1 = os.path.abspath(os.path.join(level1, "file1")) 2055
1856 2056 file1 = os.path.abspath(os.path.join(level1, "file1"))
1857 with open(file1, "w") as f: 2057 create_file(file1)
1858 f.write("file1") 2058
1859 2059 orig_dir = os.getcwd()
1860 orig_dir = os.getcwd() 2060 try:
1861 try: 2061 os.chdir(level2)
1862 os.chdir(level2) 2062 link = os.path.join(level2, "link")
1863 link = os.path.join(level2, "link") 2063 os.symlink(os.path.relpath(file1), "link")
1864 os.symlink(os.path.relpath(file1), "link") 2064 self.assertIn("link", os.listdir(os.getcwd()))
1865 self.assertIn("link", os.listdir(os.getcwd())) 2065
1866 2066 # Check os.stat calls from the same dir as the link
1867 # Check os.stat calls from the same dir as the link 2067 self.assertEqual(os.stat(file1), os.stat("link"))
1868 self.assertEqual(os.stat(file1), os.stat("link")) 2068
1869 2069 # Check os.stat calls from a dir below the link
1870 # Check os.stat calls from a dir below the link 2070 os.chdir(level1)
1871 os.chdir(level1) 2071 self.assertEqual(os.stat(file1),
1872 self.assertEqual(os.stat(file1), 2072 os.stat(os.path.relpath(link)))
1873 os.stat(os.path.relpath(link))) 2073
1874 2074 # Check os.stat calls from a dir above the link
1875 # Check os.stat calls from a dir above the link 2075 os.chdir(level3)
1876 os.chdir(level3) 2076 self.assertEqual(os.stat(file1),
1877 self.assertEqual(os.stat(file1), 2077 os.stat(os.path.relpath(link)))
1878 os.stat(os.path.relpath(link)))
1879 finally:
1880 os.chdir(orig_dir)
1881 except OSError as err:
1882 self.fail(err)
1883 finally: 2078 finally:
1884 os.remove(file1) 2079 os.chdir(orig_dir)
1885 shutil.rmtree(level1) 2080
2081
2082 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
2083 class Win32JunctionTests(unittest.TestCase):
2084 junction = 'junctiontest'
2085 junction_target = os.path.dirname(os.path.abspath(__file__))
2086
2087 def setUp(self):
2088 assert os.path.exists(self.junction_target)
2089 assert not os.path.exists(self.junction)
2090
2091 def tearDown(self):
2092 if os.path.exists(self.junction):
2093 # os.rmdir delegates to Windows' RemoveDirectoryW,
2094 # which removes junction points safely.
2095 os.rmdir(self.junction)
2096
2097 def test_create_junction(self):
2098 _winapi.CreateJunction(self.junction_target, self.junction)
2099 self.assertTrue(os.path.exists(self.junction))
2100 self.assertTrue(os.path.isdir(self.junction))
2101
2102 # Junctions are not recognized as links.
2103 self.assertFalse(os.path.islink(self.junction))
2104
2105 def test_unlink_removes_junction(self):
2106 _winapi.CreateJunction(self.junction_target, self.junction)
2107 self.assertTrue(os.path.exists(self.junction))
2108
2109 os.unlink(self.junction)
2110 self.assertFalse(os.path.exists(self.junction))
1886 2111
1887 2112
1888 @support.skip_unless_symlink 2113 @support.skip_unless_symlink
1889 class NonLocalSymlinkTests(unittest.TestCase): 2114 class NonLocalSymlinkTests(unittest.TestCase):
1890 2115
1891 def setUp(self): 2116 def setUp(self):
1892 """ 2117 """
1893 Create this structure: 2118 Create this structure:
1894 2119
1895 base 2120 base
(...skipping 55 matching lines...) Expand 10 before | Expand all | Expand 10 after
1951 class PidTests(unittest.TestCase): 2176 class PidTests(unittest.TestCase):
1952 @unittest.skipUnless(hasattr(os, 'getppid'), "test needs os.getppid") 2177 @unittest.skipUnless(hasattr(os, 'getppid'), "test needs os.getppid")
1953 def test_getppid(self): 2178 def test_getppid(self):
1954 p = subprocess.Popen([sys.executable, '-c', 2179 p = subprocess.Popen([sys.executable, '-c',
1955 'import os; print(os.getppid())'], 2180 'import os; print(os.getppid())'],
1956 stdout=subprocess.PIPE) 2181 stdout=subprocess.PIPE)
1957 stdout, _ = p.communicate() 2182 stdout, _ = p.communicate()
1958 # We are the parent of our subprocess 2183 # We are the parent of our subprocess
1959 self.assertEqual(int(stdout), os.getpid()) 2184 self.assertEqual(int(stdout), os.getpid())
1960 2185
2186 def test_waitpid(self):
2187 args = [sys.executable, '-c', 'pass']
2188 pid = os.spawnv(os.P_NOWAIT, args[0], args)
2189 status = os.waitpid(pid, 0)
2190 self.assertEqual(status, (pid, 0))
2191
1961 2192
1962 # The introduction of this TestCase caused at least two different errors on 2193 # The introduction of this TestCase caused at least two different errors on
1963 # *nix buildbots. Temporarily skip this to let the buildbots move along. 2194 # *nix buildbots. Temporarily skip this to let the buildbots move along.
1964 @unittest.skip("Skip due to platform/environment differences on *NIX buildbots") 2195 @unittest.skip("Skip due to platform/environment differences on *NIX buildbots")
1965 @unittest.skipUnless(hasattr(os, 'getlogin'), "test needs os.getlogin") 2196 @unittest.skipUnless(hasattr(os, 'getlogin'), "test needs os.getlogin")
1966 class LoginTests(unittest.TestCase): 2197 class LoginTests(unittest.TestCase):
1967 def test_getlogin(self): 2198 def test_getlogin(self):
1968 user_name = os.getlogin() 2199 user_name = os.getlogin()
1969 self.assertNotEqual(len(user_name), 0) 2200 self.assertNotEqual(len(user_name), 0)
1970 2201
1971 2202
1972 @unittest.skipUnless(hasattr(os, 'getpriority') and hasattr(os, 'setpriority'), 2203 @unittest.skipUnless(hasattr(os, 'getpriority') and hasattr(os, 'setpriority'),
1973 "needs os.getpriority and os.setpriority") 2204 "needs os.getpriority and os.setpriority")
1974 class ProgramPriorityTests(unittest.TestCase): 2205 class ProgramPriorityTests(unittest.TestCase):
1975 """Tests for os.getpriority() and os.setpriority().""" 2206 """Tests for os.getpriority() and os.setpriority()."""
1976 2207
1977 def test_set_get_priority(self): 2208 def test_set_get_priority(self):
1978 2209
1979 base = os.getpriority(os.PRIO_PROCESS, os.getpid()) 2210 base = os.getpriority(os.PRIO_PROCESS, os.getpid())
1980 os.setpriority(os.PRIO_PROCESS, os.getpid(), base + 1) 2211 os.setpriority(os.PRIO_PROCESS, os.getpid(), base + 1)
1981 try: 2212 try:
1982 new_prio = os.getpriority(os.PRIO_PROCESS, os.getpid()) 2213 new_prio = os.getpriority(os.PRIO_PROCESS, os.getpid())
1983 if base >= 19 and new_prio <= 19: 2214 if base >= 19 and new_prio <= 19:
1984 raise unittest.SkipTest( 2215 raise unittest.SkipTest("unable to reliably test setpriority "
1985 "unable to reliably test setpriority at current nice level of %s" % base) 2216 "at current nice level of %s" % base)
1986 else: 2217 else:
1987 self.assertEqual(new_prio, base + 1) 2218 self.assertEqual(new_prio, base + 1)
1988 finally: 2219 finally:
1989 try: 2220 try:
1990 os.setpriority(os.PRIO_PROCESS, os.getpid(), base) 2221 os.setpriority(os.PRIO_PROCESS, os.getpid(), base)
1991 except OSError as err: 2222 except OSError as err:
1992 if err.errno != errno.EACCES: 2223 if err.errno != errno.EACCES:
1993 raise 2224 raise
1994 2225
1995 2226
(...skipping 88 matching lines...) Expand 10 before | Expand all | Expand 10 after
2084 2315
2085 DATA = b"12345abcde" * 16 * 1024 # 160 KB 2316 DATA = b"12345abcde" * 16 * 1024 # 160 KB
2086 SUPPORT_HEADERS_TRAILERS = not sys.platform.startswith("linux") and \ 2317 SUPPORT_HEADERS_TRAILERS = not sys.platform.startswith("linux") and \
2087 not sys.platform.startswith("solaris") and \ 2318 not sys.platform.startswith("solaris") and \
2088 not sys.platform.startswith("sunos") 2319 not sys.platform.startswith("sunos")
2089 requires_headers_trailers = unittest.skipUnless(SUPPORT_HEADERS_TRAILERS, 2320 requires_headers_trailers = unittest.skipUnless(SUPPORT_HEADERS_TRAILERS,
2090 'requires headers and trailers support') 2321 'requires headers and trailers support')
2091 2322
2092 @classmethod 2323 @classmethod
2093 def setUpClass(cls): 2324 def setUpClass(cls):
2094 with open(support.TESTFN, "wb") as f: 2325 cls.key = support.threading_setup()
2095 f.write(cls.DATA) 2326 create_file(support.TESTFN, cls.DATA)
2096 2327
2097 @classmethod 2328 @classmethod
2098 def tearDownClass(cls): 2329 def tearDownClass(cls):
2330 support.threading_cleanup(*cls.key)
2099 support.unlink(support.TESTFN) 2331 support.unlink(support.TESTFN)
2100 2332
2101 def setUp(self): 2333 def setUp(self):
2102 self.server = SendfileTestServer((support.HOST, 0)) 2334 self.server = SendfileTestServer((support.HOST, 0))
2103 self.server.start() 2335 self.server.start()
2104 self.client = socket.socket() 2336 self.client = socket.socket()
2105 self.client.connect((self.server.host, self.server.port)) 2337 self.client.connect((self.server.host, self.server.port))
2106 self.client.settimeout(1) 2338 self.client.settimeout(1)
2107 # synchronize by waiting for "220 ready" response 2339 # synchronize by waiting for "220 ready" response
2108 self.client.recv(1024) 2340 self.client.recv(1024)
(...skipping 125 matching lines...) Expand 10 before | Expand all | Expand 10 after
2234 self.assertEqual(total_sent, len(expected_data)) 2466 self.assertEqual(total_sent, len(expected_data))
2235 self.client.close() 2467 self.client.close()
2236 self.server.wait() 2468 self.server.wait()
2237 data = self.server.handler_instance.get_data() 2469 data = self.server.handler_instance.get_data()
2238 self.assertEqual(hash(data), hash(expected_data)) 2470 self.assertEqual(hash(data), hash(expected_data))
2239 2471
2240 @requires_headers_trailers 2472 @requires_headers_trailers
2241 def test_trailers(self): 2473 def test_trailers(self):
2242 TESTFN2 = support.TESTFN + "2" 2474 TESTFN2 = support.TESTFN + "2"
2243 file_data = b"abcdef" 2475 file_data = b"abcdef"
2244 with open(TESTFN2, 'wb') as f: 2476
2245 f.write(file_data) 2477 self.addCleanup(support.unlink, TESTFN2)
2246 with open(TESTFN2, 'rb')as f: 2478 create_file(TESTFN2, file_data)
2247 self.addCleanup(os.remove, TESTFN2) 2479
2480 with open(TESTFN2, 'rb') as f:
2248 os.sendfile(self.sockno, f.fileno(), 0, len(file_data), 2481 os.sendfile(self.sockno, f.fileno(), 0, len(file_data),
2249 trailers=[b"1234"]) 2482 trailers=[b"1234"])
2250 self.client.close() 2483 self.client.close()
2251 self.server.wait() 2484 self.server.wait()
2252 data = self.server.handler_instance.get_data() 2485 data = self.server.handler_instance.get_data()
2253 self.assertEqual(data, b"abcdef1234") 2486 self.assertEqual(data, b"abcdef1234")
2254 2487
2255 @requires_headers_trailers 2488 @requires_headers_trailers
2256 @unittest.skipUnless(hasattr(os, 'SF_NODISKIO'), 2489 @unittest.skipUnless(hasattr(os, 'SF_NODISKIO'),
2257 'test needs os.SF_NODISKIO') 2490 'test needs os.SF_NODISKIO')
2258 def test_flags(self): 2491 def test_flags(self):
2259 try: 2492 try:
2260 os.sendfile(self.sockno, self.fileno, 0, 4096, 2493 os.sendfile(self.sockno, self.fileno, 0, 4096,
2261 flags=os.SF_NODISKIO) 2494 flags=os.SF_NODISKIO)
2262 except OSError as err: 2495 except OSError as err:
2263 if err.errno not in (errno.EBUSY, errno.EAGAIN): 2496 if err.errno not in (errno.EBUSY, errno.EAGAIN):
2264 raise 2497 raise
2265 2498
2266 2499
2267 def supports_extended_attributes(): 2500 def supports_extended_attributes():
2268 if not hasattr(os, "setxattr"): 2501 if not hasattr(os, "setxattr"):
2269 return False 2502 return False
2503
2270 try: 2504 try:
2271 with open(support.TESTFN, "wb") as fp: 2505 with open(support.TESTFN, "xb", 0) as fp:
2272 try: 2506 try:
2273 os.setxattr(fp.fileno(), b"user.test", b"") 2507 os.setxattr(fp.fileno(), b"user.test", b"")
2274 except OSError: 2508 except OSError:
2275 return False 2509 return False
2276 finally: 2510 finally:
2277 support.unlink(support.TESTFN) 2511 support.unlink(support.TESTFN)
2278 # Kernels < 2.6.39 don't respect setxattr flags. 2512
2279 kernel_version = platform.release() 2513 return True
2280 m = re.match("2.6.(\d{1,2})", kernel_version)
2281 return m is None or int(m.group(1)) >= 39
2282 2514
2283 2515
2284 @unittest.skipUnless(supports_extended_attributes(), 2516 @unittest.skipUnless(supports_extended_attributes(),
2285 "no non-broken extended attribute support") 2517 "no non-broken extended attribute support")
2518 # Kernels < 2.6.39 don't respect setxattr flags.
2519 @support.requires_linux_version(2, 6, 39)
2286 class ExtendedAttributeTests(unittest.TestCase): 2520 class ExtendedAttributeTests(unittest.TestCase):
2287
2288 def tearDown(self):
2289 support.unlink(support.TESTFN)
2290 2521
2291 def _check_xattrs_str(self, s, getxattr, setxattr, removexattr, listxattr, * *kwargs): 2522 def _check_xattrs_str(self, s, getxattr, setxattr, removexattr, listxattr, * *kwargs):
2292 fn = support.TESTFN 2523 fn = support.TESTFN
2293 open(fn, "wb").close() 2524 self.addCleanup(support.unlink, fn)
2525 create_file(fn)
2526
2294 with self.assertRaises(OSError) as cm: 2527 with self.assertRaises(OSError) as cm:
2295 getxattr(fn, s("user.test"), **kwargs) 2528 getxattr(fn, s("user.test"), **kwargs)
2296 self.assertEqual(cm.exception.errno, errno.ENODATA) 2529 self.assertEqual(cm.exception.errno, errno.ENODATA)
2530
2297 init_xattr = listxattr(fn) 2531 init_xattr = listxattr(fn)
2298 self.assertIsInstance(init_xattr, list) 2532 self.assertIsInstance(init_xattr, list)
2533
2299 setxattr(fn, s("user.test"), b"", **kwargs) 2534 setxattr(fn, s("user.test"), b"", **kwargs)
2300 xattr = set(init_xattr) 2535 xattr = set(init_xattr)
2301 xattr.add("user.test") 2536 xattr.add("user.test")
2302 self.assertEqual(set(listxattr(fn)), xattr) 2537 self.assertEqual(set(listxattr(fn)), xattr)
2303 self.assertEqual(getxattr(fn, b"user.test", **kwargs), b"") 2538 self.assertEqual(getxattr(fn, b"user.test", **kwargs), b"")
2304 setxattr(fn, s("user.test"), b"hello", os.XATTR_REPLACE, **kwargs) 2539 setxattr(fn, s("user.test"), b"hello", os.XATTR_REPLACE, **kwargs)
2305 self.assertEqual(getxattr(fn, b"user.test", **kwargs), b"hello") 2540 self.assertEqual(getxattr(fn, b"user.test", **kwargs), b"hello")
2541
2306 with self.assertRaises(OSError) as cm: 2542 with self.assertRaises(OSError) as cm:
2307 setxattr(fn, s("user.test"), b"bye", os.XATTR_CREATE, **kwargs) 2543 setxattr(fn, s("user.test"), b"bye", os.XATTR_CREATE, **kwargs)
2308 self.assertEqual(cm.exception.errno, errno.EEXIST) 2544 self.assertEqual(cm.exception.errno, errno.EEXIST)
2545
2309 with self.assertRaises(OSError) as cm: 2546 with self.assertRaises(OSError) as cm:
2310 setxattr(fn, s("user.test2"), b"bye", os.XATTR_REPLACE, **kwargs) 2547 setxattr(fn, s("user.test2"), b"bye", os.XATTR_REPLACE, **kwargs)
2311 self.assertEqual(cm.exception.errno, errno.ENODATA) 2548 self.assertEqual(cm.exception.errno, errno.ENODATA)
2549
2312 setxattr(fn, s("user.test2"), b"foo", os.XATTR_CREATE, **kwargs) 2550 setxattr(fn, s("user.test2"), b"foo", os.XATTR_CREATE, **kwargs)
2313 xattr.add("user.test2") 2551 xattr.add("user.test2")
2314 self.assertEqual(set(listxattr(fn)), xattr) 2552 self.assertEqual(set(listxattr(fn)), xattr)
2315 removexattr(fn, s("user.test"), **kwargs) 2553 removexattr(fn, s("user.test"), **kwargs)
2554
2316 with self.assertRaises(OSError) as cm: 2555 with self.assertRaises(OSError) as cm:
2317 getxattr(fn, s("user.test"), **kwargs) 2556 getxattr(fn, s("user.test"), **kwargs)
2318 self.assertEqual(cm.exception.errno, errno.ENODATA) 2557 self.assertEqual(cm.exception.errno, errno.ENODATA)
2558
2319 xattr.remove("user.test") 2559 xattr.remove("user.test")
2320 self.assertEqual(set(listxattr(fn)), xattr) 2560 self.assertEqual(set(listxattr(fn)), xattr)
2321 self.assertEqual(getxattr(fn, s("user.test2"), **kwargs), b"foo") 2561 self.assertEqual(getxattr(fn, s("user.test2"), **kwargs), b"foo")
2322 setxattr(fn, s("user.test"), b"a"*1024, **kwargs) 2562 setxattr(fn, s("user.test"), b"a"*1024, **kwargs)
2323 self.assertEqual(getxattr(fn, s("user.test"), **kwargs), b"a"*1024) 2563 self.assertEqual(getxattr(fn, s("user.test"), **kwargs), b"a"*1024)
2324 removexattr(fn, s("user.test"), **kwargs) 2564 removexattr(fn, s("user.test"), **kwargs)
2325 many = sorted("user.test{}".format(i) for i in range(100)) 2565 many = sorted("user.test{}".format(i) for i in range(100))
2326 for thing in many: 2566 for thing in many:
2327 setxattr(fn, thing, b"x", **kwargs) 2567 setxattr(fn, thing, b"x", **kwargs)
2328 self.assertEqual(set(listxattr(fn)), set(init_xattr) | set(many)) 2568 self.assertEqual(set(listxattr(fn)), set(init_xattr) | set(many))
2329 2569
2330 def _check_xattrs(self, *args, **kwargs): 2570 def _check_xattrs(self, *args, **kwargs):
2331 def make_bytes(s):
2332 return bytes(s, "ascii")
2333 self._check_xattrs_str(str, *args, **kwargs) 2571 self._check_xattrs_str(str, *args, **kwargs)
2334 support.unlink(support.TESTFN) 2572 support.unlink(support.TESTFN)
2335 self._check_xattrs_str(make_bytes, *args, **kwargs) 2573
2574 self._check_xattrs_str(os.fsencode, *args, **kwargs)
2575 support.unlink(support.TESTFN)
2336 2576
2337 def test_simple(self): 2577 def test_simple(self):
2338 self._check_xattrs(os.getxattr, os.setxattr, os.removexattr, 2578 self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
2339 os.listxattr) 2579 os.listxattr)
2340 2580
2341 def test_lpath(self): 2581 def test_lpath(self):
2342 self._check_xattrs(os.getxattr, os.setxattr, os.removexattr, 2582 self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
2343 os.listxattr, follow_symlinks=False) 2583 os.listxattr, follow_symlinks=False)
2344 2584
2345 def test_fds(self): 2585 def test_fds(self):
2346 def getxattr(path, *args): 2586 def getxattr(path, *args):
2347 with open(path, "rb") as fp: 2587 with open(path, "rb") as fp:
2348 return os.getxattr(fp.fileno(), *args) 2588 return os.getxattr(fp.fileno(), *args)
2349 def setxattr(path, *args): 2589 def setxattr(path, *args):
2350 with open(path, "wb") as fp: 2590 with open(path, "wb", 0) as fp:
2351 os.setxattr(fp.fileno(), *args) 2591 os.setxattr(fp.fileno(), *args)
2352 def removexattr(path, *args): 2592 def removexattr(path, *args):
2353 with open(path, "wb") as fp: 2593 with open(path, "wb", 0) as fp:
2354 os.removexattr(fp.fileno(), *args) 2594 os.removexattr(fp.fileno(), *args)
2355 def listxattr(path, *args): 2595 def listxattr(path, *args):
2356 with open(path, "rb") as fp: 2596 with open(path, "rb") as fp:
2357 return os.listxattr(fp.fileno(), *args) 2597 return os.listxattr(fp.fileno(), *args)
2358 self._check_xattrs(getxattr, setxattr, removexattr, listxattr) 2598 self._check_xattrs(getxattr, setxattr, removexattr, listxattr)
2359 2599
2360 2600
2361 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests") 2601 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
2362 class Win32DeprecatedBytesAPI(unittest.TestCase): 2602 class Win32DeprecatedBytesAPI(unittest.TestCase):
2363 def test_deprecated(self): 2603 def test_deprecated(self):
2364 import nt 2604 import nt
2365 filename = os.fsencode(support.TESTFN) 2605 filename = os.fsencode(support.TESTFN)
2366 with warnings.catch_warnings(): 2606 for func, *args in (
2367 warnings.simplefilter("error", DeprecationWarning) 2607 (nt._getfullpathname, filename),
2368 for func, *args in ( 2608 (nt._isdir, filename),
2369 (nt._getfullpathname, filename), 2609 (os.access, filename, os.R_OK),
2370 (nt._isdir, filename), 2610 (os.chdir, filename),
2371 (os.access, filename, os.R_OK), 2611 (os.chmod, filename, 0o777),
2372 (os.chdir, filename), 2612 (os.getcwdb,),
2373 (os.chmod, filename, 0o777), 2613 (os.link, filename, filename),
2374 (os.getcwdb,), 2614 (os.listdir, filename),
2375 (os.link, filename, filename), 2615 (os.lstat, filename),
2376 (os.listdir, filename), 2616 (os.mkdir, filename),
2377 (os.lstat, filename), 2617 (os.open, filename, os.O_RDONLY),
2378 (os.mkdir, filename), 2618 (os.rename, filename, filename),
2379 (os.open, filename, os.O_RDONLY), 2619 (os.rmdir, filename),
2380 (os.rename, filename, filename), 2620 (os.startfile, filename),
2381 (os.rmdir, filename), 2621 (os.stat, filename),
2382 (os.startfile, filename), 2622 (os.unlink, filename),
2383 (os.stat, filename), 2623 (os.utime, filename),
2384 (os.unlink, filename), 2624 ):
2385 (os.utime, filename), 2625 with bytes_filename_warn(True):
2386 ): 2626 try:
2387 self.assertRaises(DeprecationWarning, func, *args) 2627 func(*args)
2628 except OSError:
2629 # ignore OSError, we only care about DeprecationWarning
2630 pass
2388 2631
2389 @support.skip_unless_symlink 2632 @support.skip_unless_symlink
2390 def test_symlink(self): 2633 def test_symlink(self):
2634 self.addCleanup(support.unlink, support.TESTFN)
2635
2391 filename = os.fsencode(support.TESTFN) 2636 filename = os.fsencode(support.TESTFN)
2392 with warnings.catch_warnings(): 2637 with bytes_filename_warn(True):
2393 warnings.simplefilter("error", DeprecationWarning) 2638 os.symlink(filename, filename)
2394 self.assertRaises(DeprecationWarning,
2395 os.symlink, filename, filename)
2396 2639
2397 2640
2398 @unittest.skipUnless(hasattr(os, 'get_terminal_size'), "requires os.get_terminal _size") 2641 @unittest.skipUnless(hasattr(os, 'get_terminal_size'), "requires os.get_terminal _size")
2399 class TermsizeTests(unittest.TestCase): 2642 class TermsizeTests(unittest.TestCase):
2400 def test_does_not_crash(self): 2643 def test_does_not_crash(self):
2401 """Check if get_terminal_size() returns a meaningful value. 2644 """Check if get_terminal_size() returns a meaningful value.
2402 2645
2403 There's no easy portable way to actually check the size of the 2646 There's no easy portable way to actually check the size of the
2404 terminal, so let's check if it returns something sensible instead. 2647 terminal, so let's check if it returns something sensible instead.
2405 """ 2648 """
(...skipping 117 matching lines...) Expand 10 before | Expand all | Expand 10 after
2523 funcs.append((self.filenames, os.lchmod, 0o777)) 2766 funcs.append((self.filenames, os.lchmod, 0o777))
2524 if hasattr(os, "readlink"): 2767 if hasattr(os, "readlink"):
2525 if sys.platform == "win32": 2768 if sys.platform == "win32":
2526 funcs.append((self.unicode_filenames, os.readlink,)) 2769 funcs.append((self.unicode_filenames, os.readlink,))
2527 else: 2770 else:
2528 funcs.append((self.filenames, os.readlink,)) 2771 funcs.append((self.filenames, os.readlink,))
2529 2772
2530 for filenames, func, *func_args in funcs: 2773 for filenames, func, *func_args in funcs:
2531 for name in filenames: 2774 for name in filenames:
2532 try: 2775 try:
2533 func(name, *func_args) 2776 with bytes_filename_warn(False):
2777 func(name, *func_args)
2534 except OSError as err: 2778 except OSError as err:
2535 self.assertIs(err.filename, name) 2779 self.assertIs(err.filename, name)
2536 else: 2780 else:
2537 self.fail("No exception thrown by {}".format(func)) 2781 self.fail("No exception thrown by {}".format(func))
2538 2782
2539 class CPUCountTests(unittest.TestCase): 2783 class CPUCountTests(unittest.TestCase):
2540 def test_cpu_count(self): 2784 def test_cpu_count(self):
2541 cpus = os.cpu_count() 2785 cpus = os.cpu_count()
2542 if cpus is not None: 2786 if cpus is not None:
2543 self.assertIsInstance(cpus, int) 2787 self.assertIsInstance(cpus, int)
(...skipping 79 matching lines...) Expand 10 before | Expand all | Expand 10 after
2623 2867
2624 @unittest.skipUnless(hasattr(os, 'openpty'), "need os.openpty()") 2868 @unittest.skipUnless(hasattr(os, 'openpty'), "need os.openpty()")
2625 def test_openpty(self): 2869 def test_openpty(self):
2626 master_fd, slave_fd = os.openpty() 2870 master_fd, slave_fd = os.openpty()
2627 self.addCleanup(os.close, master_fd) 2871 self.addCleanup(os.close, master_fd)
2628 self.addCleanup(os.close, slave_fd) 2872 self.addCleanup(os.close, slave_fd)
2629 self.assertEqual(os.get_inheritable(master_fd), False) 2873 self.assertEqual(os.get_inheritable(master_fd), False)
2630 self.assertEqual(os.get_inheritable(slave_fd), False) 2874 self.assertEqual(os.get_inheritable(slave_fd), False)
2631 2875
2632 2876
2633 @support.reap_threads 2877 @unittest.skipUnless(hasattr(os, 'get_blocking'),
2634 def test_main(): 2878 'needs os.get_blocking() and os.set_blocking()')
2635 support.run_unittest( 2879 class BlockingTests(unittest.TestCase):
2636 FileTests, 2880 def test_blocking(self):
2637 StatAttributeTests, 2881 fd = os.open(__file__, os.O_RDONLY)
2638 UtimeTests, 2882 self.addCleanup(os.close, fd)
2639 EnvironTests, 2883 self.assertEqual(os.get_blocking(fd), True)
2640 WalkTests, 2884
2641 FwalkTests, 2885 os.set_blocking(fd, False)
2642 MakedirTests, 2886 self.assertEqual(os.get_blocking(fd), False)
2643 DevNullTests, 2887
2644 URandomTests, 2888 os.set_blocking(fd, True)
2645 URandomFDTests, 2889 self.assertEqual(os.get_blocking(fd), True)
2646 ExecTests, 2890
2647 Win32ErrorTests, 2891
2648 TestInvalidFD, 2892
2649 PosixUidGidTests, 2893 class ExportsTests(unittest.TestCase):
2650 Pep383Tests, 2894 def test_os_all(self):
2651 Win32KillTests, 2895 self.assertIn('open', os.__all__)
2652 Win32ListdirTests, 2896 self.assertIn('walk', os.__all__)
2653 Win32SymlinkTests, 2897
2654 NonLocalSymlinkTests, 2898
2655 FSEncodingTests, 2899 class TestScandir(unittest.TestCase):
2656 DeviceEncodingTests, 2900 check_no_resource_warning = support.check_no_resource_warning
2657 PidTests, 2901
2658 LoginTests, 2902 def setUp(self):
2659 LinkTests, 2903 self.path = os.path.realpath(support.TESTFN)
2660 TestSendfile, 2904 self.addCleanup(support.rmtree, self.path)
2661 ProgramPriorityTests, 2905 os.mkdir(self.path)
2662 ExtendedAttributeTests, 2906
2663 Win32DeprecatedBytesAPI, 2907 def create_file(self, name="file.txt"):
2664 TermsizeTests, 2908 filename = os.path.join(self.path, name)
2665 OSErrorTests, 2909 create_file(filename, b'python')
2666 RemoveDirsTests, 2910 return filename
2667 CPUCountTests, 2911
2668 FDInheritanceTests, 2912 def get_entries(self, names):
2669 ) 2913 entries = dict((entry.name, entry)
2914 for entry in os.scandir(self.path))
2915 self.assertEqual(sorted(entries.keys()), names)
2916 return entries
2917
2918 def assert_stat_equal(self, stat1, stat2, skip_fields):
2919 if skip_fields:
2920 for attr in dir(stat1):
2921 if not attr.startswith("st_"):
2922 continue
2923 if attr in ("st_dev", "st_ino", "st_nlink"):
2924 continue
2925 self.assertEqual(getattr(stat1, attr),
2926 getattr(stat2, attr),
2927 (stat1, stat2, attr))
2928 else:
2929 self.assertEqual(stat1, stat2)
2930
2931 def check_entry(self, entry, name, is_dir, is_file, is_symlink):
2932 self.assertEqual(entry.name, name)
2933 self.assertEqual(entry.path, os.path.join(self.path, name))
2934 self.assertEqual(entry.inode(),
2935 os.stat(entry.path, follow_symlinks=False).st_ino)
2936
2937 entry_stat = os.stat(entry.path)
2938 self.assertEqual(entry.is_dir(),
2939 stat.S_ISDIR(entry_stat.st_mode))
2940 self.assertEqual(entry.is_file(),
2941 stat.S_ISREG(entry_stat.st_mode))
2942 self.assertEqual(entry.is_symlink(),
2943 os.path.islink(entry.path))
2944
2945 entry_lstat = os.stat(entry.path, follow_symlinks=False)
2946 self.assertEqual(entry.is_dir(follow_symlinks=False),
2947 stat.S_ISDIR(entry_lstat.st_mode))
2948 self.assertEqual(entry.is_file(follow_symlinks=False),
2949 stat.S_ISREG(entry_lstat.st_mode))
2950
2951 self.assert_stat_equal(entry.stat(),
2952 entry_stat,
2953 os.name == 'nt' and not is_symlink)
2954 self.assert_stat_equal(entry.stat(follow_symlinks=False),
2955 entry_lstat,
2956 os.name == 'nt')
2957
2958 def test_attributes(self):
2959 link = hasattr(os, 'link')
2960 symlink = support.can_symlink()
2961
2962 dirname = os.path.join(self.path, "dir")
2963 os.mkdir(dirname)
2964 filename = self.create_file("file.txt")
2965 if link:
2966 os.link(filename, os.path.join(self.path, "link_file.txt"))
2967 if symlink:
2968 os.symlink(dirname, os.path.join(self.path, "symlink_dir"),
2969 target_is_directory=True)
2970 os.symlink(filename, os.path.join(self.path, "symlink_file.txt"))
2971
2972 names = ['dir', 'file.txt']
2973 if link:
2974 names.append('link_file.txt')
2975 if symlink:
2976 names.extend(('symlink_dir', 'symlink_file.txt'))
2977 entries = self.get_entries(names)
2978
2979 entry = entries['dir']
2980 self.check_entry(entry, 'dir', True, False, False)
2981
2982 entry = entries['file.txt']
2983 self.check_entry(entry, 'file.txt', False, True, False)
2984
2985 if link:
2986 entry = entries['link_file.txt']
2987 self.check_entry(entry, 'link_file.txt', False, True, False)
2988
2989 if symlink:
2990 entry = entries['symlink_dir']
2991 self.check_entry(entry, 'symlink_dir', True, False, True)
2992
2993 entry = entries['symlink_file.txt']
2994 self.check_entry(entry, 'symlink_file.txt', False, True, True)
2995
2996 def get_entry(self, name):
2997 entries = list(os.scandir(self.path))
2998 self.assertEqual(len(entries), 1)
2999
3000 entry = entries[0]
3001 self.assertEqual(entry.name, name)
3002 return entry
3003
3004 def create_file_entry(self):
3005 filename = self.create_file()
3006 return self.get_entry(os.path.basename(filename))
3007
3008 def test_current_directory(self):
3009 filename = self.create_file()
3010 old_dir = os.getcwd()
3011 try:
3012 os.chdir(self.path)
3013
3014 # call scandir() without parameter: it must list the content
3015 # of the current directory
3016 entries = dict((entry.name, entry) for entry in os.scandir())
3017 self.assertEqual(sorted(entries.keys()),
3018 [os.path.basename(filename)])
3019 finally:
3020 os.chdir(old_dir)
3021
3022 def test_repr(self):
3023 entry = self.create_file_entry()
3024 self.assertEqual(repr(entry), "<DirEntry 'file.txt'>")
3025
3026 def test_removed_dir(self):
3027 path = os.path.join(self.path, 'dir')
3028
3029 os.mkdir(path)
3030 entry = self.get_entry('dir')
3031 os.rmdir(path)
3032
3033 # On POSIX, is_dir() result depends if scandir() filled d_type or not
3034 if os.name == 'nt':
3035 self.assertTrue(entry.is_dir())
3036 self.assertFalse(entry.is_file())
3037 self.assertFalse(entry.is_symlink())
3038 if os.name == 'nt':
3039 self.assertRaises(FileNotFoundError, entry.inode)
3040 # don't fail
3041 entry.stat()
3042 entry.stat(follow_symlinks=False)
3043 else:
3044 self.assertGreater(entry.inode(), 0)
3045 self.assertRaises(FileNotFoundError, entry.stat)
3046 self.assertRaises(FileNotFoundError, entry.stat, follow_symlinks=Fal se)
3047
3048 def test_removed_file(self):
3049 entry = self.create_file_entry()
3050 os.unlink(entry.path)
3051
3052 self.assertFalse(entry.is_dir())
3053 # On POSIX, is_dir() result depends if scandir() filled d_type or not
3054 if os.name == 'nt':
3055 self.assertTrue(entry.is_file())
3056 self.assertFalse(entry.is_symlink())
3057 if os.name == 'nt':
3058 self.assertRaises(FileNotFoundError, entry.inode)
3059 # don't fail
3060 entry.stat()
3061 entry.stat(follow_symlinks=False)
3062 else:
3063 self.assertGreater(entry.inode(), 0)
3064 self.assertRaises(FileNotFoundError, entry.stat)
3065 self.assertRaises(FileNotFoundError, entry.stat, follow_symlinks=Fal se)
3066
3067 def test_broken_symlink(self):
3068 if not support.can_symlink():
3069 return self.skipTest('cannot create symbolic link')
3070
3071 filename = self.create_file("file.txt")
3072 os.symlink(filename,
3073 os.path.join(self.path, "symlink.txt"))
3074 entries = self.get_entries(['file.txt', 'symlink.txt'])
3075 entry = entries['symlink.txt']
3076 os.unlink(filename)
3077
3078 self.assertGreater(entry.inode(), 0)
3079 self.assertFalse(entry.is_dir())
3080 self.assertFalse(entry.is_file()) # broken symlink returns False
3081 self.assertFalse(entry.is_dir(follow_symlinks=False))
3082 self.assertFalse(entry.is_file(follow_symlinks=False))
3083 self.assertTrue(entry.is_symlink())
3084 self.assertRaises(FileNotFoundError, entry.stat)
3085 # don't fail
3086 entry.stat(follow_symlinks=False)
3087
3088 def test_bytes(self):
3089 if os.name == "nt":
3090 # On Windows, os.scandir(bytes) must raise an exception
3091 with bytes_filename_warn(True):
3092 self.assertRaises(TypeError, os.scandir, b'.')
3093 return
3094
3095 self.create_file("file.txt")
3096
3097 path_bytes = os.fsencode(self.path)
3098 entries = list(os.scandir(path_bytes))
3099 self.assertEqual(len(entries), 1, entries)
3100 entry = entries[0]
3101
3102 self.assertEqual(entry.name, b'file.txt')
3103 self.assertEqual(entry.path,
3104 os.fsencode(os.path.join(self.path, 'file.txt')))
3105
3106 def test_empty_path(self):
3107 self.assertRaises(FileNotFoundError, os.scandir, '')
3108
3109 def test_consume_iterator_twice(self):
3110 self.create_file("file.txt")
3111 iterator = os.scandir(self.path)
3112
3113 entries = list(iterator)
3114 self.assertEqual(len(entries), 1, entries)
3115
3116 # check than consuming the iterator twice doesn't raise exception
3117 entries2 = list(iterator)
3118 self.assertEqual(len(entries2), 0, entries2)
3119
3120 def test_bad_path_type(self):
3121 for obj in [1234, 1.234, {}, []]:
3122 self.assertRaises(TypeError, os.scandir, obj)
3123
3124 def test_close(self):
3125 self.create_file("file.txt")
3126 self.create_file("file2.txt")
3127 iterator = os.scandir(self.path)
3128 next(iterator)
3129 iterator.close()
3130 # multiple closes
3131 iterator.close()
3132 with self.check_no_resource_warning():
3133 del iterator
3134
3135 def test_context_manager(self):
3136 self.create_file("file.txt")
3137 self.create_file("file2.txt")
3138 with os.scandir(self.path) as iterator:
3139 next(iterator)
3140 with self.check_no_resource_warning():
3141 del iterator
3142
3143 def test_context_manager_close(self):
3144 self.create_file("file.txt")
3145 self.create_file("file2.txt")
3146 with os.scandir(self.path) as iterator:
3147 next(iterator)
3148 iterator.close()
3149
3150 def test_context_manager_exception(self):
3151 self.create_file("file.txt")
3152 self.create_file("file2.txt")
3153 with self.assertRaises(ZeroDivisionError):
3154 with os.scandir(self.path) as iterator:
3155 next(iterator)
3156 1/0
3157 with self.check_no_resource_warning():
3158 del iterator
3159
3160 def test_resource_warning(self):
3161 self.create_file("file.txt")
3162 self.create_file("file2.txt")
3163 iterator = os.scandir(self.path)
3164 next(iterator)
3165 with self.assertWarns(ResourceWarning):
3166 del iterator
3167 support.gc_collect()
3168 # exhausted iterator
3169 iterator = os.scandir(self.path)
3170 list(iterator)
3171 with self.check_no_resource_warning():
3172 del iterator
3173
3174
3175 class TestPEP519(unittest.TestCase):
3176 "os.fspath()"
3177
3178 def test_return_bytes(self):
3179 for b in b'hello', b'goodbye', b'some/path/and/file':
3180 self.assertEqual(b, os.fspath(b))
3181
3182 def test_return_string(self):
3183 for s in 'hello', 'goodbye', 'some/path/and/file':
3184 self.assertEqual(s, os.fspath(s))
3185
3186 def test_fsencode_fsdecode_return_pathlike(self):
3187 class PathLike:
3188 def __init__(self, path):
3189 self.path = path
3190 def __fspath__(self):
3191 return self.path
3192
3193 for p in "path/like/object", b"path/like/object":
3194 pathlike = PathLike(p)
3195
3196 self.assertEqual(p, os.fspath(pathlike))
3197 self.assertEqual(b"path/like/object", os.fsencode(pathlike))
3198 self.assertEqual("path/like/object", os.fsdecode(pathlike))
3199
3200 def test_fspathlike(self):
3201 class PathLike:
3202 def __init__(self, path=''):
3203 self.path = path
3204 def __fspath__(self):
3205 return self.path
3206
3207 self.assertEqual('#feelthegil', os.fspath(PathLike('#feelthegil')))
3208 self.assertTrue(issubclass(PathLike, os.PathLike))
3209 self.assertTrue(isinstance(PathLike(), os.PathLike))
3210
3211 message = 'expected str, bytes or os.PathLike object, not'
3212 for fn in (os.fsencode, os.fsdecode):
3213 for obj in PathLike(None), None:
3214 with self.assertRaisesRegex(TypeError, message):
3215 fn(obj)
3216
3217 def test_garbage_in_exception_out(self):
3218 vapor = type('blah', (), {})
3219 for o in int, type, os, vapor():
3220 self.assertRaises(TypeError, os.fspath, o)
3221
3222 def test_argument_required(self):
3223 with self.assertRaises(TypeError):
3224 os.fspath()
3225
2670 3226
2671 if __name__ == "__main__": 3227 if __name__ == "__main__":
2672 test_main() 3228 unittest.main()
LEFTRIGHT

RSS Feeds Recent Issues | This issue
This is Rietveld 894c83f36cb7+