Rietveld Code Review Tool
Help | Bug tracker | Discussion group | Source code | Sign in
(9)

Delta Between Two Patch Sets: Lib/test/test_os.py

Issue 26826: Expose new copy_file_range() syscall in os module and use it to improve shutil.copy()
Left Patch Set: Created 3 years, 8 months ago
Right Patch Set: Created 3 years, 7 months ago
Left:
Right:
Use n/p to move between diff chunks; N/P to move between comments. Please Sign in to add in-line comments.
Jump to:
Left: Side by side diff | Download
Right: Side by side diff | Download
« no previous file with change/comment | « Doc/library/os.rst ('k') | Modules/posixmodule.c » ('j') | no next file with change/comment »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
LEFTRIGHT
1 # As a test suite for the os module, this is woefully inadequate, but this 1 # As a test suite for the os module, this is woefully inadequate, but this
2 # does add tests for a few functions which have been determined to be more 2 # does add tests for a few functions which have been determined to be more
3 # portable than they had been thought to be. 3 # portable than they had been thought to be.
4 4
5 import asynchat 5 import asynchat
6 import asyncore 6 import asyncore
7 import codecs 7 import codecs
8 import contextlib 8 import contextlib
9 import decimal 9 import decimal
10 import errno 10 import errno
(...skipping 64 matching lines...) Expand 10 before | Expand all | Expand 10 after
75 # when combining linuxthreads with a failed execv call: see 75 # when combining linuxthreads with a failed execv call: see
76 # http://bugs.python.org/issue4970. 76 # http://bugs.python.org/issue4970.
77 if hasattr(sys, 'thread_info') and sys.thread_info.version: 77 if hasattr(sys, 'thread_info') and sys.thread_info.version:
78 USING_LINUXTHREADS = sys.thread_info.version.startswith("linuxthreads") 78 USING_LINUXTHREADS = sys.thread_info.version.startswith("linuxthreads")
79 else: 79 else:
80 USING_LINUXTHREADS = False 80 USING_LINUXTHREADS = False
81 81
82 # Issue #14110: Some tests fail on FreeBSD if the user is in the wheel group. 82 # Issue #14110: Some tests fail on FreeBSD if the user is in the wheel group.
83 HAVE_WHEEL_GROUP = sys.platform.startswith('freebsd') and os.getgid() == 0 83 HAVE_WHEEL_GROUP = sys.platform.startswith('freebsd') and os.getgid() == 0
84 84
85
85 @contextlib.contextmanager 86 @contextlib.contextmanager
86 def ignore_deprecation_warnings(msg_regex, quiet=False): 87 def ignore_deprecation_warnings(msg_regex, quiet=False):
87 with support.check_warnings((msg_regex, DeprecationWarning), quiet=quiet): 88 with support.check_warnings((msg_regex, DeprecationWarning), quiet=quiet):
88 yield 89 yield
89 90
90 91
91 @contextlib.contextmanager 92 @contextlib.contextmanager
92 def bytes_filename_warn(expected): 93 def bytes_filename_warn(expected):
93 msg = 'The Windows bytes API has been deprecated' 94 msg = 'The Windows bytes API has been deprecated'
94 if os.name == 'nt': 95 if os.name == 'nt':
(...skipping 144 matching lines...) Expand 10 before | Expand all | Expand 10 after
239 symlink = support.get_attribute(os, "symlink") 240 symlink = support.get_attribute(os, "symlink")
240 try: 241 try:
241 symlink(src='target', dst=support.TESTFN, 242 symlink(src='target', dst=support.TESTFN,
242 target_is_directory=False, dir_fd=None) 243 target_is_directory=False, dir_fd=None)
243 except (NotImplementedError, OSError): 244 except (NotImplementedError, OSError):
244 pass # No OS support or unprivileged user 245 pass # No OS support or unprivileged user
245 246
246 @unittest.skipUnless(hasattr(os, 'copy_file_range'), 'test needs os.copy_file_range()') 247 @unittest.skipUnless(hasattr(os, 'copy_file_range'), 'test needs os.copy_file_range()')
247 def test_copy_file_range_ok(self): 248 def test_copy_file_range_ok(self):
248 TESTFN2 = support.TESTFN + ".3" 249 TESTFN2 = support.TESTFN + ".3"
249 250 data = b'0123456789'
250 create_file(support.TESTFN, b'0123456789') 251
252 create_file(support.TESTFN, data)
251 self.addCleanup(support.unlink, support.TESTFN) 253 self.addCleanup(support.unlink, support.TESTFN)
252 254
253 in_file = open (support.TESTFN, 'rb') 255 in_file = open(support.TESTFN, 'rb')
254 self.addCleanup(in_file.close) 256 self.addCleanup(in_file.close)
255 in_fd = in_file.fileno() 257 in_fd = in_file.fileno()
256 258
257 out_file = open (TESTFN2, 'w+b') 259 out_file = open(TESTFN2, 'w+b')
258 self.addCleanup(support.unlink, TESTFN2) 260 self.addCleanup(support.unlink, TESTFN2)
259 self.addCleanup(out_file.close) 261 self.addCleanup(out_file.close)
260 out_fd = out_file.fileno() 262 out_fd = out_file.fileno()
261 263
262 i = os.copy_file_range (in_fd, out_fd, 5); 264 try:
Martin Panter 2016/06/09 10:53:01 Probably should skip if it raises ENOSYS too. E.g.
263 self.assertEqual (i, 5); 265 i = os.copy_file_range(in_fd, out_fd, 5)
Martin Panter 2016/06/09 10:53:01 The man page says it can return less than the requ
264 266 except OSError as e:
265 in_file = open (TESTFN2, 'rb') 267 # this is needed in case Python was compiled
266 self.assertEqual (in_file.read(), b'01234') 268 # in a system with the syscall
269 # but the system on which it is run does not,
270 # which is kernel dependent, for the moment
271 if e.errno != errno.ENOSYS:
272 raise
273 self.skipTest(e)
274 else:
275 self.assertIn(i, range(0, 6));
276
277 in_file = open(TESTFN2, 'rb')
278 self.assertEqual(in_file.read(), data[:i])
279
280 @unittest.skipUnless(hasattr(os, 'copy_file_range'), 'test needs os.copy_file_range()')
281 def test_copy_file_range_off(self):
282 TESTFN4 = support.TESTFN + ".4"
283 # ,-- in_skip
284 # v v-- in_skip+bytes_to_copy
285 data = b'0123456789'
286 bytes_to_copy = 6
287 in_skip = 3
288 out_seek = 5
289
290 create_file(support.TESTFN, data)
291 self.addCleanup(support.unlink, support.TESTFN)
292
293 in_file = open(support.TESTFN, 'rb')
294 self.addCleanup(in_file.close)
295 in_fd = in_file.fileno()
296
297 out_file = open (TESTFN4, 'w+b')
298 self.addCleanup(support.unlink, TESTFN4)
299 self.addCleanup(out_file.close)
300 out_fd = out_file.fileno()
301
302 try:
303 i = os.copy_file_range(in_fd, out_fd, bytes_to_copy,
304 in_skip, out_seek);
305 except OSError as e:
306 # this is needed in case Python was compiled
307 # in a system with the syscall
308 # but the system on which it is run does not,
309 # which is kernel dependent, for the moment
310 if e.errno != errno.ENOSYS:
311 raise
312 self.skipTest(e)
313 else:
314 self.assertIn(i, range(0, bytes_to_copy+1));
315
316 in_file = open(TESTFN4, 'rb')
317 read = in_file.read()
318 # seeked bytes (5) are zero'ed
319 self.assertEqual(read[:out_seek], b'\x00'*out_seek)
320 # see diagram at the beginning of this test
321 self.assertEqual(read[out_seek:],
322 data[in_skip:in_skip+bytes_to_copy])
267 323
268 324
269 # Test attributes on return values from os.*stat* family. 325 # Test attributes on return values from os.*stat* family.
270 class StatAttributeTests(unittest.TestCase): 326 class StatAttributeTests(unittest.TestCase):
271 def setUp(self): 327 def setUp(self):
272 self.fname = support.TESTFN 328 self.fname = support.TESTFN
273 self.addCleanup(support.unlink, self.fname) 329 self.addCleanup(support.unlink, self.fname)
274 create_file(self.fname, b"ABC") 330 create_file(self.fname, b"ABC")
275 331
276 @unittest.skipUnless(hasattr(os, 'stat'), 'test needs os.stat()') 332 @unittest.skipUnless(hasattr(os, 'stat'), 'test needs os.stat()')
(...skipping 1296 matching lines...) Expand 10 before | Expand all | Expand 10 after
1573 def test_inheritable(self): 1629 def test_inheritable(self):
1574 self.check(os.get_inheritable) 1630 self.check(os.get_inheritable)
1575 self.check(os.set_inheritable, True) 1631 self.check(os.set_inheritable, True)
1576 1632
1577 @unittest.skipUnless(hasattr(os, 'get_blocking'), 1633 @unittest.skipUnless(hasattr(os, 'get_blocking'),
1578 'needs os.get_blocking() and os.set_blocking()') 1634 'needs os.get_blocking() and os.set_blocking()')
1579 def test_blocking(self): 1635 def test_blocking(self):
1580 self.check(os.get_blocking) 1636 self.check(os.get_blocking)
1581 self.check(os.set_blocking, True) 1637 self.check(os.set_blocking, True)
1582 1638
1583 @unittest.skipUnless(hasattr(os, 'copy_file_range'), 'test needs os.copy_file_range()')
1584 def test_copy_file_range_in(self):
1585 self.check(os.copy_file_range, 0, 0)
1586 1639
1587 class LinkTests(unittest.TestCase): 1640 class LinkTests(unittest.TestCase):
1588 def setUp(self): 1641 def setUp(self):
1589 self.file1 = support.TESTFN 1642 self.file1 = support.TESTFN
1590 self.file2 = os.path.join(support.TESTFN + "2") 1643 self.file2 = os.path.join(support.TESTFN + "2")
1591 1644
1592 def tearDown(self): 1645 def tearDown(self):
1593 for file in (self.file1, self.file2): 1646 for file in (self.file1, self.file2):
1594 if os.path.exists(file): 1647 if os.path.exists(file):
1595 os.unlink(file) 1648 os.unlink(file)
(...skipping 1570 matching lines...) Expand 10 before | Expand all | Expand 10 after
3166 for o in int, type, os, vapor(): 3219 for o in int, type, os, vapor():
3167 self.assertRaises(TypeError, os.fspath, o) 3220 self.assertRaises(TypeError, os.fspath, o)
3168 3221
3169 def test_argument_required(self): 3222 def test_argument_required(self):
3170 with self.assertRaises(TypeError): 3223 with self.assertRaises(TypeError):
3171 os.fspath() 3224 os.fspath()
3172 3225
3173 3226
3174 if __name__ == "__main__": 3227 if __name__ == "__main__":
3175 unittest.main() 3228 unittest.main()
LEFTRIGHT

RSS Feeds Recent Issues | This issue
This is Rietveld 894c83f36cb7+