| OLD | NEW |
| 1 "Test posix functions" | 1 "Test posix functions" |
| 2 | 2 |
| 3 from test import support | 3 from test import support |
| 4 | 4 |
| 5 # Skip these tests if there is no posix module. | 5 # Skip these tests if there is no posix module. |
| 6 posix = support.import_module('posix') | 6 posix = support.import_module('posix') |
| 7 | 7 |
| 8 import errno | 8 import errno |
| 9 import sys | 9 import sys |
| 10 import time | 10 import time |
| (...skipping 19 matching lines...) Expand all Loading... |
| 30 def tearDown(self): | 30 def tearDown(self): |
| 31 support.unlink(support.TESTFN) | 31 support.unlink(support.TESTFN) |
| 32 self._warnings_manager.__exit__(None, None, None) | 32 self._warnings_manager.__exit__(None, None, None) |
| 33 | 33 |
| 34 def testNoArgFunctions(self): | 34 def testNoArgFunctions(self): |
| 35 # test posix functions which take no arguments and have | 35 # test posix functions which take no arguments and have |
| 36 # no side-effects which we need to cleanup (e.g., fork, wait, abort) | 36 # no side-effects which we need to cleanup (e.g., fork, wait, abort) |
| 37 NO_ARG_FUNCTIONS = [ "ctermid", "getcwd", "getcwdb", "uname", | 37 NO_ARG_FUNCTIONS = [ "ctermid", "getcwd", "getcwdb", "uname", |
| 38 "times", "getloadavg", | 38 "times", "getloadavg", |
| 39 "getegid", "geteuid", "getgid", "getgroups", | 39 "getegid", "geteuid", "getgid", "getgroups", |
| 40 "getpid", "getpgrp", "getppid", "getuid", | 40 "getpid", "getpgrp", "getppid", "getuid", "sync", |
| 41 ] | 41 ] |
| 42 | 42 |
| 43 for name in NO_ARG_FUNCTIONS: | 43 for name in NO_ARG_FUNCTIONS: |
| 44 posix_func = getattr(posix, name, None) | 44 posix_func = getattr(posix, name, None) |
| 45 if posix_func is not None: | 45 if posix_func is not None: |
| 46 posix_func() | 46 posix_func() |
| 47 self.assertRaises(TypeError, posix_func, 1) | 47 self.assertRaises(TypeError, posix_func, 1) |
| 48 | 48 |
| 49 if hasattr(posix, 'getresuid'): | 49 if hasattr(posix, 'getresuid'): |
| 50 def test_getresuid(self): | 50 def test_getresuid(self): |
| (...skipping 73 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 124 def test_ftruncate(self): | 124 def test_ftruncate(self): |
| 125 if hasattr(posix, 'ftruncate'): | 125 if hasattr(posix, 'ftruncate'): |
| 126 fp = open(support.TESTFN, 'w+') | 126 fp = open(support.TESTFN, 'w+') |
| 127 try: | 127 try: |
| 128 # we need to have some data to truncate | 128 # we need to have some data to truncate |
| 129 fp.write('test') | 129 fp.write('test') |
| 130 fp.flush() | 130 fp.flush() |
| 131 posix.ftruncate(fp.fileno(), 0) | 131 posix.ftruncate(fp.fileno(), 0) |
| 132 finally: | 132 finally: |
| 133 fp.close() | 133 fp.close() |
| 134 |
| 135 @unittest.skipUnless(hasattr(posix, 'truncate'), "test needs posix.truncate(
)") |
| 136 def test_truncate(self): |
| 137 with open(support.TESTFN, 'w') as fp: |
| 138 fp.write('test') |
| 139 fp.flush() |
| 140 posix.truncate(support.TESTFN, 0) |
| 141 |
| 142 @unittest.skipUnless(hasattr(posix, 'fexecve'), "test needs posix.fexecve()"
) |
| 143 @unittest.skipUnless(hasattr(os, 'fork'), "test needs os.fork()") |
| 144 @unittest.skipUnless(hasattr(os, 'wait'), "test needs os.wait()") |
| 145 def test_fexecve(self): |
| 146 fp = os.open(sys.executable, os.O_RDONLY) |
| 147 try: |
| 148 pid = os.fork() |
| 149 if pid == 0: |
| 150 os.chdir(os.path.split(sys.executable)[0]) |
| 151 posix.fexecve(fp, [sys.executable, '-c', 'pass'], os.environ) |
| 152 else: |
| 153 self.assertEqual(os.wait(), (pid, 0)) |
| 154 finally: |
| 155 os.close(fp) |
| 156 |
| 157 @unittest.skipUnless(hasattr(posix, 'gethostid'), "test needs posix.gethosti
d()") |
| 158 def test_gethostid(self): |
| 159 self.assertTrue(isinstance(posix.gethostid(), int)) |
| 160 |
| 161 @unittest.skipUnless(hasattr(posix, 'sethostid'), "test needs posix.sethosti
d()") |
| 162 def test_sethostid(self): |
| 163 hid = os.gethostid() |
| 164 try: |
| 165 posix.sethostid(2464787) |
| 166 except OSError as e: |
| 167 if sys.platform.startswith("freebsd"): |
| 168 self.assertEqual(e.errno, errno.EPERM) |
| 169 else: |
| 170 self.assertEqual(e.errno, errno.EACCES) |
| 171 else: |
| 172 # running test as root! |
| 173 self.assertEqual(os.gethostid(), 2464787) |
| 174 posix.sethostid(hid) |
| 175 |
| 176 @unittest.skipUnless(hasattr(posix, 'waitid'), "test needs posix.waitid()") |
| 177 @unittest.skipUnless(hasattr(os, 'fork'), "test needs os.fork()") |
| 178 def test_waitid(self): |
| 179 pid = os.fork() |
| 180 if pid == 0: |
| 181 os.chdir(os.path.split(sys.executable)[0]) |
| 182 posix.execve(sys.executable, [sys.executable, '-c', 'pass'], os.envi
ron) |
| 183 else: |
| 184 res = posix.waitid(posix.P_PID, pid, posix.WEXITED) |
| 185 self.assertEqual(pid, res.si_pid) |
| 186 |
| 187 @unittest.skipUnless(hasattr(posix, 'lockf'), "test needs posix.lockf()") |
| 188 def test_lockf(self): |
| 189 fd = os.open(support.TESTFN, os.O_WRONLY | os.O_CREAT) |
| 190 try: |
| 191 os.write(fd, b'test') |
| 192 os.lseek(fd, 0, os.SEEK_SET) |
| 193 posix.lockf(fd, posix.F_LOCK, 4) |
| 194 # section is locked |
| 195 posix.lockf(fd, posix.F_ULOCK, 4) |
| 196 finally: |
| 197 os.close(fd) |
| 198 |
| 199 @unittest.skipUnless(hasattr(posix, 'pread'), "test needs posix.pread()") |
| 200 def test_pread(self): |
| 201 fd = os.open(support.TESTFN, os.O_RDWR | os.O_CREAT) |
| 202 try: |
| 203 os.write(fd, b'test') |
| 204 os.lseek(fd, 0, os.SEEK_SET) |
| 205 self.assertEqual(b'es', posix.pread(fd, 2, 1)) |
| 206 # the first pread() shoudn't disturb the file offset |
| 207 self.assertEqual(b'te', posix.read(fd, 2)) |
| 208 finally: |
| 209 os.close(fd) |
| 210 |
| 211 @unittest.skipUnless(hasattr(posix, 'pwrite'), "test needs posix.pwrite()") |
| 212 def test_pwrite(self): |
| 213 fd = os.open(support.TESTFN, os.O_RDWR | os.O_CREAT) |
| 214 try: |
| 215 os.write(fd, b'test') |
| 216 os.lseek(fd, 0, os.SEEK_SET) |
| 217 posix.pwrite(fd, b'xx', 1) |
| 218 self.assertEqual(b'txxt', posix.read(fd, 4)) |
| 219 finally: |
| 220 os.close(fd) |
| 221 |
| 222 @unittest.skipUnless(hasattr(posix, 'posix_fallocate'), |
| 223 "test needs posix.posix_fallocate()") |
| 224 def test_posix_fallocate(self): |
| 225 fd = os.open(support.TESTFN, os.O_WRONLY | os.O_CREAT) |
| 226 try: |
| 227 posix.posix_fallocate(fd, 0, 10) |
| 228 except OSError as inst: |
| 229 # issue10812, ZFS doesn't appear to support posix_fallocate, |
| 230 # so skip Solaris-based since they are likely to have ZFS. |
| 231 if inst.errno != errno.EINVAL or not sys.platform.startswith("sunos"
): |
| 232 raise |
| 233 finally: |
| 234 os.close(fd) |
| 235 |
| 236 @unittest.skipUnless(hasattr(posix, 'posix_fadvise'), |
| 237 "test needs posix.posix_fadvise()") |
| 238 def test_posix_fadvise(self): |
| 239 fd = os.open(support.TESTFN, os.O_RDONLY) |
| 240 try: |
| 241 posix.posix_fadvise(fd, 0, 0, posix.POSIX_FADV_WILLNEED) |
| 242 finally: |
| 243 os.close(fd) |
| 244 |
| 245 @unittest.skipUnless(hasattr(posix, 'futimes'), "test needs posix.futimes()"
) |
| 246 def test_futimes(self): |
| 247 now = time.time() |
| 248 fd = os.open(support.TESTFN, os.O_RDONLY) |
| 249 try: |
| 250 posix.futimes(fd, None) |
| 251 self.assertRaises(TypeError, posix.futimes, fd, (None, None)) |
| 252 self.assertRaises(TypeError, posix.futimes, fd, (now, None)) |
| 253 self.assertRaises(TypeError, posix.futimes, fd, (None, now)) |
| 254 posix.futimes(fd, (int(now), int(now))) |
| 255 posix.futimes(fd, (now, now)) |
| 256 finally: |
| 257 os.close(fd) |
| 258 |
| 259 @unittest.skipUnless(hasattr(posix, 'lutimes'), "test needs posix.lutimes()"
) |
| 260 def test_lutimes(self): |
| 261 now = time.time() |
| 262 posix.lutimes(support.TESTFN, None) |
| 263 self.assertRaises(TypeError, posix.lutimes, support.TESTFN, (None, None)
) |
| 264 self.assertRaises(TypeError, posix.lutimes, support.TESTFN, (now, None)) |
| 265 self.assertRaises(TypeError, posix.lutimes, support.TESTFN, (None, now)) |
| 266 posix.lutimes(support.TESTFN, (int(now), int(now))) |
| 267 posix.lutimes(support.TESTFN, (now, now)) |
| 268 |
| 269 @unittest.skipUnless(hasattr(posix, 'futimens'), "test needs posix.futimens(
)") |
| 270 def test_futimens(self): |
| 271 now = time.time() |
| 272 fd = os.open(support.TESTFN, os.O_RDONLY) |
| 273 try: |
| 274 self.assertRaises(TypeError, posix.futimens, fd, (None, None), (None
, None)) |
| 275 self.assertRaises(TypeError, posix.futimens, fd, (now, 0), None) |
| 276 self.assertRaises(TypeError, posix.futimens, fd, None, (now, 0)) |
| 277 posix.futimens(fd, (int(now), int((now - int(now)) * 1e9)), |
| 278 (int(now), int((now - int(now)) * 1e9))) |
| 279 finally: |
| 280 os.close(fd) |
| 281 |
| 282 @unittest.skipUnless(hasattr(posix, 'writev'), "test needs posix.writev()") |
| 283 def test_writev(self): |
| 284 fd = os.open(support.TESTFN, os.O_RDWR | os.O_CREAT) |
| 285 try: |
| 286 os.writev(fd, (b'test1', b'tt2', b't3')) |
| 287 os.lseek(fd, 0, os.SEEK_SET) |
| 288 self.assertEqual(b'test1tt2t3', posix.read(fd, 10)) |
| 289 finally: |
| 290 os.close(fd) |
| 291 |
| 292 @unittest.skipUnless(hasattr(posix, 'readv'), "test needs posix.readv()") |
| 293 def test_readv(self): |
| 294 fd = os.open(support.TESTFN, os.O_RDWR | os.O_CREAT) |
| 295 try: |
| 296 os.write(fd, b'test1tt2t3') |
| 297 os.lseek(fd, 0, os.SEEK_SET) |
| 298 buf = [bytearray(i) for i in [5, 3, 2]] |
| 299 self.assertEqual(posix.readv(fd, buf), 10) |
| 300 self.assertEqual([b'test1', b'tt2', b't3'], [bytes(i) for i in buf]) |
| 301 finally: |
| 302 os.close(fd) |
| 134 | 303 |
| 135 def test_dup(self): | 304 def test_dup(self): |
| 136 if hasattr(posix, 'dup'): | 305 if hasattr(posix, 'dup'): |
| 137 fp = open(support.TESTFN) | 306 fp = open(support.TESTFN) |
| 138 try: | 307 try: |
| 139 fd = posix.dup(fp.fileno()) | 308 fd = posix.dup(fp.fileno()) |
| 140 self.assertIsInstance(fd, int) | 309 self.assertIsInstance(fd, int) |
| 141 os.close(fd) | 310 os.close(fd) |
| 142 finally: | 311 finally: |
| 143 fp.close() | 312 fp.close() |
| (...skipping 276 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 420 for groups in [[0], list(range(16))]: | 589 for groups in [[0], list(range(16))]: |
| 421 posix.setgroups(groups) | 590 posix.setgroups(groups) |
| 422 self.assertListEqual(groups, posix.getgroups()) | 591 self.assertListEqual(groups, posix.getgroups()) |
| 423 | 592 |
| 424 | 593 |
| 425 def test_main(): | 594 def test_main(): |
| 426 support.run_unittest(PosixTester, PosixGroupsTester) | 595 support.run_unittest(PosixTester, PosixGroupsTester) |
| 427 | 596 |
| 428 if __name__ == '__main__': | 597 if __name__ == '__main__': |
| 429 test_main() | 598 test_main() |
| OLD | NEW |