Rietveld Code Review Tool
Help | Bug tracker | Discussion group | Source code | Sign in
(95781)

Delta Between Two Patch Sets: Lib/test/test_tempfile.py

Issue 16510: Using appropriate checks in tests
Left Patch Set: Created 5 years, 9 months ago
Right Patch Set: Created 5 years, 6 months ago
Left:
Right:
Use n/p to move between diff chunks; N/P to move between comments. Please Sign in to add in-line comments.
Jump to:
Left: Side by side diff | Download
Right: Side by side diff | Download
« no previous file with change/comment | « Lib/test/test_telnetlib.py ('k') | Lib/test/test_time.py » ('j') | no next file with change/comment »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
LEFT RIGHT
1 # tempfile.py unit tests. 1 # tempfile.py unit tests.
2 import tempfile 2 import tempfile
3 import errno 3 import errno
4 import io 4 import io
5 import os 5 import os
6 import signal 6 import signal
7 import sys 7 import sys
8 import re 8 import re
9 import warnings 9 import warnings
10 import contextlib 10 import contextlib
11 import weakref
11 12
12 import unittest 13 import unittest
13 from test import support 14 from test import support, script_helper
14 15
15 16
16 if hasattr(os, 'stat'): 17 if hasattr(os, 'stat'):
17 import stat 18 import stat
18 has_stat = 1 19 has_stat = 1
19 else: 20 else:
20 has_stat = 0 21 has_stat = 0
21 22
22 has_textmode = (tempfile._text_openflags != tempfile._bin_openflags) 23 has_textmode = (tempfile._text_openflags != tempfile._bin_openflags)
23 has_spawnl = hasattr(os, 'spawnl') 24 has_spawnl = hasattr(os, 'spawnl')
(...skipping 293 matching lines...) Expand 10 before | Expand all | Expand 10 after
317 extant[i] = self.do_create(pre="aa") 318 extant[i] = self.do_create(pre="aa")
318 319
319 def test_choose_directory(self): 320 def test_choose_directory(self):
320 # _mkstemp_inner can create files in a user-selected directory 321 # _mkstemp_inner can create files in a user-selected directory
321 dir = tempfile.mkdtemp() 322 dir = tempfile.mkdtemp()
322 try: 323 try:
323 self.do_create(dir=dir).write(b"blat") 324 self.do_create(dir=dir).write(b"blat")
324 finally: 325 finally:
325 os.rmdir(dir) 326 os.rmdir(dir)
326 327
328 @unittest.skipUnless(has_stat, 'os.stat not available')
327 def test_file_mode(self): 329 def test_file_mode(self):
328 # _mkstemp_inner creates files with the proper mode 330 # _mkstemp_inner creates files with the proper mode
329 if not has_stat:
330 return # ugh, can't use SkipTest.
331 331
332 file = self.do_create() 332 file = self.do_create()
333 mode = stat.S_IMODE(os.stat(file.name).st_mode) 333 mode = stat.S_IMODE(os.stat(file.name).st_mode)
334 expected = 0o600 334 expected = 0o600
335 if sys.platform == 'win32': 335 if sys.platform == 'win32':
336 # There's no distinction among 'user', 'group' and 'world'; 336 # There's no distinction among 'user', 'group' and 'world';
337 # replicate the 'user' bits. 337 # replicate the 'user' bits.
338 user = expected >> 6 338 user = expected >> 6
339 expected = user * (1 + 8 + 64) 339 expected = user * (1 + 8 + 64)
340 self.assertEqual(mode, expected) 340 self.assertEqual(mode, expected)
341 341
342 @unittest.skipUnless(has_spawnl, 'os.spawnl not available')
342 def test_noinherit(self): 343 def test_noinherit(self):
343 # _mkstemp_inner file handles are not inherited by child processes 344 # _mkstemp_inner file handles are not inherited by child processes
344 if not has_spawnl:
345 return # ugh, can't use SkipTest.
346 345
347 if support.verbose: 346 if support.verbose:
348 v="v" 347 v="v"
349 else: 348 else:
350 v="q" 349 v="q"
351 350
352 file = self.do_create() 351 file = self.do_create()
353 self.assertEqual(os.get_inheritable(file.fd), False) 352 self.assertEqual(os.get_inheritable(file.fd), False)
354 fd = "%d" % file.fd 353 fd = "%d" % file.fd
355 354
(...skipping 15 matching lines...) Expand all
371 decorated = '"%s"' % sys.executable 370 decorated = '"%s"' % sys.executable
372 tester = '"%s"' % tester 371 tester = '"%s"' % tester
373 else: 372 else:
374 decorated = sys.executable 373 decorated = sys.executable
375 374
376 retval = os.spawnl(os.P_WAIT, sys.executable, decorated, tester, v, fd) 375 retval = os.spawnl(os.P_WAIT, sys.executable, decorated, tester, v, fd)
377 self.assertFalse(retval < 0, 376 self.assertFalse(retval < 0,
378 "child process caught fatal signal %d" % -retval) 377 "child process caught fatal signal %d" % -retval)
379 self.assertFalse(retval > 0, "child process reports failure %d"%retval) 378 self.assertFalse(retval > 0, "child process reports failure %d"%retval)
380 379
380 @unittest.skipUnless(has_textmode, "text mode not available")
381 def test_textmode(self): 381 def test_textmode(self):
382 # _mkstemp_inner can create files in text mode 382 # _mkstemp_inner can create files in text mode
383 if not has_textmode:
384 return # ugh, can't use SkipTest.
385 383
386 # A text file is truncated at the first Ctrl+Z byte 384 # A text file is truncated at the first Ctrl+Z byte
387 f = self.do_create(bin=0) 385 f = self.do_create(bin=0)
388 f.write(b"blat\x1a") 386 f.write(b"blat\x1a")
389 f.write(b"extra\n") 387 f.write(b"extra\n")
390 os.lseek(f.fd, 0, os.SEEK_SET) 388 os.lseek(f.fd, 0, os.SEEK_SET)
391 self.assertEqual(os.read(f.fd, 20), b"blat") 389 self.assertEqual(os.read(f.fd, 20), b"blat")
392 390
393 def default_mkstemp_inner(self): 391 def default_mkstemp_inner(self):
394 return tempfile._mkstemp_inner(tempfile.gettempdir(), 392 return tempfile._mkstemp_inner(tempfile.gettempdir(),
(...skipping 170 matching lines...) Expand 10 before | Expand all | Expand 10 after
565 os.rmdir(i) 563 os.rmdir(i)
566 564
567 def test_choose_directory(self): 565 def test_choose_directory(self):
568 # mkdtemp can create directories in a user-selected directory 566 # mkdtemp can create directories in a user-selected directory
569 dir = tempfile.mkdtemp() 567 dir = tempfile.mkdtemp()
570 try: 568 try:
571 os.rmdir(self.do_create(dir=dir)) 569 os.rmdir(self.do_create(dir=dir))
572 finally: 570 finally:
573 os.rmdir(dir) 571 os.rmdir(dir)
574 572
573 @unittest.skipUnless(has_stat, 'os.stat not available')
575 def test_mode(self): 574 def test_mode(self):
576 # mkdtemp creates directories with the proper mode 575 # mkdtemp creates directories with the proper mode
577 if not has_stat:
578 return # ugh, can't use SkipTest.
579 576
580 dir = self.do_create() 577 dir = self.do_create()
581 try: 578 try:
582 mode = stat.S_IMODE(os.stat(dir).st_mode) 579 mode = stat.S_IMODE(os.stat(dir).st_mode)
583 mode &= 0o777 # Mask off sticky bits inherited from /tmp 580 mode &= 0o777 # Mask off sticky bits inherited from /tmp
584 expected = 0o700 581 expected = 0o700
585 if sys.platform == 'win32': 582 if sys.platform == 'win32':
586 # There's no distinction among 'user', 'group' and 'world'; 583 # There's no distinction among 'user', 'group' and 'world';
587 # replicate the 'user' bits. 584 # replicate the 'user' bits.
588 user = expected >> 6 585 user = expected >> 6
(...skipping 97 matching lines...) Expand 10 before | Expand all | Expand 10 after
686 return file 683 return file
687 684
688 685
689 def test_basic(self): 686 def test_basic(self):
690 # NamedTemporaryFile can create files 687 # NamedTemporaryFile can create files
691 self.do_create() 688 self.do_create()
692 self.do_create(pre="a") 689 self.do_create(pre="a")
693 self.do_create(suf="b") 690 self.do_create(suf="b")
694 self.do_create(pre="a", suf="b") 691 self.do_create(pre="a", suf="b")
695 self.do_create(pre="aa", suf=".txt") 692 self.do_create(pre="aa", suf=".txt")
693
694 def test_method_lookup(self):
695 # Issue #18879: Looking up a temporary file method should keep it
696 # alive long enough.
697 f = self.do_create()
698 wr = weakref.ref(f)
699 write = f.write
700 write2 = f.write
701 del f
702 write(b'foo')
703 del write
704 write2(b'bar')
705 del write2
706 if support.check_impl_detail(cpython=True):
707 # No reference cycle was created.
708 self.assertIsNone(wr())
696 709
697 def test_creates_named(self): 710 def test_creates_named(self):
698 # NamedTemporaryFile creates files with names 711 # NamedTemporaryFile creates files with names
699 f = tempfile.NamedTemporaryFile() 712 f = tempfile.NamedTemporaryFile()
700 self.assertTrue(os.path.exists(f.name), 713 self.assertTrue(os.path.exists(f.name),
701 "NamedTemporaryFile %s does not exist" % f.name) 714 "NamedTemporaryFile %s does not exist" % f.name)
702 715
703 def test_del_on_close(self): 716 def test_del_on_close(self):
704 # A NamedTemporaryFile is deleted when closed 717 # A NamedTemporaryFile is deleted when closed
705 dir = tempfile.mkdtemp() 718 dir = tempfile.mkdtemp()
(...skipping 363 matching lines...) Expand 10 before | Expand all | Expand 10 after
1069 class TestTemporaryDirectory(BaseTestCase): 1082 class TestTemporaryDirectory(BaseTestCase):
1070 """Test TemporaryDirectory().""" 1083 """Test TemporaryDirectory()."""
1071 1084
1072 def do_create(self, dir=None, pre="", suf="", recurse=1): 1085 def do_create(self, dir=None, pre="", suf="", recurse=1):
1073 if dir is None: 1086 if dir is None:
1074 dir = tempfile.gettempdir() 1087 dir = tempfile.gettempdir()
1075 tmp = tempfile.TemporaryDirectory(dir=dir, prefix=pre, suffix=suf) 1088 tmp = tempfile.TemporaryDirectory(dir=dir, prefix=pre, suffix=suf)
1076 self.nameCheck(tmp.name, dir, pre, suf) 1089 self.nameCheck(tmp.name, dir, pre, suf)
1077 # Create a subdirectory and some files 1090 # Create a subdirectory and some files
1078 if recurse: 1091 if recurse:
1079 self.do_create(tmp.name, pre, suf, recurse-1) 1092 d1 = self.do_create(tmp.name, pre, suf, recurse-1)
1093 d1.name = None
1080 with open(os.path.join(tmp.name, "test.txt"), "wb") as f: 1094 with open(os.path.join(tmp.name, "test.txt"), "wb") as f:
1081 f.write(b"Hello world!") 1095 f.write(b"Hello world!")
1082 return tmp 1096 return tmp
1083 1097
1084 def test_mkdtemp_failure(self): 1098 def test_mkdtemp_failure(self):
1085 # Check no additional exception if mkdtemp fails 1099 # Check no additional exception if mkdtemp fails
1086 # Previously would raise AttributeError instead 1100 # Previously would raise AttributeError instead
1087 # (noted as part of Issue #10188) 1101 # (noted as part of Issue #10188)
1088 with tempfile.TemporaryDirectory() as nonexistent: 1102 with tempfile.TemporaryDirectory() as nonexistent:
1089 pass 1103 pass
(...skipping 11 matching lines...) Expand all
1101 d.cleanup() 1115 d.cleanup()
1102 self.assertFalse(os.path.exists(d.name), 1116 self.assertFalse(os.path.exists(d.name),
1103 "TemporaryDirectory %s exists after cleanup" % d.name) 1117 "TemporaryDirectory %s exists after cleanup" % d.name)
1104 finally: 1118 finally:
1105 os.rmdir(dir) 1119 os.rmdir(dir)
1106 1120
1107 @support.skip_unless_symlink 1121 @support.skip_unless_symlink
1108 def test_cleanup_with_symlink_to_a_directory(self): 1122 def test_cleanup_with_symlink_to_a_directory(self):
1109 # cleanup() should not follow symlinks to directories (issue #12464) 1123 # cleanup() should not follow symlinks to directories (issue #12464)
1110 d1 = self.do_create() 1124 d1 = self.do_create()
1111 d2 = self.do_create() 1125 d2 = self.do_create(recurse=0)
1112 1126
1113 # Symlink d1/foo -> d2 1127 # Symlink d1/foo -> d2
1114 os.symlink(d2.name, os.path.join(d1.name, "foo")) 1128 os.symlink(d2.name, os.path.join(d1.name, "foo"))
1115 1129
1116 # This call to cleanup() should not follow the "foo" symlink 1130 # This call to cleanup() should not follow the "foo" symlink
1117 d1.cleanup() 1131 d1.cleanup()
1118 1132
1119 self.assertFalse(os.path.exists(d1.name), 1133 self.assertFalse(os.path.exists(d1.name),
1120 "TemporaryDirectory %s exists after cleanup" % d1.name) 1134 "TemporaryDirectory %s exists after cleanup" % d1.name)
1121 self.assertTrue(os.path.exists(d2.name), 1135 self.assertTrue(os.path.exists(d2.name),
1122 "Directory pointed to by a symlink was deleted") 1136 "Directory pointed to by a symlink was deleted")
1123 self.assertEqual(os.listdir(d2.name), ['test.txt'], 1137 self.assertEqual(os.listdir(d2.name), ['test.txt'],
1124 "Contents of the directory pointed to by a symlink " 1138 "Contents of the directory pointed to by a symlink "
1125 "were deleted") 1139 "were deleted")
1126 d2.cleanup() 1140 d2.cleanup()
1127 1141
1128 @support.cpython_only 1142 @support.cpython_only
1129 def test_del_on_collection(self): 1143 def test_del_on_collection(self):
1130 # A TemporaryDirectory is deleted when garbage collected 1144 # A TemporaryDirectory is deleted when garbage collected
1131 dir = tempfile.mkdtemp() 1145 dir = tempfile.mkdtemp()
1132 try: 1146 try:
1133 d = self.do_create(dir=dir) 1147 d = self.do_create(dir=dir)
1134 name = d.name 1148 name = d.name
1135 del d # Rely on refcounting to invoke __del__ 1149 del d # Rely on refcounting to invoke __del__
1136 self.assertFalse(os.path.exists(name), 1150 self.assertFalse(os.path.exists(name),
1137 "TemporaryDirectory %s exists after __del__" % name) 1151 "TemporaryDirectory %s exists after __del__" % name)
1138 finally: 1152 finally:
1139 os.rmdir(dir) 1153 os.rmdir(dir)
1140 1154
1141 @unittest.expectedFailure # See issue #10188
1142 def test_del_on_shutdown(self): 1155 def test_del_on_shutdown(self):
1143 # A TemporaryDirectory may be cleaned up during shutdown 1156 # A TemporaryDirectory may be cleaned up during shutdown
1144 # Make sure it works with the relevant modules nulled out
1145 with self.do_create() as dir: 1157 with self.do_create() as dir:
1146 d = self.do_create(dir=dir) 1158 for mod in ('builtins', 'os', 'shutil', 'sys', 'tempfile', 'warnings'):
1147 # Mimic the nulling out of modules that 1159 code = """if True:
1148 # occurs during system shutdown 1160 import builtins
1149 modules = [os, os.path] 1161 import os
1150 if has_stat: 1162 import shutil
1151 modules.append(stat) 1163 import sys
1152 # Currently broken, so suppress the warning 1164 import tempfile
1153 # that is otherwise emitted on stdout 1165 import warnings
1154 with support.captured_stderr() as err: 1166
1155 with NulledModules(*modules): 1167 tmp = tempfile.TemporaryDirectory(dir={dir!r})
1156 d.cleanup() 1168 sys.stdout.buffer.write(tmp.name.encode())
1157 # Currently broken, so stop spurious exception by 1169
1158 # indicating the object has already been closed 1170 tmp2 = os.path.join(tmp.name, 'test_dir')
1159 d._closed = True 1171 os.mkdir(tmp2)
1160 # And this assert will fail, as expected by the 1172 with open(os.path.join(tmp2, "test.txt"), "w") as f:
1161 # unittest decorator... 1173 f.write("Hello world!")
1162 self.assertFalse(os.path.exists(d.name), 1174
1163 "TemporaryDirectory %s exists after cleanup" % d.name) 1175 {mod}.tmp = tmp
1176
1177 warnings.filterwarnings("always", category=ResourceWarning)
1178 """.format(dir=dir, mod=mod)
1179 rc, out, err = script_helper.assert_python_ok("-c", code)
1180 tmp_name = out.decode().strip()
1181 self.assertFalse(os.path.exists(tmp_name),
1182 "TemporaryDirectory %s exists after cleanup" % tmp_name)
1183 err = err.decode('utf-8', 'backslashreplace')
1184 self.assertNotIn("Exception ", err)
1185 self.assertIn("ResourceWarning: Implicitly cleaning up", err)
1164 1186
1165 def test_warnings_on_cleanup(self): 1187 def test_warnings_on_cleanup(self):
1166 # Two kinds of warning on shutdown 1188 # ResourceWarning will be triggered by __del__
1167 # Issue 10888: may write to stderr if modules are nulled out
1168 # ResourceWarning will be triggered by __del__
1169 with self.do_create() as dir: 1189 with self.do_create() as dir:
1170 if os.sep != '\\': 1190 d = self.do_create(dir=dir, recurse=3)
1171 # Embed a backslash in order to make sure string escaping 1191 name = d.name
1172 # in the displayed error message is dealt with correctly
1173 suffix = '\\check_backslash_handling'
1174 else:
1175 suffix = ''
1176 d = self.do_create(dir=dir, suf=suffix)
1177
1178 #Check for the Issue 10888 message
1179 modules = [os, os.path]
1180 if has_stat:
1181 modules.append(stat)
1182 with support.captured_stderr() as err:
1183 with NulledModules(*modules):
1184 d.cleanup()
1185 message = err.getvalue().replace('\\\\', '\\')
1186 self.assertIn("while cleaning up", message)
1187 self.assertIn(d.name, message)
1188 1192
1189 # Check for the resource warning 1193 # Check for the resource warning
1190 with support.check_warnings(('Implicitly', ResourceWarning), quiet=False): 1194 with support.check_warnings(('Implicitly', ResourceWarning), quiet=False):
1191 warnings.filterwarnings("always", category=ResourceWarning) 1195 warnings.filterwarnings("always", category=ResourceWarning)
1192 d.__del__() 1196 del d
1193 self.assertFalse(os.path.exists(d.name), 1197 support.gc_collect()
1194 "TemporaryDirectory %s exists after __del__" % d.name) 1198 self.assertFalse(os.path.exists(name),
1199 "TemporaryDirectory %s exists after __del__" % name)
1195 1200
1196 def test_multiple_close(self): 1201 def test_multiple_close(self):
1197 # Can be cleaned-up many times without error 1202 # Can be cleaned-up many times without error
1198 d = self.do_create() 1203 d = self.do_create()
1199 d.cleanup() 1204 d.cleanup()
1200 d.cleanup() 1205 d.cleanup()
1201 d.cleanup() 1206 d.cleanup()
1202 1207
1203 def test_context_manager(self): 1208 def test_context_manager(self):
1204 # Can be used as a context manager 1209 # Can be used as a context manager
1205 d = self.do_create() 1210 d = self.do_create()
1206 with d as name: 1211 with d as name:
1207 self.assertTrue(os.path.exists(name)) 1212 self.assertTrue(os.path.exists(name))
1208 self.assertEqual(name, d.name) 1213 self.assertEqual(name, d.name)
1209 self.assertFalse(os.path.exists(name)) 1214 self.assertFalse(os.path.exists(name))
1210 1215
1211 1216
1212 def test_main(): 1217 def test_main():
1213 support.run_unittest(__name__) 1218 support.run_unittest(__name__)
1214 1219
1215 if __name__ == "__main__": 1220 if __name__ == "__main__":
1216 test_main() 1221 test_main()
LEFT RIGHT

RSS Feeds Recent Issues | This issue
This is Rietveld 894c83f36cb7+