| Left: | ||
| Right: |
| LEFT | RIGHT |
|---|---|
| 1 "Test posix functions" | 1 "Test posix functions" |
| 2 | 2 |
| 3 from test import support | 3 from test import support |
| 4 | 4 |
| 5 # Skip these tests if there is no posix module. | 5 # Skip these tests if there is no posix module. |
| 6 posix = support.import_module('posix') | 6 posix = support.import_module('posix') |
| 7 | 7 |
| 8 import errno | 8 import errno |
| 9 import sys | 9 import sys |
| 10 import time | 10 import time |
| 11 import os | 11 import os |
| 12 import fcntl | 12 import fcntl |
| 13 import platform | 13 import platform |
| 14 import pwd | 14 import pwd |
| 15 import shutil | 15 import shutil |
| 16 import stat | 16 import stat |
| 17 import sysconfig | |
| 18 import tempfile | 17 import tempfile |
| 19 import unittest | 18 import unittest |
| 20 import warnings | 19 import warnings |
| 21 | 20 |
| 22 _DUMMY_SYMLINK = os.path.join(tempfile.gettempdir(), | 21 _DUMMY_SYMLINK = os.path.join(tempfile.gettempdir(), |
| 23 support.TESTFN + '-dummy-symlink') | 22 support.TESTFN + '-dummy-symlink') |
| 24 | 23 |
| 25 class PosixTester(unittest.TestCase): | 24 class PosixTester(unittest.TestCase): |
| 26 | 25 |
| 27 def setUp(self): | 26 def setUp(self): |
| (...skipping 95 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 123 | 122 |
| 124 def test_statvfs(self): | 123 def test_statvfs(self): |
| 125 if hasattr(posix, 'statvfs'): | 124 if hasattr(posix, 'statvfs'): |
| 126 self.assertTrue(posix.statvfs(os.curdir)) | 125 self.assertTrue(posix.statvfs(os.curdir)) |
| 127 | 126 |
| 128 def test_fstatvfs(self): | 127 def test_fstatvfs(self): |
| 129 if hasattr(posix, 'fstatvfs'): | 128 if hasattr(posix, 'fstatvfs'): |
| 130 fp = open(support.TESTFN) | 129 fp = open(support.TESTFN) |
| 131 try: | 130 try: |
| 132 self.assertTrue(posix.fstatvfs(fp.fileno())) | 131 self.assertTrue(posix.fstatvfs(fp.fileno())) |
| 133 self.assertTrue(posix.statvfs(fd=fp.fileno())) | 132 self.assertTrue(posix.statvfs(fp.fileno())) |
| 134 finally: | 133 finally: |
| 135 fp.close() | 134 fp.close() |
| 136 | 135 |
| 137 def test_ftruncate(self): | 136 def test_ftruncate(self): |
| 138 if hasattr(posix, 'ftruncate'): | 137 if hasattr(posix, 'ftruncate'): |
| 139 fp = open(support.TESTFN, 'w+') | 138 fp = open(support.TESTFN, 'w+') |
| 140 try: | 139 try: |
| 141 # we need to have some data to truncate | 140 # we need to have some data to truncate |
| 142 fp.write('test') | 141 fp.write('test') |
| 143 fp.flush() | 142 fp.flush() |
| 144 posix.ftruncate(fp.fileno(), 0) | 143 posix.ftruncate(fp.fileno(), 0) |
| 145 finally: | 144 finally: |
| 146 fp.close() | 145 fp.close() |
| 147 | 146 |
| 148 @unittest.skipUnless(hasattr(posix, 'truncate'), "test needs posix.truncate()") | 147 @unittest.skipUnless(hasattr(posix, 'truncate'), "test needs posix.truncate()") |
| 149 def test_truncate(self): | 148 def test_truncate(self): |
| 150 with open(support.TESTFN, 'w') as fp: | 149 with open(support.TESTFN, 'w') as fp: |
| 151 fp.write('test') | 150 fp.write('test') |
| 152 fp.flush() | 151 fp.flush() |
| 153 posix.truncate(support.TESTFN, 0) | 152 posix.truncate(support.TESTFN, 0) |
| 154 | 153 |
| 155 @unittest.skipUnless(sysconfig.get_config_var('HAVE_FEXECVE'), "test needs execve() to support the fd parameter") | 154 @unittest.skipUnless(getattr(os, 'execve') in os.supports_fd, "test needs execve() to support the fd parameter") |
|
Georg
2012/06/22 20:26:02
getattr() with no third parameter will still raise
larry
2012/06/23 00:27:56
Fixed. Good catch!
| |
| 156 @unittest.skipUnless(hasattr(os, 'fork'), "test needs os.fork()") | 155 @unittest.skipUnless(hasattr(os, 'fork'), "test needs os.fork()") |
| 157 @unittest.skipUnless(hasattr(os, 'waitpid'), "test needs os.waitpid()") | 156 @unittest.skipUnless(hasattr(os, 'waitpid'), "test needs os.waitpid()") |
| 158 def test_fexecve(self): | 157 def test_fexecve(self): |
| 159 fp = os.open(sys.executable, os.O_RDONLY) | 158 fp = os.open(sys.executable, os.O_RDONLY) |
| 160 try: | 159 try: |
| 161 pid = os.fork() | 160 pid = os.fork() |
| 162 if pid == 0: | 161 if pid == 0: |
| 163 os.chdir(os.path.split(sys.executable)[0]) | 162 os.chdir(os.path.split(sys.executable)[0]) |
| 164 posix.execve(None, [sys.executable, '-c', 'pass'], os.environ, fd=fp) | 163 posix.execve(fp, [sys.executable, '-c', 'pass'], os.environ) |
| 165 else: | 164 else: |
| 166 self.assertEqual(os.waitpid(pid, 0), (pid, 0)) | 165 self.assertEqual(os.waitpid(pid, 0), (pid, 0)) |
| 167 finally: | 166 finally: |
| 168 os.close(fp) | 167 os.close(fp) |
| 169 | 168 |
| 170 @unittest.skipUnless(hasattr(posix, 'waitid'), "test needs posix.waitid()") | 169 @unittest.skipUnless(hasattr(posix, 'waitid'), "test needs posix.waitid()") |
| 171 @unittest.skipUnless(hasattr(os, 'fork'), "test needs os.fork()") | 170 @unittest.skipUnless(hasattr(os, 'fork'), "test needs os.fork()") |
| 172 def test_waitid(self): | 171 def test_waitid(self): |
| 173 pid = os.fork() | 172 pid = os.fork() |
| 174 if pid == 0: | 173 if pid == 0: |
| (...skipping 54 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 229 | 228 |
| 230 @unittest.skipUnless(hasattr(posix, 'posix_fadvise'), | 229 @unittest.skipUnless(hasattr(posix, 'posix_fadvise'), |
| 231 "test needs posix.posix_fadvise()") | 230 "test needs posix.posix_fadvise()") |
| 232 def test_posix_fadvise(self): | 231 def test_posix_fadvise(self): |
| 233 fd = os.open(support.TESTFN, os.O_RDONLY) | 232 fd = os.open(support.TESTFN, os.O_RDONLY) |
| 234 try: | 233 try: |
| 235 posix.posix_fadvise(fd, 0, 0, posix.POSIX_FADV_WILLNEED) | 234 posix.posix_fadvise(fd, 0, 0, posix.POSIX_FADV_WILLNEED) |
| 236 finally: | 235 finally: |
| 237 os.close(fd) | 236 os.close(fd) |
| 238 | 237 |
| 239 @unittest.skipUnless(sysconfig.get_config_var('HAVE_FUTIMES'), "test needs posix futimes()") | 238 @unittest.skipUnless(os.utime in os.supports_fd, "test needs fd support in os.utime") |
| 240 def test_futimes(self): | 239 def test_utime_with_fd(self): |
| 241 now = time.time() | 240 now = time.time() |
| 242 fd = os.open(support.TESTFN, os.O_RDONLY) | 241 fd = os.open(support.TESTFN, os.O_RDONLY) |
| 243 try: | 242 try: |
| 244 posix.utime(None, None, fd=fd) | 243 posix.utime(fd) |
| 245 posix.utime(None, fd=fd) | 244 posix.utime(fd, None) |
| 246 self.assertRaises(TypeError, posix.utime, None, (None, None), fd=fd) | 245 self.assertRaises(TypeError, posix.utime, fd, (None, None)) |
| 247 self.assertRaises(TypeError, posix.utime, None, (now, None), fd=fd) | 246 self.assertRaises(TypeError, posix.utime, fd, (now, None)) |
| 248 self.assertRaises(TypeError, posix.utime, None, (None, now), fd=fd) | 247 self.assertRaises(TypeError, posix.utime, fd, (None, now)) |
| 249 posix.utime(None, (int(now), int(now)), fd=fd) | 248 posix.utime(fd, (int(now), int(now))) |
| 250 posix.utime(None, (now, now), fd=fd) | 249 posix.utime(fd, (now, now)) |
| 250 self.assertRaises(ValueError, posix.utime, fd, (now, now), ns=(now, now)) | |
| 251 self.assertRaises(ValueError, posix.utime, fd, (now, 0), ns=(None, N one)) | |
| 252 self.assertRaises(ValueError, posix.utime, fd, (None, None), ns=(now , 0)) | |
| 253 posix.utime(fd, (int(now), int((now - int(now)) * 1e9))) | |
| 254 posix.utime(fd, ns=(int(now), int((now - int(now)) * 1e9))) | |
| 255 | |
| 251 finally: | 256 finally: |
| 252 os.close(fd) | 257 os.close(fd) |
| 253 | 258 |
| 254 @unittest.skipUnless(sysconfig.get_config_var('HAVE_LUTIMES'), "test needs posix lutimes()") | 259 @unittest.skipUnless(os.utime in os.supports_follow_symlinks, "test needs follow_symlinks support in os.utime") |
| 255 def test_lutimes(self): | 260 def test_utime_nofollow_symlinks(self): |
| 256 now = time.time() | 261 now = time.time() |
| 257 posix.utime(support.TESTFN, None, follow_symlinks=False) | 262 posix.utime(support.TESTFN, None, follow_symlinks=False) |
| 258 self.assertRaises(TypeError, posix.utime, support.TESTFN, (None, None), follow_symlinks=False) | 263 self.assertRaises(TypeError, posix.utime, support.TESTFN, (None, None), follow_symlinks=False) |
| 259 self.assertRaises(TypeError, posix.utime, support.TESTFN, (now, None), follow_symlinks=False) | 264 self.assertRaises(TypeError, posix.utime, support.TESTFN, (now, None), follow_symlinks=False) |
| 260 self.assertRaises(TypeError, posix.utime, support.TESTFN, (None, now), follow_symlinks=False) | 265 self.assertRaises(TypeError, posix.utime, support.TESTFN, (None, now), follow_symlinks=False) |
| 261 posix.utime(support.TESTFN, (int(now), int(now)), follow_symlinks=False) | 266 posix.utime(support.TESTFN, (int(now), int(now)), follow_symlinks=False) |
| 262 posix.utime(support.TESTFN, (now, now), follow_symlinks=False) | 267 posix.utime(support.TESTFN, (now, now), follow_symlinks=False) |
| 263 posix.utime(support.TESTFN, follow_symlinks=False) | 268 posix.utime(support.TESTFN, follow_symlinks=False) |
| 264 | |
| 265 @unittest.skipUnless(sysconfig.get_config_var('HAVE_FUTIMENS'), "test needs posix futimens()") | |
| 266 def test_futimens(self): | |
| 267 now = time.time() | |
| 268 fd = os.open(support.TESTFN, os.O_RDONLY) | |
| 269 try: | |
| 270 self.assertRaises(ValueError, posix.utime, None, (now, now), ns=(now , now), fd=fd) | |
| 271 self.assertRaises(ValueError, posix.utime, None, (now, 0), ns=(None, None), fd=fd) | |
| 272 self.assertRaises(ValueError, posix.utime, None, (None, None), ns=(n ow, 0), fd=fd) | |
| 273 posix.utime(None, (int(now), int((now - int(now)) * 1e9)), fd=fd) | |
| 274 posix.utime(None, ns=(int(now), int((now - int(now)) * 1e9)), fd=fd) | |
| 275 posix.utime(None, fd=fd) | |
| 276 finally: | |
| 277 os.close(fd) | |
| 278 | 269 |
| 279 @unittest.skipUnless(hasattr(posix, 'writev'), "test needs posix.writev()") | 270 @unittest.skipUnless(hasattr(posix, 'writev'), "test needs posix.writev()") |
| 280 def test_writev(self): | 271 def test_writev(self): |
| 281 fd = os.open(support.TESTFN, os.O_RDWR | os.O_CREAT) | 272 fd = os.open(support.TESTFN, os.O_RDWR | os.O_CREAT) |
| 282 try: | 273 try: |
| 283 os.writev(fd, (b'test1', b'tt2', b't3')) | 274 os.writev(fd, (b'test1', b'tt2', b't3')) |
| 284 os.lseek(fd, 0, os.SEEK_SET) | 275 os.lseek(fd, 0, os.SEEK_SET) |
| 285 self.assertEqual(b'test1tt2t3', posix.read(fd, 10)) | 276 self.assertEqual(b'test1tt2t3', posix.read(fd, 10)) |
| 286 finally: | 277 finally: |
| 287 os.close(fd) | 278 os.close(fd) |
| (...skipping 71 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 359 os.O_WRONLY|os.O_SHLOCK|os.O_CREAT) | 350 os.O_WRONLY|os.O_SHLOCK|os.O_CREAT) |
| 360 self.assertRaises(OSError, os.open, support.TESTFN, | 351 self.assertRaises(OSError, os.open, support.TESTFN, |
| 361 os.O_RDONLY|os.O_EXLOCK|os.O_NONBLOCK) | 352 os.O_RDONLY|os.O_EXLOCK|os.O_NONBLOCK) |
| 362 os.close(fd) | 353 os.close(fd) |
| 363 | 354 |
| 364 def test_fstat(self): | 355 def test_fstat(self): |
| 365 if hasattr(posix, 'fstat'): | 356 if hasattr(posix, 'fstat'): |
| 366 fp = open(support.TESTFN) | 357 fp = open(support.TESTFN) |
| 367 try: | 358 try: |
| 368 self.assertTrue(posix.fstat(fp.fileno())) | 359 self.assertTrue(posix.fstat(fp.fileno())) |
| 369 self.assertTrue(posix.stat(fd=fp.fileno())) | 360 self.assertTrue(posix.stat(fp.fileno())) |
| 370 finally: | 361 finally: |
| 371 fp.close() | 362 fp.close() |
| 372 | 363 |
| 373 def test_stat(self): | 364 def test_stat(self): |
| 374 if hasattr(posix, 'stat'): | 365 if hasattr(posix, 'stat'): |
| 375 self.assertTrue(posix.stat(support.TESTFN)) | 366 self.assertTrue(posix.stat(support.TESTFN)) |
| 376 | 367 |
| 377 @unittest.skipUnless(hasattr(posix, 'mkfifo'), "don't have mkfifo()") | 368 @unittest.skipUnless(hasattr(posix, 'mkfifo'), "don't have mkfifo()") |
| 378 def test_mkfifo(self): | 369 def test_mkfifo(self): |
| 379 support.unlink(support.TESTFN) | 370 support.unlink(support.TESTFN) |
| (...skipping 78 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 458 | 449 |
| 459 def test_listdir(self): | 450 def test_listdir(self): |
| 460 if hasattr(posix, 'listdir'): | 451 if hasattr(posix, 'listdir'): |
| 461 self.assertTrue(support.TESTFN in posix.listdir(os.curdir)) | 452 self.assertTrue(support.TESTFN in posix.listdir(os.curdir)) |
| 462 | 453 |
| 463 def test_listdir_default(self): | 454 def test_listdir_default(self): |
| 464 # When listdir is called without argument, it's the same as listdir(os.curdir) | 455 # When listdir is called without argument, it's the same as listdir(os.curdir) |
| 465 if hasattr(posix, 'listdir'): | 456 if hasattr(posix, 'listdir'): |
| 466 self.assertTrue(support.TESTFN in posix.listdir()) | 457 self.assertTrue(support.TESTFN in posix.listdir()) |
| 467 | 458 |
| 468 @unittest.skipUnless(sysconfig.get_config_var('HAVE_FDOPENDIR'), "test needs posix. istdir() to support fd") | 459 @unittest.skipUnless(os.listdir in os.supports_fd, "test needs fd support for os.listdir()") |
| 469 def test_flistdir(self): | 460 def test_flistdir(self): |
| 470 f = posix.open(posix.getcwd(), posix.O_RDONLY) | 461 f = posix.open(posix.getcwd(), posix.O_RDONLY) |
| 471 self.addCleanup(posix.close, f) | 462 self.addCleanup(posix.close, f) |
| 472 self.assertEqual( | 463 self.assertEqual( |
| 473 sorted(posix.listdir('.')), | 464 sorted(posix.listdir('.')), |
| 474 sorted(posix.listdir(fd=f)) | 465 sorted(posix.listdir(f)) |
| 475 ) | 466 ) |
| 476 # Check that the fd offset was reset (issue #13739) | 467 # Check that the fd offset was reset (issue #13739) |
| 477 self.assertEqual( | 468 self.assertEqual( |
| 478 sorted(posix.listdir('.')), | 469 sorted(posix.listdir('.')), |
| 479 sorted(posix.listdir(fd=f)) | 470 sorted(posix.listdir(f)) |
| 480 ) | 471 ) |
| 481 | 472 |
| 482 def test_access(self): | 473 def test_access(self): |
| 483 if hasattr(posix, 'access'): | 474 if hasattr(posix, 'access'): |
| 484 self.assertTrue(posix.access(support.TESTFN, os.R_OK)) | 475 self.assertTrue(posix.access(support.TESTFN, os.R_OK)) |
| 485 | 476 |
| 486 def test_umask(self): | 477 def test_umask(self): |
| 487 if hasattr(posix, 'umask'): | 478 if hasattr(posix, 'umask'): |
| 488 old_mask = posix.umask(0) | 479 old_mask = posix.umask(0) |
| 489 self.assertIsInstance(old_mask, int) | 480 self.assertIsInstance(old_mask, int) |
| (...skipping 38 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 528 def test_utime(self): | 519 def test_utime(self): |
| 529 if hasattr(posix, 'utime'): | 520 if hasattr(posix, 'utime'): |
| 530 now = time.time() | 521 now = time.time() |
| 531 posix.utime(support.TESTFN, None) | 522 posix.utime(support.TESTFN, None) |
| 532 self.assertRaises(TypeError, posix.utime, support.TESTFN, (None, None)) | 523 self.assertRaises(TypeError, posix.utime, support.TESTFN, (None, None)) |
| 533 self.assertRaises(TypeError, posix.utime, support.TESTFN, (now, None)) | 524 self.assertRaises(TypeError, posix.utime, support.TESTFN, (now, None)) |
| 534 self.assertRaises(TypeError, posix.utime, support.TESTFN, (None, now)) | 525 self.assertRaises(TypeError, posix.utime, support.TESTFN, (None, now)) |
| 535 posix.utime(support.TESTFN, (int(now), int(now))) | 526 posix.utime(support.TESTFN, (int(now), int(now))) |
| 536 posix.utime(support.TESTFN, (now, now)) | 527 posix.utime(support.TESTFN, (now, now)) |
| 537 | 528 |
| 538 def _test_chflags_regular_file(self, chflags_func, target_file, kwargs): | 529 def _test_chflags_regular_file(self, chflags_func, target_file, kwargs): |
|
Georg
2012/06/22 20:26:02
Why no **kwargs?
larry
2012/06/23 00:27:56
If you look below, I'm passing literal dicts in in
| |
| 539 st = os.stat(target_file) | 530 st = os.stat(target_file) |
| 540 self.assertTrue(hasattr(st, 'st_flags')) | 531 self.assertTrue(hasattr(st, 'st_flags')) |
| 541 chflags_func(target_file, st.st_flags | stat.UF_IMMUTABLE, **kwargs) | 532 chflags_func(target_file, st.st_flags | stat.UF_IMMUTABLE, **kwargs) |
| 542 try: | 533 try: |
| 543 new_st = os.stat(target_file) | 534 new_st = os.stat(target_file) |
| 544 self.assertEqual(st.st_flags | stat.UF_IMMUTABLE, new_st.st_flags) | 535 self.assertEqual(st.st_flags | stat.UF_IMMUTABLE, new_st.st_flags) |
| 545 try: | 536 try: |
| 546 fd = open(target_file, 'w+') | 537 fd = open(target_file, 'w+') |
| 547 except IOError as e: | 538 except IOError as e: |
| 548 self.assertEqual(e.errno, errno.EPERM) | 539 self.assertEqual(e.errno, errno.EPERM) |
| (...skipping 109 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 658 # 'id -G' and 'os.getgroups()' should return the same | 649 # 'id -G' and 'os.getgroups()' should return the same |
| 659 # groups, ignoring order and duplicates. | 650 # groups, ignoring order and duplicates. |
| 660 # #10822 - it is implementation defined whether posix.getgroups() | 651 # #10822 - it is implementation defined whether posix.getgroups() |
| 661 # includes the effective gid so we include it anyway, since id -G does | 652 # includes the effective gid so we include it anyway, since id -G does |
| 662 self.assertEqual( | 653 self.assertEqual( |
| 663 set([int(x) for x in groups.split()]), | 654 set([int(x) for x in groups.split()]), |
| 664 set(posix.getgroups() + [posix.getegid()])) | 655 set(posix.getgroups() + [posix.getegid()])) |
| 665 | 656 |
| 666 # tests for the posix *at functions follow | 657 # tests for the posix *at functions follow |
| 667 | 658 |
| 668 @unittest.skipUnless(sysconfig.get_config_var("HAVE_FACCESSAT"), "test needs posix faccessat()") | 659 @unittest.skipUnless(os.access in os.supports_dir_fd, "test needs dir_fd support for os.access()") |
| 669 def test_faccessat(self): | 660 def test_access_dir_fd(self): |
| 670 f = posix.open(posix.getcwd(), posix.O_RDONLY) | 661 f = posix.open(posix.getcwd(), posix.O_RDONLY) |
| 671 try: | 662 try: |
| 672 self.assertTrue(posix.access(support.TESTFN, os.R_OK, dir_fd=f)) | 663 self.assertTrue(posix.access(support.TESTFN, os.R_OK, dir_fd=f)) |
| 673 finally: | 664 finally: |
| 674 posix.close(f) | 665 posix.close(f) |
| 675 | 666 |
| 676 @unittest.skipUnless(sysconfig.get_config_var("HAVE_FCHMODAT"), "test needs posix fchmodat()") | 667 @unittest.skipUnless(os.chmod in os.supports_dir_fd, "test needs dir_fd supp ort in os.chmod()") |
| 677 def test_fchmodat(self): | 668 def test_chmod_dir_fd(self): |
| 678 os.chmod(support.TESTFN, stat.S_IRUSR) | 669 os.chmod(support.TESTFN, stat.S_IRUSR) |
| 679 | 670 |
| 680 f = posix.open(posix.getcwd(), posix.O_RDONLY) | 671 f = posix.open(posix.getcwd(), posix.O_RDONLY) |
| 681 try: | 672 try: |
| 682 posix.chmod(support.TESTFN, stat.S_IRUSR | stat.S_IWUSR, dir_fd=f) | 673 posix.chmod(support.TESTFN, stat.S_IRUSR | stat.S_IWUSR, dir_fd=f) |
| 683 | 674 |
| 684 s = posix.stat(support.TESTFN) | 675 s = posix.stat(support.TESTFN) |
| 685 self.assertEqual(s[0] & stat.S_IRWXU, stat.S_IRUSR | stat.S_IWUSR) | 676 self.assertEqual(s[0] & stat.S_IRWXU, stat.S_IRUSR | stat.S_IWUSR) |
| 686 finally: | 677 finally: |
| 687 posix.close(f) | 678 posix.close(f) |
| 688 | 679 |
| 689 @unittest.skipUnless(sysconfig.get_config_var('HAVE_FCHOWNAT'), "test needs posix fchownat()") | 680 @unittest.skipUnless(os.chown in os.supports_dir_fd, "test needs dir_fd supp ort in os.chown()") |
| 690 def test_fchownat(self): | 681 def test_chown_dir_fd(self): |
| 691 support.unlink(support.TESTFN) | 682 support.unlink(support.TESTFN) |
| 692 support.create_empty_file(support.TESTFN) | 683 support.create_empty_file(support.TESTFN) |
| 693 | 684 |
| 694 f = posix.open(posix.getcwd(), posix.O_RDONLY) | 685 f = posix.open(posix.getcwd(), posix.O_RDONLY) |
| 695 try: | 686 try: |
| 696 posix.chown(support.TESTFN, os.getuid(), os.getgid(), dir_fd=f) | 687 posix.chown(support.TESTFN, os.getuid(), os.getgid(), dir_fd=f) |
| 697 finally: | 688 finally: |
| 698 posix.close(f) | 689 posix.close(f) |
| 699 | 690 |
| 700 @unittest.skipUnless(sysconfig.get_config_var("HAS_FSTATAT"), "test needs po six fstatat()") | 691 @unittest.skipUnless(os.stat in os.supports_dir_fd, "test needs dir_fd suppo rt in os.stat()") |
| 701 def test_fstatat(self): | 692 def test_stat_dir_fd(self): |
| 702 support.unlink(support.TESTFN) | 693 support.unlink(support.TESTFN) |
| 703 with open(support.TESTFN, 'w') as outfile: | 694 with open(support.TESTFN, 'w') as outfile: |
| 704 outfile.write("testline\n") | 695 outfile.write("testline\n") |
| 705 | 696 |
| 706 f = posix.open(posix.getcwd(), posix.O_RDONLY) | 697 f = posix.open(posix.getcwd(), posix.O_RDONLY) |
| 707 try: | 698 try: |
| 708 s1 = posix.stat(support.TESTFN) | 699 s1 = posix.stat(support.TESTFN) |
| 709 s2 = posix.stat(support.TESTFN, dir_fd=f) | 700 s2 = posix.stat(support.TESTFN, dir_fd=f) |
| 710 self.assertEqual(s1, s2) | 701 self.assertEqual(s1, s2) |
| 711 finally: | 702 finally: |
| 712 posix.close(f) | 703 posix.close(f) |
| 713 | 704 |
| 714 @unittest.skipUnless(sysconfig.get_config_var('HAVE_FUTIMESAT'), "test needs posix futimesat()") | 705 @unittest.skipUnless(os.utime in os.supports_dir_fd, "test needs dir_fd supp ort in os.utime()") |
| 715 def test_futimesat(self): | 706 def test_utime_dir_fd(self): |
| 716 f = posix.open(posix.getcwd(), posix.O_RDONLY) | 707 f = posix.open(posix.getcwd(), posix.O_RDONLY) |
| 717 try: | 708 try: |
| 718 now = time.time() | 709 now = time.time() |
| 719 posix.utime(support.TESTFN, None, dir_fd=f) | 710 posix.utime(support.TESTFN, None, dir_fd=f) |
| 720 posix.utime(support.TESTFN, dir_fd=f) | 711 posix.utime(support.TESTFN, dir_fd=f) |
| 712 self.assertRaises(TypeError, posix.utime, support.TESTFN, now, dir_f d=f) | |
| 721 self.assertRaises(TypeError, posix.utime, support.TESTFN, (None, Non e), dir_fd=f) | 713 self.assertRaises(TypeError, posix.utime, support.TESTFN, (None, Non e), dir_fd=f) |
| 722 self.assertRaises(TypeError, posix.utime, support.TESTFN, (now, None ), dir_fd=f) | 714 self.assertRaises(TypeError, posix.utime, support.TESTFN, (now, None ), dir_fd=f) |
| 723 self.assertRaises(TypeError, posix.utime, support.TESTFN, (None, now ), dir_fd=f) | 715 self.assertRaises(TypeError, posix.utime, support.TESTFN, (None, now ), dir_fd=f) |
| 716 self.assertRaises(TypeError, posix.utime, support.TESTFN, (now, "x") , dir_fd=f) | |
| 724 posix.utime(support.TESTFN, (int(now), int(now)), dir_fd=f) | 717 posix.utime(support.TESTFN, (int(now), int(now)), dir_fd=f) |
| 725 posix.utime(support.TESTFN, (now, now), dir_fd=f) | 718 posix.utime(support.TESTFN, (now, now), dir_fd=f) |
| 726 finally: | 719 posix.utime(support.TESTFN, |
| 727 posix.close(f) | 720 (int(now), int((now - int(now)) * 1e9)), dir_fd=f) |
| 728 | 721 posix.utime(support.TESTFN, dir_fd=f, |
| 729 @unittest.skipUnless(sysconfig.get_config_var('HAVE_LINKAT'), "test needs po six linkat()") | 722 times=(int(now), int((now - int(now)) * 1e9))) |
| 730 def test_linkat(self): | 723 |
| 724 if os.utime in os.supports_follow_symlinks: | |
| 725 posix.utime(support.TESTFN, follow_symlinks=False, dir_fd=f) | |
| 726 | |
| 727 finally: | |
| 728 posix.close(f) | |
| 729 | |
| 730 @unittest.skipUnless(os.link in os.supports_dir_fd, "test needs dir_fd suppo rt in os.link()") | |
| 731 def test_link_dir_fd(self): | |
| 731 f = posix.open(posix.getcwd(), posix.O_RDONLY) | 732 f = posix.open(posix.getcwd(), posix.O_RDONLY) |
| 732 try: | 733 try: |
| 733 posix.link(support.TESTFN, support.TESTFN + 'link', src_dir_fd=f, ds t_dir_fd=f) | 734 posix.link(support.TESTFN, support.TESTFN + 'link', src_dir_fd=f, ds t_dir_fd=f) |
| 734 # should have same inodes | 735 # should have same inodes |
| 735 self.assertEqual(posix.stat(support.TESTFN)[1], | 736 self.assertEqual(posix.stat(support.TESTFN)[1], |
| 736 posix.stat(support.TESTFN + 'link')[1]) | 737 posix.stat(support.TESTFN + 'link')[1]) |
| 737 finally: | 738 finally: |
| 738 posix.close(f) | 739 posix.close(f) |
| 739 support.unlink(support.TESTFN + 'link') | 740 support.unlink(support.TESTFN + 'link') |
| 740 | 741 |
| 741 @unittest.skipUnless(sysconfig.get_config_var('HAVE_MKDIRAT'), "test needs p osix mkdirat()") | 742 @unittest.skipUnless(os.mkdir in os.supports_dir_fd, "test needs dir_fd supp ort in os.mkdir()") |
| 742 def test_mkdirat(self): | 743 def test_mkdir_dir_fd(self): |
| 743 f = posix.open(posix.getcwd(), posix.O_RDONLY) | 744 f = posix.open(posix.getcwd(), posix.O_RDONLY) |
| 744 try: | 745 try: |
| 745 posix.mkdir(support.TESTFN + 'dir', dir_fd=f) | 746 posix.mkdir(support.TESTFN + 'dir', dir_fd=f) |
| 746 posix.stat(support.TESTFN + 'dir') # should not raise exception | 747 posix.stat(support.TESTFN + 'dir') # should not raise exception |
| 747 finally: | 748 finally: |
| 748 posix.close(f) | 749 posix.close(f) |
| 749 support.rmtree(support.TESTFN + 'dir') | 750 support.rmtree(support.TESTFN + 'dir') |
| 750 | 751 |
| 751 @unittest.skipUnless(sysconfig.get_config_var('HAVE_MKNODAT') and hasattr(st at, 'S_IFIFO'), | 752 @unittest.skipUnless((os.mknod in os.supports_dir_fd) and hasattr(stat, 'S_I FIFO'), |
| 752 "don't have mknodat()/S_IFIFO") | 753 "test requires both stat.S_IFIFO and dir_fd support for os.mknod()") |
| 753 def test_mknodat(self): | 754 def test_mknod_dir_fd(self): |
| 754 # Test using mknodat() to create a FIFO (the only use specified | 755 # Test using mknodat() to create a FIFO (the only use specified |
| 755 # by POSIX). | 756 # by POSIX). |
| 756 support.unlink(support.TESTFN) | 757 support.unlink(support.TESTFN) |
| 757 mode = stat.S_IFIFO | stat.S_IRUSR | stat.S_IWUSR | 758 mode = stat.S_IFIFO | stat.S_IRUSR | stat.S_IWUSR |
| 758 f = posix.open(posix.getcwd(), posix.O_RDONLY) | 759 f = posix.open(posix.getcwd(), posix.O_RDONLY) |
| 759 try: | 760 try: |
| 760 posix.mknod(support.TESTFN, mode, 0, dir_fd=f) | 761 posix.mknod(support.TESTFN, mode, 0, dir_fd=f) |
| 761 except OSError as e: | 762 except OSError as e: |
| 762 # Some old systems don't allow unprivileged users to use | 763 # Some old systems don't allow unprivileged users to use |
| 763 # mknod(), or only support creating device nodes. | 764 # mknod(), or only support creating device nodes. |
| 764 self.assertIn(e.errno, (errno.EPERM, errno.EINVAL)) | 765 self.assertIn(e.errno, (errno.EPERM, errno.EINVAL)) |
| 765 else: | 766 else: |
| 766 self.assertTrue(stat.S_ISFIFO(posix.stat(support.TESTFN).st_mode)) | 767 self.assertTrue(stat.S_ISFIFO(posix.stat(support.TESTFN).st_mode)) |
| 767 finally: | 768 finally: |
| 768 posix.close(f) | 769 posix.close(f) |
| 769 | 770 |
| 770 @unittest.skipUnless(sysconfig.get_config_var("HAVE_OPENAT"), "test needs po six openat()") | 771 @unittest.skipUnless(os.open in os.supports_dir_fd, "test needs dir_fd suppo rt in os.open()") |
| 771 def test_openat(self): | 772 def test_open_dir_fd(self): |
| 772 support.unlink(support.TESTFN) | 773 support.unlink(support.TESTFN) |
| 773 with open(support.TESTFN, 'w') as outfile: | 774 with open(support.TESTFN, 'w') as outfile: |
| 774 outfile.write("testline\n") | 775 outfile.write("testline\n") |
| 775 a = posix.open(posix.getcwd(), posix.O_RDONLY) | 776 a = posix.open(posix.getcwd(), posix.O_RDONLY) |
| 776 b = posix.open(support.TESTFN, posix.O_RDONLY, dir_fd=a) | 777 b = posix.open(support.TESTFN, posix.O_RDONLY, dir_fd=a) |
| 777 try: | 778 try: |
| 778 res = posix.read(b, 9).decode(encoding="utf-8") | 779 res = posix.read(b, 9).decode(encoding="utf-8") |
| 779 self.assertEqual("testline\n", res) | 780 self.assertEqual("testline\n", res) |
| 780 finally: | 781 finally: |
| 781 posix.close(a) | 782 posix.close(a) |
| 782 posix.close(b) | 783 posix.close(b) |
| 783 | 784 |
| 784 @unittest.skipUnless(sysconfig.get_config_var('HAVE_READLINKAT'), "test need s posix readlinkat()") | 785 @unittest.skipUnless(os.readlink in os.supports_dir_fd, "test needs dir_fd s upport in os.readlink()") |
| 785 def test_readlinkat(self): | 786 def test_readlink_dir_fd(self): |
| 786 os.symlink(support.TESTFN, support.TESTFN + 'link') | 787 os.symlink(support.TESTFN, support.TESTFN + 'link') |
| 787 f = posix.open(posix.getcwd(), posix.O_RDONLY) | 788 f = posix.open(posix.getcwd(), posix.O_RDONLY) |
| 788 try: | 789 try: |
| 789 self.assertEqual(posix.readlink(support.TESTFN + 'link'), | 790 self.assertEqual(posix.readlink(support.TESTFN + 'link'), |
| 790 posix.readlink(support.TESTFN + 'link', dir_fd=f)) | 791 posix.readlink(support.TESTFN + 'link', dir_fd=f)) |
| 791 finally: | 792 finally: |
| 792 support.unlink(support.TESTFN + 'link') | 793 support.unlink(support.TESTFN + 'link') |
| 793 posix.close(f) | 794 posix.close(f) |
| 794 | 795 |
| 795 @unittest.skipUnless(sysconfig.get_config_var("HAVE_RENAMEAT"), "test needs posix renameat()") | 796 @unittest.skipUnless(os.rename in os.supports_dir_fd, "test needs dir_fd sup port in os.rename()") |
| 796 def test_renameat(self): | 797 def test_rename_dir_fd(self): |
| 797 support.unlink(support.TESTFN) | 798 support.unlink(support.TESTFN) |
| 798 support.create_empty_file(support.TESTFN + 'ren') | 799 support.create_empty_file(support.TESTFN + 'ren') |
| 799 f = posix.open(posix.getcwd(), posix.O_RDONLY) | 800 f = posix.open(posix.getcwd(), posix.O_RDONLY) |
| 800 try: | 801 try: |
| 801 posix.rename(support.TESTFN + 'ren', support.TESTFN, src_dir_fd=f, d st_dir_fd=f) | 802 posix.rename(support.TESTFN + 'ren', support.TESTFN, src_dir_fd=f, d st_dir_fd=f) |
| 802 except: | 803 except: |
| 803 posix.rename(support.TESTFN + 'ren', support.TESTFN) | 804 posix.rename(support.TESTFN + 'ren', support.TESTFN) |
| 804 raise | 805 raise |
| 805 else: | 806 else: |
| 806 posix.stat(support.TESTFN) # should not throw exception | 807 posix.stat(support.TESTFN) # should not throw exception |
| 807 finally: | 808 finally: |
| 808 posix.close(f) | 809 posix.close(f) |
| 809 | 810 |
| 810 @unittest.skipUnless(sysconfig.get_config_var("HAVE_SYMLINKAT"), "test needs posix symlinkat()") | 811 @unittest.skipUnless(os.symlink in os.supports_dir_fd, "test needs dir_fd su pport in os.symlink()") |
| 811 def test_symlinkat(self): | 812 def test_symlink_dir_fd(self): |
| 812 f = posix.open(posix.getcwd(), posix.O_RDONLY) | 813 f = posix.open(posix.getcwd(), posix.O_RDONLY) |
| 813 try: | 814 try: |
| 814 posix.symlink(support.TESTFN, support.TESTFN + 'link', dir_fd=f) | 815 posix.symlink(support.TESTFN, support.TESTFN + 'link', dir_fd=f) |
| 815 self.assertEqual(posix.readlink(support.TESTFN + 'link'), support.TE STFN) | 816 self.assertEqual(posix.readlink(support.TESTFN + 'link'), support.TE STFN) |
| 816 finally: | 817 finally: |
| 817 posix.close(f) | 818 posix.close(f) |
| 818 support.unlink(support.TESTFN + 'link') | 819 support.unlink(support.TESTFN + 'link') |
| 819 | 820 |
| 820 @unittest.skipUnless(sysconfig.get_config_var('HAVE_UNLINKAT'), "test needs posix unlinkat()") | 821 @unittest.skipUnless(os.unlink in os.supports_dir_fd, "test needs dir_fd sup port in os.unlink()") |
| 821 def test_unlinkat(self): | 822 def test_unlink_dir_fd(self): |
| 822 f = posix.open(posix.getcwd(), posix.O_RDONLY) | 823 f = posix.open(posix.getcwd(), posix.O_RDONLY) |
| 823 support.create_empty_file(support.TESTFN + 'del') | 824 support.create_empty_file(support.TESTFN + 'del') |
| 824 posix.stat(support.TESTFN + 'del') # should not throw exception | 825 posix.stat(support.TESTFN + 'del') # should not throw exception |
| 825 try: | 826 try: |
| 826 posix.unlink(support.TESTFN + 'del', dir_fd=f) | 827 posix.unlink(support.TESTFN + 'del', dir_fd=f) |
| 827 except: | 828 except: |
| 828 support.unlink(support.TESTFN + 'del') | 829 support.unlink(support.TESTFN + 'del') |
| 829 raise | 830 raise |
| 830 else: | 831 else: |
| 831 self.assertRaises(OSError, posix.stat, support.TESTFN + 'link') | 832 self.assertRaises(OSError, posix.stat, support.TESTFN + 'link') |
| 832 finally: | 833 finally: |
| 833 posix.close(f) | 834 posix.close(f) |
| 834 | 835 |
| 835 @unittest.skipUnless(sysconfig.get_config_var('HAVE_UTIMENSAT'), "test needs posix utimensat()") | 836 @unittest.skipUnless(os.mkfifo in os.supports_dir_fd, "test needs dir_fd sup port in os.mkfifo()") |
| 836 def test_utimensat(self): | 837 def test_mkfifo_dir_fd(self): |
| 837 f = posix.open(posix.getcwd(), posix.O_RDONLY) | |
| 838 try: | |
| 839 now = time.time() | |
| 840 posix.utime(support.TESTFN, (now, now), dir_fd=f) | |
| 841 posix.utime(support.TESTFN, dir_fd=f) | |
| 842 posix.utime(support.TESTFN, follow_symlinks=False, dir_fd=f) | |
| 843 self.assertRaises(TypeError, posix.utime, support.TESTFN, (None, Non e), dir_fd=f) | |
| 844 self.assertRaises(TypeError, posix.utime, support.TESTFN, (now, "x") , dir_fd=f) | |
| 845 self.assertRaises(TypeError, posix.utime, support.TESTFN, now, dir_f d=f) | |
| 846 posix.utime(support.TESTFN, | |
| 847 (int(now), int((now - int(now)) * 1e9)), dir_fd=f) | |
| 848 posix.utime(support.TESTFN, dir_fd=f, | |
| 849 times=(int(now), int((now - int(now)) * 1e9))) | |
| 850 finally: | |
| 851 posix.close(f) | |
| 852 | |
| 853 @unittest.skipUnless(sysconfig.get_config_var("HAVE_MKFIFOAT"), "don't have posix mkfifoat()") | |
| 854 def test_mkfifoat(self): | |
| 855 support.unlink(support.TESTFN) | 838 support.unlink(support.TESTFN) |
| 856 f = posix.open(posix.getcwd(), posix.O_RDONLY) | 839 f = posix.open(posix.getcwd(), posix.O_RDONLY) |
| 857 try: | 840 try: |
| 858 posix.mkfifo(support.TESTFN, stat.S_IRUSR | stat.S_IWUSR, dir_fd=f) | 841 posix.mkfifo(support.TESTFN, stat.S_IRUSR | stat.S_IWUSR, dir_fd=f) |
| 859 self.assertTrue(stat.S_ISFIFO(posix.stat(support.TESTFN).st_mode)) | 842 self.assertTrue(stat.S_ISFIFO(posix.stat(support.TESTFN).st_mode)) |
| 860 finally: | 843 finally: |
| 861 posix.close(f) | 844 posix.close(f) |
| 862 | 845 |
| 863 requires_sched_h = unittest.skipUnless(hasattr(posix, 'sched_yield'), | 846 requires_sched_h = unittest.skipUnless(hasattr(posix, 'sched_yield'), |
| 864 "don't have scheduling support") | 847 "don't have scheduling support") |
| (...skipping 193 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 1058 self.assertListEqual(groups, posix.getgroups()) | 1041 self.assertListEqual(groups, posix.getgroups()) |
| 1059 | 1042 |
| 1060 def test_main(): | 1043 def test_main(): |
| 1061 try: | 1044 try: |
| 1062 support.run_unittest(PosixTester, PosixGroupsTester) | 1045 support.run_unittest(PosixTester, PosixGroupsTester) |
| 1063 finally: | 1046 finally: |
| 1064 support.reap_children() | 1047 support.reap_children() |
| 1065 | 1048 |
| 1066 if __name__ == '__main__': | 1049 if __name__ == '__main__': |
| 1067 test_main() | 1050 test_main() |
| LEFT | RIGHT |