| OLD | NEW |
| 1 "Test posix functions" | 1 "Test posix functions" |
| 2 | 2 |
| 3 from test import support | 3 from test import support |
| 4 | 4 |
| 5 # Skip these tests if there is no posix module. | 5 # Skip these tests if there is no posix module. |
| 6 posix = support.import_module('posix') | 6 posix = support.import_module('posix') |
| 7 | 7 |
| 8 import errno | 8 import errno |
| 9 import sys | 9 import sys |
| 10 import time | 10 import time |
| (...skipping 19 matching lines...) Expand all Loading... |
| 30 def tearDown(self): | 30 def tearDown(self): |
| 31 support.unlink(support.TESTFN) | 31 support.unlink(support.TESTFN) |
| 32 self._warnings_manager.__exit__(None, None, None) | 32 self._warnings_manager.__exit__(None, None, None) |
| 33 | 33 |
| 34 def testNoArgFunctions(self): | 34 def testNoArgFunctions(self): |
| 35 # test posix functions which take no arguments and have | 35 # test posix functions which take no arguments and have |
| 36 # no side-effects which we need to cleanup (e.g., fork, wait, abort) | 36 # no side-effects which we need to cleanup (e.g., fork, wait, abort) |
| 37 NO_ARG_FUNCTIONS = [ "ctermid", "getcwd", "getcwdb", "uname", | 37 NO_ARG_FUNCTIONS = [ "ctermid", "getcwd", "getcwdb", "uname", |
| 38 "times", "getloadavg", | 38 "times", "getloadavg", |
| 39 "getegid", "geteuid", "getgid", "getgroups", | 39 "getegid", "geteuid", "getgid", "getgroups", |
| 40 "getpid", "getpgrp", "getppid", "getuid", | 40 "getpid", "getpgrp", "getppid", "getuid", "sync", |
| 41 ] | 41 ] |
| 42 | 42 |
| 43 for name in NO_ARG_FUNCTIONS: | 43 for name in NO_ARG_FUNCTIONS: |
| 44 posix_func = getattr(posix, name, None) | 44 posix_func = getattr(posix, name, None) |
| 45 if posix_func is not None: | 45 if posix_func is not None: |
| 46 posix_func() | 46 posix_func() |
| 47 self.assertRaises(TypeError, posix_func, 1) | 47 self.assertRaises(TypeError, posix_func, 1) |
| 48 | 48 |
| 49 if hasattr(posix, 'getresuid'): | 49 if hasattr(posix, 'getresuid'): |
| 50 def test_getresuid(self): | 50 def test_getresuid(self): |
| (...skipping 73 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 124 def test_ftruncate(self): | 124 def test_ftruncate(self): |
| 125 if hasattr(posix, 'ftruncate'): | 125 if hasattr(posix, 'ftruncate'): |
| 126 fp = open(support.TESTFN, 'w+') | 126 fp = open(support.TESTFN, 'w+') |
| 127 try: | 127 try: |
| 128 # we need to have some data to truncate | 128 # we need to have some data to truncate |
| 129 fp.write('test') | 129 fp.write('test') |
| 130 fp.flush() | 130 fp.flush() |
| 131 posix.ftruncate(fp.fileno(), 0) | 131 posix.ftruncate(fp.fileno(), 0) |
| 132 finally: | 132 finally: |
| 133 fp.close() | 133 fp.close() |
| 134 |
| 135 @unittest.skipUnless(hasattr(posix, 'truncate'), "test needs posix.truncate(
)") |
| 136 def test_truncate(self): |
| 137 fp = open(support.TESTFN, 'w+') |
| 138 try: |
| 139 # we need to have some data to truncate |
| 140 fp.write('test') |
| 141 fp.flush() |
| 142 posix.truncate(support.TESTFN, 0) |
| 143 finally: |
| 144 fp.close() |
| 145 |
| 146 @unittest.skipUnless(hasattr(posix, 'fexecve'), "test needs posix.fexecve()"
) |
| 147 @unittest.skipUnless(hasattr(os, 'fork'), "test needs os.fork()") |
| 148 @unittest.skipUnless(hasattr(os, 'chdir'), "test needs os.chdir()") |
| 149 @unittest.skipUnless(hasattr(os, 'waitpid'), "test needs os.waitpid()") |
| 150 def test_fexecve(self): |
| 151 fp = os.open(sys.executable, os.O_RDONLY) |
| 152 try: |
| 153 pid = os.fork() |
| 154 if pid == 0: |
| 155 os.chdir(os.path.split(sys.executable)[0]) |
| 156 posix.fexecve(fp, [sys.executable, '-c', 'pass'], os.environ) |
| 157 else: |
| 158 os.waitpid(pid, 0) |
| 159 finally: |
| 160 os.close(fp) |
| 161 |
| 162 @unittest.skipUnless(hasattr(posix, 'gethostid'), "test needs posix.gethosti
d()") |
| 163 def test_gethostid(self): |
| 164 self.assertTrue(isinstance(posix.gethostid(), int)) |
| 165 |
| 166 @unittest.skipUnless(hasattr(posix, 'sethostname'), "test needs posix.sethos
tname()") |
| 167 def test_sethostname(self): |
| 168 import socket |
| 169 oldhn = socket.gethostname() |
| 170 try: |
| 171 posix.sethostname('new') |
| 172 except OSError as inst: |
| 173 # requires root permission |
| 174 self.assertEqual(inst.errno, 1) |
| 175 else: |
| 176 # running test as root! |
| 177 self.assertEqual(socket.gethostname(), 'new') |
| 178 posix.sethostname(oldhn) |
| 179 |
| 180 @unittest.skipUnless(hasattr(posix, 'waitid'), "test needs posix.waitid()") |
| 181 @unittest.skipUnless(hasattr(os, 'chdir'), "test needs os.chdir()") |
| 182 @unittest.skipUnless(hasattr(os, 'fork'), "test needs os.fork()") |
| 183 def test_waitid(self): |
| 184 pid = os.fork() |
| 185 if pid == 0: |
| 186 os.chdir(os.path.split(sys.executable)[0]) |
| 187 posix.execve(sys.executable, [sys.executable, '-c', 'pass'], os.envi
ron) |
| 188 else: |
| 189 res = posix.waitid(posix.P_PID, pid, posix.WEXITED) |
| 190 self.assertEqual(pid, res[3]) |
| 191 |
| 192 @unittest.skipUnless(hasattr(posix, 'lockf'), "test needs posix.lockf()") |
| 193 def test_lockf(self): |
| 194 fd = os.open(support.TESTFN, os.O_WRONLY | os.O_CREAT) |
| 195 try: |
| 196 os.write(fd, b'test') |
| 197 os.lseek(fd, 0, os.SEEK_SET) |
| 198 posix.lockf(fd, posix.F_LOCK, 4) |
| 199 # section is locked |
| 200 posix.lockf(fd, posix.F_ULOCK, 4) |
| 201 finally: |
| 202 os.close(fd) |
| 203 |
| 204 @unittest.skipUnless(hasattr(posix, 'pread'), "test needs posix.pread()") |
| 205 def test_pread(self): |
| 206 fd = os.open(support.TESTFN, os.O_RDWR | os.O_CREAT) |
| 207 try: |
| 208 os.write(fd, b'test') |
| 209 os.lseek(fd, 0, os.SEEK_SET) |
| 210 self.assertEqual(b'es', posix.pread(fd, 2, 1)) |
| 211 # the first pread() shoudn't disturb the file offset |
| 212 self.assertEqual(b'te', posix.read(fd, 2)) |
| 213 finally: |
| 214 os.close(fd) |
| 215 |
| 216 @unittest.skipUnless(hasattr(posix, 'pwrite'), "test needs posix.pwrite()") |
| 217 def test_pwrite(self): |
| 218 fd = os.open(support.TESTFN, os.O_RDWR | os.O_CREAT) |
| 219 try: |
| 220 os.write(fd, b'test') |
| 221 os.lseek(fd, 0, os.SEEK_SET) |
| 222 posix.pwrite(fd, b'xx', 1) |
| 223 self.assertEqual(b'txxt', posix.read(fd, 4)) |
| 224 finally: |
| 225 os.close(fd) |
| 226 |
| 227 @unittest.skipUnless(hasattr(posix, 'posix_fallocate'), |
| 228 "test needs posix.posix_fallocate()") |
| 229 def test_posix_fallocate(self): |
| 230 fd = os.open(support.TESTFN, os.O_WRONLY | os.O_CREAT) |
| 231 try: |
| 232 posix.posix_fallocate(fd, 0, 10) |
| 233 except OSError as inst: |
| 234 # issue10812, ZFS doesn't appear to support posix_fallocate, |
| 235 # so skip Solaris-based since they are likely to have ZFS. |
| 236 if inst.errno != 22 or not sys.platform.startswith("sunos"): |
| 237 raise |
| 238 finally: |
| 239 os.close(fd) |
| 240 |
| 241 @unittest.skipUnless(hasattr(posix, 'posix_fadvise'), |
| 242 "test needs posix.posix_fadvise()") |
| 243 def test_posix_fadvise(self): |
| 244 fd = os.open(support.TESTFN, os.O_RDONLY) |
| 245 try: |
| 246 posix.posix_fadvise(fd, 0, 0, posix.POSIX_FADV_WILLNEED) |
| 247 finally: |
| 248 os.close(fd) |
| 249 |
| 250 @unittest.skipUnless(hasattr(posix, 'futimes'), "test needs posix.futimes()"
) |
| 251 def test_futimes(self): |
| 252 now = time.time() |
| 253 fd = os.open(support.TESTFN, os.O_RDONLY) |
| 254 try: |
| 255 posix.futimes(fd, None) |
| 256 self.assertRaises(TypeError, posix.futimes, fd, (None, None)) |
| 257 self.assertRaises(TypeError, posix.futimes, fd, (now, None)) |
| 258 self.assertRaises(TypeError, posix.futimes, fd, (None, now)) |
| 259 posix.futimes(fd, (int(now), int(now))) |
| 260 posix.futimes(fd, (now, now)) |
| 261 finally: |
| 262 os.close(fd) |
| 263 |
| 264 @unittest.skipUnless(hasattr(posix, 'lutimes'), "test needs posix.lutimes()"
) |
| 265 def test_lutimes(self): |
| 266 now = time.time() |
| 267 posix.lutimes(support.TESTFN, None) |
| 268 self.assertRaises(TypeError, posix.lutimes, support.TESTFN, (None, None)
) |
| 269 self.assertRaises(TypeError, posix.lutimes, support.TESTFN, (now, None)) |
| 270 self.assertRaises(TypeError, posix.lutimes, support.TESTFN, (None, now)) |
| 271 posix.lutimes(support.TESTFN, (int(now), int(now))) |
| 272 posix.lutimes(support.TESTFN, (now, now)) |
| 273 |
| 274 @unittest.skipUnless(hasattr(posix, 'futimens'), "test needs posix.futimens(
)") |
| 275 def test_futimens(self): |
| 276 now = time.time() |
| 277 fd = os.open(support.TESTFN, os.O_RDONLY) |
| 278 try: |
| 279 self.assertRaises(TypeError, posix.futimens, fd, (None, None), (None
, None)) |
| 280 self.assertRaises(TypeError, posix.futimens, fd, (now, 0), None) |
| 281 self.assertRaises(TypeError, posix.futimens, fd, None, (now, 0)) |
| 282 posix.futimens(fd, (int(now), int((now - int(now)) * 1e9)), |
| 283 (int(now), int((now - int(now)) * 1e9))) |
| 284 finally: |
| 285 os.close(fd) |
| 286 |
| 287 @unittest.skipUnless(hasattr(posix, 'writev'), "test needs posix.writev()") |
| 288 def test_writev(self): |
| 289 fd = os.open(support.TESTFN, os.O_RDWR | os.O_CREAT) |
| 290 try: |
| 291 os.writev(fd, (b'test1', b'tt2', b't3')) |
| 292 os.lseek(fd, 0, os.SEEK_SET) |
| 293 self.assertEqual(b'test1tt2t3', posix.read(fd, 10)) |
| 294 finally: |
| 295 os.close(fd) |
| 296 |
| 297 @unittest.skipUnless(hasattr(posix, 'readv'), "test needs posix.readv()") |
| 298 def test_readv(self): |
| 299 fd = os.open(support.TESTFN, os.O_RDWR | os.O_CREAT) |
| 300 try: |
| 301 os.write(fd, b'test1tt2t3') |
| 302 os.lseek(fd, 0, os.SEEK_SET) |
| 303 a = bytearray(5) |
| 304 b = bytearray(3) |
| 305 c = bytearray(2) |
| 306 self.assertEqual(posix.readv(fd, (a, b, c)), 10) |
| 307 self.assertEqual((b'test1', b'tt2', b't3'), |
| 308 (bytes(a), bytes(b), bytes(c))) |
| 309 finally: |
| 310 os.close(fd) |
| 134 | 311 |
| 135 def test_dup(self): | 312 def test_dup(self): |
| 136 if hasattr(posix, 'dup'): | 313 if hasattr(posix, 'dup'): |
| 137 fp = open(support.TESTFN) | 314 fp = open(support.TESTFN) |
| 138 try: | 315 try: |
| 139 fd = posix.dup(fp.fileno()) | 316 fd = posix.dup(fp.fileno()) |
| 140 self.assertIsInstance(fd, int) | 317 self.assertIsInstance(fd, int) |
| 141 os.close(fd) | 318 os.close(fd) |
| 142 finally: | 319 finally: |
| 143 fp.close() | 320 fp.close() |
| (...skipping 276 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 420 for groups in [[0], list(range(16))]: | 597 for groups in [[0], list(range(16))]: |
| 421 posix.setgroups(groups) | 598 posix.setgroups(groups) |
| 422 self.assertListEqual(groups, posix.getgroups()) | 599 self.assertListEqual(groups, posix.getgroups()) |
| 423 | 600 |
| 424 | 601 |
| 425 def test_main(): | 602 def test_main(): |
| 426 support.run_unittest(PosixTester, PosixGroupsTester) | 603 support.run_unittest(PosixTester, PosixGroupsTester) |
| 427 | 604 |
| 428 if __name__ == '__main__': | 605 if __name__ == '__main__': |
| 429 test_main() | 606 test_main() |
| OLD | NEW |