Rietveld Code Review Tool
Help | Bug tracker | Discussion group | Source code | Sign in
(167809)

Delta Between Two Patch Sets: Lib/test/test_tempfile.py

Issue 16510: Using appropriate checks in tests
Left Patch Set: Created 7 years ago
Right Patch Set: Created 5 years, 10 months ago
Left:
Right:
Use n/p to move between diff chunks; N/P to move between comments. Please Sign in to add in-line comments.
Jump to:
Left: Side by side diff | Download
Right: Side by side diff | Download
« no previous file with change/comment | « Lib/test/test_telnetlib.py ('k') | Lib/test/test_time.py » ('j') | no next file with change/comment »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments / Hide Comments ('s')
LEFT | RIGHT
1 # tempfile.py unit tests. 1 # tempfile.py unit tests.
2 import tempfile 2 import tempfile
3 import errno
4 import io
3 import os 5 import os
4 import signal 6 import signal
5 import sys 7 import sys
6 import re 8 import re
7 import warnings 9 import warnings
10 import contextlib
11 import weakref
8 12
9 import unittest 13 import unittest
10 from test import support 14 from test import support, script_helper
11 15
12 16
13 if hasattr(os, 'stat'): 17 if hasattr(os, 'stat'):
14 import stat 18 import stat
15 has_stat = 1 19 has_stat = 1
16 else: 20 else:
17 has_stat = 0 21 has_stat = 0
18 22
19 has_textmode = (tempfile._text_openflags != tempfile._bin_openflags) 23 has_textmode = (tempfile._text_openflags != tempfile._bin_openflags)
20 has_spawnl = hasattr(os, 'spawnl') 24 has_spawnl = hasattr(os, 'spawnl')
21 25
22 # TEST_FILES may need to be tweaked for systems depending on the maximum 26 # TEST_FILES may need to be tweaked for systems depending on the maximum
23 # number of files that can be opened at one time (see ulimit -n) 27 # number of files that can be opened at one time (see ulimit -n)
24 if sys.platform.startswith('openbsd'): 28 if sys.platform.startswith('openbsd'):
25 TEST_FILES = 48 29 TEST_FILES = 48
26 else: 30 else:
27 TEST_FILES = 100 31 TEST_FILES = 100
28 32
29 # This is organized as one test for each chunk of code in tempfile.py, 33 # This is organized as one test for each chunk of code in tempfile.py,
30 # in order of their appearance in the file. Testing which requires 34 # in order of their appearance in the file. Testing which requires
31 # threads is not done here. 35 # threads is not done here.
32 36
33 # Common functionality. 37 # Common functionality.
34 class BaseTestCase(unittest.TestCase): 38 class BaseTestCase(unittest.TestCase):
35 39
36 str_check = re.compile(r"[a-zA-Z0-9_-]{6}$") 40 str_check = re.compile(r"^[a-z0-9_-]{8}$")
37 41
38 def setUp(self): 42 def setUp(self):
39 self._warnings_manager = support.check_warnings() 43 self._warnings_manager = support.check_warnings()
40 self._warnings_manager.__enter__() 44 self._warnings_manager.__enter__()
41 warnings.filterwarnings("ignore", category=RuntimeWarning, 45 warnings.filterwarnings("ignore", category=RuntimeWarning,
42 message="mktemp", module=__name__) 46 message="mktemp", module=__name__)
43 47
44 def tearDown(self): 48 def tearDown(self):
45 self._warnings_manager.__exit__(None, None, None) 49 self._warnings_manager.__exit__(None, None, None)
46 50
47 51
48 def nameCheck(self, name, dir, pre, suf): 52 def nameCheck(self, name, dir, pre, suf):
49 (ndir, nbase) = os.path.split(name) 53 (ndir, nbase) = os.path.split(name)
50 npre = nbase[:len(pre)] 54 npre = nbase[:len(pre)]
51 nsuf = nbase[len(nbase)-len(suf):] 55 nsuf = nbase[len(nbase)-len(suf):]
52 56
53 # check for equality of the absolute paths! 57 # check for equality of the absolute paths!
54 self.assertEqual(os.path.abspath(ndir), os.path.abspath(dir), 58 self.assertEqual(os.path.abspath(ndir), os.path.abspath(dir),
55 "file '%s' not in directory '%s'" % (name, dir)) 59 "file '%s' not in directory '%s'" % (name, dir))
56 self.assertEqual(npre, pre, 60 self.assertEqual(npre, pre,
57 "file '%s' does not begin with '%s'" % (nbase, pre)) 61 "file '%s' does not begin with '%s'" % (nbase, pre))
58 self.assertEqual(nsuf, suf, 62 self.assertEqual(nsuf, suf,
59 "file '%s' does not end with '%s'" % (nbase, suf)) 63 "file '%s' does not end with '%s'" % (nbase, suf))
60 64
61 nbase = nbase[len(pre):len(nbase)-len(suf)] 65 nbase = nbase[len(pre):len(nbase)-len(suf)]
62 self.assertTrue(self.str_check.match(nbase), 66 self.assertTrue(self.str_check.match(nbase),
63 "random string '%s' does not match /^[a-zA-Z0-9_-]{6}$/" 67 "random string '%s' does not match ^[a-z0-9_-]{8}$"
64 % nbase) 68 % nbase)
65 69
66 70
67 class TestExports(BaseTestCase): 71 class TestExports(BaseTestCase):
68 def test_exports(self): 72 def test_exports(self):
69 # There are no surprising symbols in the tempfile module 73 # There are no surprising symbols in the tempfile module
70 dict = tempfile.__dict__ 74 dict = tempfile.__dict__
71 75
72 expected = { 76 expected = {
73 "NamedTemporaryFile" : 1, 77 "NamedTemporaryFile" : 1,
(...skipping 68 matching lines...) Expand 10 before | Expand all | Expand 10 after
142 # the parent. 146 # the parent.
143 os._exit(0) 147 os._exit(0)
144 parent_value = next(self.r) 148 parent_value = next(self.r)
145 child_value = os.read(read_fd, len(parent_value)).decode("ascii") 149 child_value = os.read(read_fd, len(parent_value)).decode("ascii")
146 finally: 150 finally:
147 if pid: 151 if pid:
148 # best effort to ensure the process can't bleed out 152 # best effort to ensure the process can't bleed out
149 # via any bugs above 153 # via any bugs above
150 try: 154 try:
151 os.kill(pid, signal.SIGKILL) 155 os.kill(pid, signal.SIGKILL)
152 except EnvironmentError: 156 except OSError:
153 pass 157 pass
154 os.close(read_fd) 158 os.close(read_fd)
155 os.close(write_fd) 159 os.close(write_fd)
156 self.assertNotEqual(child_value, parent_value) 160 self.assertNotEqual(child_value, parent_value)
157 161
158 162
159 163
160 class TestCandidateTempdirList(BaseTestCase): 164 class TestCandidateTempdirList(BaseTestCase):
161 """Test the internal function _candidate_tempdir_list.""" 165 """Test the internal function _candidate_tempdir_list."""
162 166
(...skipping 18 matching lines...) Expand all
181 185
182 cand = tempfile._candidate_tempdir_list() 186 cand = tempfile._candidate_tempdir_list()
183 187
184 for envname in 'TMPDIR', 'TEMP', 'TMP': 188 for envname in 'TMPDIR', 'TEMP', 'TMP':
185 dirname = os.getenv(envname) 189 dirname = os.getenv(envname)
186 if not dirname: raise ValueError 190 if not dirname: raise ValueError
187 self.assertIn(dirname, cand) 191 self.assertIn(dirname, cand)
188 192
189 try: 193 try:
190 dirname = os.getcwd() 194 dirname = os.getcwd()
191 except (AttributeError, os.error): 195 except (AttributeError, OSError):
192 dirname = os.curdir 196 dirname = os.curdir
193 197
194 self.assertIn(dirname, cand) 198 self.assertIn(dirname, cand)
195 199
196 # Not practical to try to verify the presence of OS-specific 200 # Not practical to try to verify the presence of OS-specific
197 # paths in this list. 201 # paths in this list.
198 202
199 203
200 # We test _get_default_tempdir by testing gettempdir. 204 # We test _get_default_tempdir some more by testing gettempdir.
205
206 class TestGetDefaultTempdir(BaseTestCase):
207 """Test _get_default_tempdir()."""
208
209 def test_no_files_left_behind(self):
210 # use a private empty directory
211 with tempfile.TemporaryDirectory() as our_temp_directory:
212 # force _get_default_tempdir() to consider our empty directory
213 def our_candidate_list():
214 return [our_temp_directory]
215
216 with support.swap_attr(tempfile, "_candidate_tempdir_list",
217 our_candidate_list):
218 # verify our directory is empty after _get_default_tempdir()
219 tempfile._get_default_tempdir()
220 self.assertEqual(os.listdir(our_temp_directory), [])
221
222 def raise_OSError(*args, **kwargs):
223 raise OSError()
224
225 with support.swap_attr(io, "open", raise_OSError):
226 # test again with failing io.open()
227 with self.assertRaises(FileNotFoundError):
228 tempfile._get_default_tempdir()
229 self.assertEqual(os.listdir(our_temp_directory), [])
230
231 open = io.open
232 def bad_writer(*args, **kwargs):
233 fp = open(*args, **kwargs)
234 fp.write = raise_OSError
235 return fp
236
237 with support.swap_attr(io, "open", bad_writer):
238 # test again with failing write()
239 with self.assertRaises(FileNotFoundError):
240 tempfile._get_default_tempdir()
241 self.assertEqual(os.listdir(our_temp_directory), [])
201 242
202 243
203 class TestGetCandidateNames(BaseTestCase): 244 class TestGetCandidateNames(BaseTestCase):
204 """Test the internal function _get_candidate_names.""" 245 """Test the internal function _get_candidate_names."""
205 246
206 def test_retval(self): 247 def test_retval(self):
207 # _get_candidate_names returns a _RandomNameSequence object 248 # _get_candidate_names returns a _RandomNameSequence object
208 obj = tempfile._get_candidate_names() 249 obj = tempfile._get_candidate_names()
209 self.assertIsInstance(obj, tempfile._RandomNameSequence) 250 self.assertIsInstance(obj, tempfile._RandomNameSequence)
210 251
211 def test_same_thing(self): 252 def test_same_thing(self):
212 # _get_candidate_names always returns the same object 253 # _get_candidate_names always returns the same object
213 a = tempfile._get_candidate_names() 254 a = tempfile._get_candidate_names()
214 b = tempfile._get_candidate_names() 255 b = tempfile._get_candidate_names()
215 256
216 self.assertIs(a, b) 257 self.assertIs(a, b)
258
259
260 @contextlib.contextmanager
261 def _inside_empty_temp_dir():
262 dir = tempfile.mkdtemp()
263 try:
264 with support.swap_attr(tempfile, 'tempdir', dir):
265 yield
266 finally:
267 support.rmtree(dir)
268
269
270 def _mock_candidate_names(*names):
271 return support.swap_attr(tempfile,
272 '_get_candidate_names',
273 lambda: iter(names))
217 274
218 275
219 class TestMkstempInner(BaseTestCase): 276 class TestMkstempInner(BaseTestCase):
220 """Test the internal function _mkstemp_inner.""" 277 """Test the internal function _mkstemp_inner."""
221 278
222 class mkstemped: 279 class mkstemped:
223 _bflags = tempfile._bin_openflags 280 _bflags = tempfile._bin_openflags
224 _tflags = tempfile._text_openflags 281 _tflags = tempfile._text_openflags
225 _close = os.close 282 _close = os.close
226 _unlink = os.unlink 283 _unlink = os.unlink
(...skipping 34 matching lines...) Expand 10 before | Expand all | Expand 10 after
261 extant[i] = self.do_create(pre="aa") 318 extant[i] = self.do_create(pre="aa")
262 319
263 def test_choose_directory(self): 320 def test_choose_directory(self):
264 # _mkstemp_inner can create files in a user-selected directory 321 # _mkstemp_inner can create files in a user-selected directory
265 dir = tempfile.mkdtemp() 322 dir = tempfile.mkdtemp()
266 try: 323 try:
267 self.do_create(dir=dir).write(b"blat") 324 self.do_create(dir=dir).write(b"blat")
268 finally: 325 finally:
269 os.rmdir(dir) 326 os.rmdir(dir)
270 327
328 @unittest.skipUnless(has_stat, 'os.stat not available')
271 def test_file_mode(self): 329 def test_file_mode(self):
272 # _mkstemp_inner creates files with the proper mode 330 # _mkstemp_inner creates files with the proper mode
273 if not has_stat:
274 return # ugh, can't use SkipTest.
275 331
276 file = self.do_create() 332 file = self.do_create()
277 mode = stat.S_IMODE(os.stat(file.name).st_mode) 333 mode = stat.S_IMODE(os.stat(file.name).st_mode)
278 expected = 0o600 334 expected = 0o600
279 if sys.platform == 'win32': 335 if sys.platform == 'win32':
280 # There's no distinction among 'user', 'group' and 'world'; 336 # There's no distinction among 'user', 'group' and 'world';
281 # replicate the 'user' bits. 337 # replicate the 'user' bits.
282 user = expected >> 6 338 user = expected >> 6
283 expected = user * (1 + 8 + 64) 339 expected = user * (1 + 8 + 64)
284 self.assertEqual(mode, expected) 340 self.assertEqual(mode, expected)
285 341
342 @unittest.skipUnless(has_spawnl, 'os.spawnl not available')
286 def test_noinherit(self): 343 def test_noinherit(self):
287 # _mkstemp_inner file handles are not inherited by child processes 344 # _mkstemp_inner file handles are not inherited by child processes
288 if not has_spawnl:
289 return # ugh, can't use SkipTest.
290 345
291 if support.verbose: 346 if support.verbose:
292 v="v" 347 v="v"
293 else: 348 else:
294 v="q" 349 v="q"
295 350
296 file = self.do_create() 351 file = self.do_create()
352 self.assertEqual(os.get_inheritable(file.fd), False)
297 fd = "%d" % file.fd 353 fd = "%d" % file.fd
298 354
299 try: 355 try:
300 me = __file__ 356 me = __file__
301 except NameError: 357 except NameError:
302 me = sys.argv[0] 358 me = sys.argv[0]
303 359
304 # We have to exec something, so that FD_CLOEXEC will take 360 # We have to exec something, so that FD_CLOEXEC will take
305 # effect. The core of this test is therefore in 361 # effect. The core of this test is therefore in
306 # tf_inherit_check.py, which see. 362 # tf_inherit_check.py, which see.
307 tester = os.path.join(os.path.dirname(os.path.abspath(me)), 363 tester = os.path.join(os.path.dirname(os.path.abspath(me)),
308 "tf_inherit_check.py") 364 "tf_inherit_check.py")
309 365
310 # On Windows a spawn* /path/ with embedded spaces shouldn't be quoted, 366 # On Windows a spawn* /path/ with embedded spaces shouldn't be quoted,
311 # but an arg with embedded spaces should be decorated with double 367 # but an arg with embedded spaces should be decorated with double
312 # quotes on each end 368 # quotes on each end
313 if sys.platform == 'win32': 369 if sys.platform == 'win32':
314 decorated = '"%s"' % sys.executable 370 decorated = '"%s"' % sys.executable
315 tester = '"%s"' % tester 371 tester = '"%s"' % tester
316 else: 372 else:
317 decorated = sys.executable 373 decorated = sys.executable
318 374
319 retval = os.spawnl(os.P_WAIT, sys.executable, decorated, tester, v, fd) 375 retval = os.spawnl(os.P_WAIT, sys.executable, decorated, tester, v, fd)
320 self.assertFalse(retval < 0, 376 self.assertFalse(retval < 0,
321 "child process caught fatal signal %d" % -retval) 377 "child process caught fatal signal %d" % -retval)
322 self.assertFalse(retval > 0, "child process reports failure %d"%retval) 378 self.assertFalse(retval > 0, "child process reports failure %d"%retval)
323 379
380 @unittest.skipUnless(has_textmode, "text mode not available")
324 def test_textmode(self): 381 def test_textmode(self):
325 # _mkstemp_inner can create files in text mode 382 # _mkstemp_inner can create files in text mode
326 if not has_textmode:
327 return # ugh, can't use SkipTest.
328 383
329 # A text file is truncated at the first Ctrl+Z byte 384 # A text file is truncated at the first Ctrl+Z byte
330 f = self.do_create(bin=0) 385 f = self.do_create(bin=0)
331 f.write(b"blat\x1a") 386 f.write(b"blat\x1a")
332 f.write(b"extra\n") 387 f.write(b"extra\n")
333 os.lseek(f.fd, 0, os.SEEK_SET) 388 os.lseek(f.fd, 0, os.SEEK_SET)
334 self.assertEqual(os.read(f.fd, 20), b"blat") 389 self.assertEqual(os.read(f.fd, 20), b"blat")
335 390
391 def default_mkstemp_inner(self):
392 return tempfile._mkstemp_inner(tempfile.gettempdir(),
393 tempfile.template,
394 '',
395 tempfile._bin_openflags)
396
397 def test_collision_with_existing_file(self):
398 # _mkstemp_inner tries another name when a file with
399 # the chosen name already exists
400 with _inside_empty_temp_dir(), \
401 _mock_candidate_names('aaa', 'aaa', 'bbb'):
402 (fd1, name1) = self.default_mkstemp_inner()
403 os.close(fd1)
404 self.assertTrue(name1.endswith('aaa'))
405
406 (fd2, name2) = self.default_mkstemp_inner()
407 os.close(fd2)
408 self.assertTrue(name2.endswith('bbb'))
409
410 def test_collision_with_existing_directory(self):
411 # _mkstemp_inner tries another name when a directory with
412 # the chosen name already exists
413 with _inside_empty_temp_dir(), \
414 _mock_candidate_names('aaa', 'aaa', 'bbb'):
415 dir = tempfile.mkdtemp()
416 self.assertTrue(dir.endswith('aaa'))
417
418 (fd, name) = self.default_mkstemp_inner()
419 os.close(fd)
420 self.assertTrue(name.endswith('bbb'))
421
336 422
337 class TestGetTempPrefix(BaseTestCase): 423 class TestGetTempPrefix(BaseTestCase):
338 """Test gettempprefix().""" 424 """Test gettempprefix()."""
339 425
340 def test_sane_template(self): 426 def test_sane_template(self):
341 # gettempprefix returns a nonempty prefix string 427 # gettempprefix returns a nonempty prefix string
342 p = tempfile.gettempprefix() 428 p = tempfile.gettempprefix()
343 429
344 self.assertIsInstance(p, str) 430 self.assertIsInstance(p, str)
345 self.assertGreater(len(p), 0) 431 self.assertGreater(len(p), 0)
(...skipping 15 matching lines...) Expand all
361 os.rmdir(d) 447 os.rmdir(d)
362 448
363 449
364 class TestGetTempDir(BaseTestCase): 450 class TestGetTempDir(BaseTestCase):
365 """Test gettempdir().""" 451 """Test gettempdir()."""
366 452
367 def test_directory_exists(self): 453 def test_directory_exists(self):
368 # gettempdir returns a directory which exists 454 # gettempdir returns a directory which exists
369 455
370 dir = tempfile.gettempdir() 456 dir = tempfile.gettempdir()
371 self.assertTrue(os.path.isabs(dir) or dir == os.curdir, 457 if dir != os.curdir:
372 "%s is not an absolute path" % dir) 458 self.assertTrue(os.path.isabs(dir),
459 "%s is not an absolute path" % dir)
373 self.assertTrue(os.path.isdir(dir), 460 self.assertTrue(os.path.isdir(dir),
374 "%s is not a directory" % dir) 461 "%s is not a directory" % dir)
375 462
376 def test_directory_writable(self): 463 def test_directory_writable(self):
377 # gettempdir returns a directory writable by the user 464 # gettempdir returns a directory writable by the user
378 465
379 # sneaky: just instantiate a NamedTemporaryFile, which 466 # sneaky: just instantiate a NamedTemporaryFile, which
380 # defaults to writing into the directory returned by 467 # defaults to writing into the directory returned by
381 # gettempdir. 468 # gettempdir.
382 file = tempfile.NamedTemporaryFile() 469 file = tempfile.NamedTemporaryFile()
383 file.write(b"blat") 470 file.write(b"blat")
384 file.close() 471 file.close()
385 472
386 def test_same_thing(self): 473 def test_same_thing(self):
387 # gettempdir always returns the same object 474 # gettempdir always returns the same object
388 a = tempfile.gettempdir() 475 a = tempfile.gettempdir()
389 b = tempfile.gettempdir() 476 b = tempfile.gettempdir()
390 477
391 self.assertIs(a, b) 478 self.assertIs(a, b)
479
480 def test_case_sensitive(self):
481 # gettempdir should not flatten its case
482 # even on a case-insensitive file system
483 case_sensitive_tempdir = tempfile.mkdtemp("-Temp")
484 _tempdir, tempfile.tempdir = tempfile.tempdir, None
485 try:
486 with support.EnvironmentVarGuard() as env:
487 # Fake the first env var which is checked as a candidate
488 env["TMPDIR"] = case_sensitive_tempdir
489 self.assertEqual(tempfile.gettempdir(), case_sensitive_tempdir)
490 finally:
491 tempfile.tempdir = _tempdir
492 support.rmdir(case_sensitive_tempdir)
392 493
393 494
394 class TestMkstemp(BaseTestCase): 495 class TestMkstemp(BaseTestCase):
395 """Test mkstemp().""" 496 """Test mkstemp()."""
396 497
397 def do_create(self, dir=None, pre="", suf=""): 498 def do_create(self, dir=None, pre="", suf=""):
398 if dir is None: 499 if dir is None:
399 dir = tempfile.gettempdir() 500 dir = tempfile.gettempdir()
400 (fd, name) = tempfile.mkstemp(dir=dir, prefix=pre, suffix=suf) 501 (fd, name) = tempfile.mkstemp(dir=dir, prefix=pre, suffix=suf)
401 (ndir, nbase) = os.path.split(name) 502 (ndir, nbase) = os.path.split(name)
(...skipping 60 matching lines...) Expand 10 before | Expand all | Expand 10 after
462 os.rmdir(i) 563 os.rmdir(i)
463 564
464 def test_choose_directory(self): 565 def test_choose_directory(self):
465 # mkdtemp can create directories in a user-selected directory 566 # mkdtemp can create directories in a user-selected directory
466 dir = tempfile.mkdtemp() 567 dir = tempfile.mkdtemp()
467 try: 568 try:
468 os.rmdir(self.do_create(dir=dir)) 569 os.rmdir(self.do_create(dir=dir))
469 finally: 570 finally:
470 os.rmdir(dir) 571 os.rmdir(dir)
471 572
573 @unittest.skipUnless(has_stat, 'os.stat not available')
472 def test_mode(self): 574 def test_mode(self):
473 # mkdtemp creates directories with the proper mode 575 # mkdtemp creates directories with the proper mode
474 if not has_stat:
475 return # ugh, can't use SkipTest.
476 576
477 dir = self.do_create() 577 dir = self.do_create()
478 try: 578 try:
479 mode = stat.S_IMODE(os.stat(dir).st_mode) 579 mode = stat.S_IMODE(os.stat(dir).st_mode)
480 mode &= 0o777 # Mask off sticky bits inherited from /tmp 580 mode &= 0o777 # Mask off sticky bits inherited from /tmp
481 expected = 0o700 581 expected = 0o700
482 if sys.platform == 'win32': 582 if sys.platform == 'win32':
483 # There's no distinction among 'user', 'group' and 'world'; 583 # There's no distinction among 'user', 'group' and 'world';
484 # replicate the 'user' bits. 584 # replicate the 'user' bits.
485 user = expected >> 6 585 user = expected >> 6
486 expected = user * (1 + 8 + 64) 586 expected = user * (1 + 8 + 64)
487 self.assertEqual(mode, expected) 587 self.assertEqual(mode, expected)
488 finally: 588 finally:
489 os.rmdir(dir) 589 os.rmdir(dir)
490 590
591 def test_collision_with_existing_file(self):
592 # mkdtemp tries another name when a file with
593 # the chosen name already exists
594 with _inside_empty_temp_dir(), \
595 _mock_candidate_names('aaa', 'aaa', 'bbb'):
596 file = tempfile.NamedTemporaryFile(delete=False)
597 file.close()
598 self.assertTrue(file.name.endswith('aaa'))
599 dir = tempfile.mkdtemp()
600 self.assertTrue(dir.endswith('bbb'))
601
602 def test_collision_with_existing_directory(self):
603 # mkdtemp tries another name when a directory with
604 # the chosen name already exists
605 with _inside_empty_temp_dir(), \
606 _mock_candidate_names('aaa', 'aaa', 'bbb'):
607 dir1 = tempfile.mkdtemp()
608 self.assertTrue(dir1.endswith('aaa'))
609 dir2 = tempfile.mkdtemp()
610 self.assertTrue(dir2.endswith('bbb'))
611
491 612
492 class TestMktemp(BaseTestCase): 613 class TestMktemp(BaseTestCase):
493 """Test mktemp().""" 614 """Test mktemp()."""
494 615
495 # For safety, all use of mktemp must occur in a private directory. 616 # For safety, all use of mktemp must occur in a private directory.
496 # We must also suppress the RuntimeWarning it generates. 617 # We must also suppress the RuntimeWarning it generates.
497 def setUp(self): 618 def setUp(self):
498 self.dir = tempfile.mkdtemp() 619 self.dir = tempfile.mkdtemp()
499 super().setUp() 620 super().setUp()
500 621
(...skipping 61 matching lines...) Expand 10 before | Expand all | Expand 10 after
562 return file 683 return file
563 684
564 685
565 def test_basic(self): 686 def test_basic(self):
566 # NamedTemporaryFile can create files 687 # NamedTemporaryFile can create files
567 self.do_create() 688 self.do_create()
568 self.do_create(pre="a") 689 self.do_create(pre="a")
569 self.do_create(suf="b") 690 self.do_create(suf="b")
570 self.do_create(pre="a", suf="b") 691 self.do_create(pre="a", suf="b")
571 self.do_create(pre="aa", suf=".txt") 692 self.do_create(pre="aa", suf=".txt")
693
694 def test_method_lookup(self):
695 # Issue #18879: Looking up a temporary file method should keep it
696 # alive long enough.
697 f = self.do_create()
698 wr = weakref.ref(f)
699 write = f.write
700 write2 = f.write
701 del f
702 write(b'foo')
703 del write
704 write2(b'bar')
705 del write2
706 if support.check_impl_detail(cpython=True):
707 # No reference cycle was created.
708 self.assertIsNone(wr())
572 709
573 def test_creates_named(self): 710 def test_creates_named(self):
574 # NamedTemporaryFile creates files with names 711 # NamedTemporaryFile creates files with names
575 f = tempfile.NamedTemporaryFile() 712 f = tempfile.NamedTemporaryFile()
576 self.assertTrue(os.path.exists(f.name), 713 self.assertTrue(os.path.exists(f.name),
577 "NamedTemporaryFile %s does not exist" % f.name) 714 "NamedTemporaryFile %s does not exist" % f.name)
578 715
579 def test_del_on_close(self): 716 def test_del_on_close(self):
580 # A NamedTemporaryFile is deleted when closed 717 # A NamedTemporaryFile is deleted when closed
581 dir = tempfile.mkdtemp() 718 dir = tempfile.mkdtemp()
(...skipping 155 matching lines...) Expand 10 before | Expand all | Expand 10 after
737 # methods should continue to function 874 # methods should continue to function
738 f = self.do_create(max_size=30) 875 f = self.do_create(max_size=30)
739 read = f.read 876 read = f.read
740 write = f.write 877 write = f.write
741 seek = f.seek 878 seek = f.seek
742 879
743 write(b"a" * 35) 880 write(b"a" * 35)
744 write(b"b" * 35) 881 write(b"b" * 35)
745 seek(0, 0) 882 seek(0, 0)
746 self.assertEqual(read(70), b'a'*35 + b'b'*35) 883 self.assertEqual(read(70), b'a'*35 + b'b'*35)
884
885 def test_properties(self):
886 f = tempfile.SpooledTemporaryFile(max_size=10)
887 f.write(b'x' * 10)
888 self.assertFalse(f._rolled)
889 self.assertEqual(f.mode, 'w+b')
890 self.assertIsNone(f.name)
891 with self.assertRaises(AttributeError):
892 f.newlines
893 with self.assertRaises(AttributeError):
894 f.encoding
895
896 f.write(b'x')
897 self.assertTrue(f._rolled)
898 self.assertEqual(f.mode, 'rb+')
899 self.assertIsNotNone(f.name)
900 with self.assertRaises(AttributeError):
901 f.newlines
902 with self.assertRaises(AttributeError):
903 f.encoding
747 904
748 def test_text_mode(self): 905 def test_text_mode(self):
749 # Creating a SpooledTemporaryFile with a text mode should produce 906 # Creating a SpooledTemporaryFile with a text mode should produce
750 # a file object reading and writing (Unicode) text strings. 907 # a file object reading and writing (Unicode) text strings.
751 f = tempfile.SpooledTemporaryFile(mode='w+', max_size=10) 908 f = tempfile.SpooledTemporaryFile(mode='w+', max_size=10)
752 f.write("abc\n") 909 f.write("abc\n")
753 f.seek(0) 910 f.seek(0)
754 self.assertEqual(f.read(), "abc\n") 911 self.assertEqual(f.read(), "abc\n")
755 f.write("def\n") 912 f.write("def\n")
756 f.seek(0) 913 f.seek(0)
757 self.assertEqual(f.read(), "abc\ndef\n") 914 self.assertEqual(f.read(), "abc\ndef\n")
915 self.assertFalse(f._rolled)
916 self.assertEqual(f.mode, 'w+')
917 self.assertIsNone(f.name)
918 self.assertIsNone(f.newlines)
919 self.assertIsNone(f.encoding)
920
758 f.write("xyzzy\n") 921 f.write("xyzzy\n")
759 f.seek(0) 922 f.seek(0)
760 self.assertEqual(f.read(), "abc\ndef\nxyzzy\n") 923 self.assertEqual(f.read(), "abc\ndef\nxyzzy\n")
761 # Check that Ctrl+Z doesn't truncate the file 924 # Check that Ctrl+Z doesn't truncate the file
762 f.write("foo\x1abar\n") 925 f.write("foo\x1abar\n")
763 f.seek(0) 926 f.seek(0)
764 self.assertEqual(f.read(), "abc\ndef\nxyzzy\nfoo\x1abar\n") 927 self.assertEqual(f.read(), "abc\ndef\nxyzzy\nfoo\x1abar\n")
928 self.assertTrue(f._rolled)
929 self.assertEqual(f.mode, 'w+')
930 self.assertIsNotNone(f.name)
931 self.assertEqual(f.newlines, os.linesep)
932 self.assertIsNotNone(f.encoding)
765 933
766 def test_text_newline_and_encoding(self): 934 def test_text_newline_and_encoding(self):
767 f = tempfile.SpooledTemporaryFile(mode='w+', max_size=10, 935 f = tempfile.SpooledTemporaryFile(mode='w+', max_size=10,
768 newline='', encoding='utf-8') 936 newline='', encoding='utf-8')
769 f.write("\u039B\r\n") 937 f.write("\u039B\r\n")
770 f.seek(0) 938 f.seek(0)
771 self.assertEqual(f.read(), "\u039B\r\n") 939 self.assertEqual(f.read(), "\u039B\r\n")
772 self.assertFalse(f._rolled) 940 self.assertFalse(f._rolled)
941 self.assertEqual(f.mode, 'w+')
942 self.assertIsNone(f.name)
943 self.assertIsNone(f.newlines)
944 self.assertIsNone(f.encoding)
773 945
774 f.write("\u039B" * 20 + "\r\n") 946 f.write("\u039B" * 20 + "\r\n")
775 f.seek(0) 947 f.seek(0)
776 self.assertEqual(f.read(), "\u039B\r\n" + ("\u039B" * 20) + "\r\n") 948 self.assertEqual(f.read(), "\u039B\r\n" + ("\u039B" * 20) + "\r\n")
777 self.assertTrue(f._rolled) 949 self.assertTrue(f._rolled)
950 self.assertEqual(f.mode, 'w+')
951 self.assertIsNotNone(f.name)
952 self.assertIsNotNone(f.newlines)
953 self.assertEqual(f.encoding, 'utf-8')
778 954
779 def test_context_manager_before_rollover(self): 955 def test_context_manager_before_rollover(self):
780 # A SpooledTemporaryFile can be used as a context manager 956 # A SpooledTemporaryFile can be used as a context manager
781 with tempfile.SpooledTemporaryFile(max_size=1) as f: 957 with tempfile.SpooledTemporaryFile(max_size=1) as f:
782 self.assertFalse(f._rolled) 958 self.assertFalse(f._rolled)
783 self.assertFalse(f.closed) 959 self.assertFalse(f.closed)
784 self.assertTrue(f.closed) 960 self.assertTrue(f.closed)
785 def use_closed(): 961 def use_closed():
786 with f: 962 with f:
787 pass 963 pass
(...skipping 118 matching lines...) Expand 10 before | Expand all | Expand 10 after
906 class TestTemporaryDirectory(BaseTestCase): 1082 class TestTemporaryDirectory(BaseTestCase):
907 """Test TemporaryDirectory().""" 1083 """Test TemporaryDirectory()."""
908 1084
909 def do_create(self, dir=None, pre="", suf="", recurse=1): 1085 def do_create(self, dir=None, pre="", suf="", recurse=1):
910 if dir is None: 1086 if dir is None:
911 dir = tempfile.gettempdir() 1087 dir = tempfile.gettempdir()
912 tmp = tempfile.TemporaryDirectory(dir=dir, prefix=pre, suffix=suf) 1088 tmp = tempfile.TemporaryDirectory(dir=dir, prefix=pre, suffix=suf)
913 self.nameCheck(tmp.name, dir, pre, suf) 1089 self.nameCheck(tmp.name, dir, pre, suf)
914 # Create a subdirectory and some files 1090 # Create a subdirectory and some files
915 if recurse: 1091 if recurse:
916 self.do_create(tmp.name, pre, suf, recurse-1) 1092 d1 = self.do_create(tmp.name, pre, suf, recurse-1)
1093 d1.name = None
917 with open(os.path.join(tmp.name, "test.txt"), "wb") as f: 1094 with open(os.path.join(tmp.name, "test.txt"), "wb") as f:
918 f.write(b"Hello world!") 1095 f.write(b"Hello world!")
919 return tmp 1096 return tmp
920 1097
921 def test_mkdtemp_failure(self): 1098 def test_mkdtemp_failure(self):
922 # Check no additional exception if mkdtemp fails 1099 # Check no additional exception if mkdtemp fails
923 # Previously would raise AttributeError instead 1100 # Previously would raise AttributeError instead
924 # (noted as part of Issue #10188) 1101 # (noted as part of Issue #10188)
925 with tempfile.TemporaryDirectory() as nonexistent: 1102 with tempfile.TemporaryDirectory() as nonexistent:
926 pass 1103 pass
927 with self.assertRaises(os.error): 1104 with self.assertRaises(FileNotFoundError) as cm:
928 tempfile.TemporaryDirectory(dir=nonexistent) 1105 tempfile.TemporaryDirectory(dir=nonexistent)
1106 self.assertEqual(cm.exception.errno, errno.ENOENT)
929 1107
930 def test_explicit_cleanup(self): 1108 def test_explicit_cleanup(self):
931 # A TemporaryDirectory is deleted when cleaned up 1109 # A TemporaryDirectory is deleted when cleaned up
932 dir = tempfile.mkdtemp() 1110 dir = tempfile.mkdtemp()
933 try: 1111 try:
934 d = self.do_create(dir=dir) 1112 d = self.do_create(dir=dir)
935 self.assertTrue(os.path.exists(d.name), 1113 self.assertTrue(os.path.exists(d.name),
936 "TemporaryDirectory %s does not exist" % d.name) 1114 "TemporaryDirectory %s does not exist" % d.name)
937 d.cleanup() 1115 d.cleanup()
938 self.assertFalse(os.path.exists(d.name), 1116 self.assertFalse(os.path.exists(d.name),
939 "TemporaryDirectory %s exists after cleanup" % d.name) 1117 "TemporaryDirectory %s exists after cleanup" % d.name)
940 finally: 1118 finally:
941 os.rmdir(dir) 1119 os.rmdir(dir)
942 1120
943 @support.skip_unless_symlink 1121 @support.skip_unless_symlink
944 def test_cleanup_with_symlink_to_a_directory(self): 1122 def test_cleanup_with_symlink_to_a_directory(self):
945 # cleanup() should not follow symlinks to directories (issue #12464) 1123 # cleanup() should not follow symlinks to directories (issue #12464)
946 d1 = self.do_create() 1124 d1 = self.do_create()
947 d2 = self.do_create() 1125 d2 = self.do_create(recurse=0)
948 1126
949 # Symlink d1/foo -> d2 1127 # Symlink d1/foo -> d2
950 os.symlink(d2.name, os.path.join(d1.name, "foo")) 1128 os.symlink(d2.name, os.path.join(d1.name, "foo"))
951 1129
952 # This call to cleanup() should not follow the "foo" symlink 1130 # This call to cleanup() should not follow the "foo" symlink
953 d1.cleanup() 1131 d1.cleanup()
954 1132
955 self.assertFalse(os.path.exists(d1.name), 1133 self.assertFalse(os.path.exists(d1.name),
956 "TemporaryDirectory %s exists after cleanup" % d1.name) 1134 "TemporaryDirectory %s exists after cleanup" % d1.name)
957 self.assertTrue(os.path.exists(d2.name), 1135 self.assertTrue(os.path.exists(d2.name),
958 "Directory pointed to by a symlink was deleted") 1136 "Directory pointed to by a symlink was deleted")
959 self.assertEqual(os.listdir(d2.name), ['test.txt'], 1137 self.assertEqual(os.listdir(d2.name), ['test.txt'],
960 "Contents of the directory pointed to by a symlink " 1138 "Contents of the directory pointed to by a symlink "
961 "were deleted") 1139 "were deleted")
962 d2.cleanup() 1140 d2.cleanup()
963 1141
964 @support.cpython_only 1142 @support.cpython_only
965 def test_del_on_collection(self): 1143 def test_del_on_collection(self):
966 # A TemporaryDirectory is deleted when garbage collected 1144 # A TemporaryDirectory is deleted when garbage collected
967 dir = tempfile.mkdtemp() 1145 dir = tempfile.mkdtemp()
968 try: 1146 try:
969 d = self.do_create(dir=dir) 1147 d = self.do_create(dir=dir)
970 name = d.name 1148 name = d.name
971 del d # Rely on refcounting to invoke __del__ 1149 del d # Rely on refcounting to invoke __del__
972 self.assertFalse(os.path.exists(name), 1150 self.assertFalse(os.path.exists(name),
973 "TemporaryDirectory %s exists after __del__" % name) 1151 "TemporaryDirectory %s exists after __del__" % name)
974 finally: 1152 finally:
975 os.rmdir(dir) 1153 os.rmdir(dir)
976 1154
977 @unittest.expectedFailure # See issue #10188
978 def test_del_on_shutdown(self): 1155 def test_del_on_shutdown(self):
979 # A TemporaryDirectory may be cleaned up during shutdown 1156 # A TemporaryDirectory may be cleaned up during shutdown
980 # Make sure it works with the relevant modules nulled out
981 with self.do_create() as dir: 1157 with self.do_create() as dir:
982 d = self.do_create(dir=dir) 1158 for mod in ('builtins', 'os', 'shutil', 'sys', 'tempfile', 'warnings'):
983 # Mimic the nulling out of modules that 1159 code = """if True:
984 # occurs during system shutdown 1160 import builtins
985 modules = [os, os.path] 1161 import os
986 if has_stat: 1162 import shutil
987 modules.append(stat) 1163 import sys
988 # Currently broken, so suppress the warning 1164 import tempfile
989 # that is otherwise emitted on stdout 1165 import warnings
990 with support.captured_stderr() as err: 1166
991 with NulledModules(*modules): 1167 tmp = tempfile.TemporaryDirectory(dir={dir!r})
992 d.cleanup() 1168 sys.stdout.buffer.write(tmp.name.encode())
993 # Currently broken, so stop spurious exception by 1169
994 # indicating the object has already been closed 1170 tmp2 = os.path.join(tmp.name, 'test_dir')
995 d._closed = True 1171 os.mkdir(tmp2)
996 # And this assert will fail, as expected by the 1172 with open(os.path.join(tmp2, "test.txt"), "w") as f:
997 # unittest decorator... 1173 f.write("Hello world!")
998 self.assertFalse(os.path.exists(d.name), 1174
999 "TemporaryDirectory %s exists after cleanup" % d.name) 1175 {mod}.tmp = tmp
1176
1177 warnings.filterwarnings("always", category=ResourceWarning)
1178 """.format(dir=dir, mod=mod)
1179 rc, out, err = script_helper.assert_python_ok("-c", code)
1180 tmp_name = out.decode().strip()
1181 self.assertFalse(os.path.exists(tmp_name),
1182 "TemporaryDirectory %s exists after cleanup" % tmp_name)
1183 err = err.decode('utf-8', 'backslashreplace')
1184 self.assertNotIn("Exception ", err)
1185 self.assertIn("ResourceWarning: Implicitly cleaning up", err)
1000 1186
1001 def test_warnings_on_cleanup(self): 1187 def test_warnings_on_cleanup(self):
1002 # Two kinds of warning on shutdown 1188 # ResourceWarning will be triggered by __del__
1003 # Issue 10888: may write to stderr if modules are nulled out
1004 # ResourceWarning will be triggered by __del__
1005 with self.do_create() as dir: 1189 with self.do_create() as dir:
1006 if os.sep != '\\': 1190 d = self.do_create(dir=dir, recurse=3)
1007 # Embed a backslash in order to make sure string escaping 1191 name = d.name
1008 # in the displayed error message is dealt with correctly
1009 suffix = '\\check_backslash_handling'
1010 else:
1011 suffix = ''
1012 d = self.do_create(dir=dir, suf=suffix)
1013
1014 #Check for the Issue 10888 message
1015 modules = [os, os.path]
1016 if has_stat:
1017 modules.append(stat)
1018 with support.captured_stderr() as err:
1019 with NulledModules(*modules):
1020 d.cleanup()
1021 message = err.getvalue().replace('\\\\', '\\')
1022 self.assertIn("while cleaning up", message)
1023 self.assertIn(d.name, message)
1024 1192
1025 # Check for the resource warning 1193 # Check for the resource warning
1026 with support.check_warnings(('Implicitly', ResourceWarning), quiet=False): 1194 with support.check_warnings(('Implicitly', ResourceWarning), quiet=False):
1027 warnings.filterwarnings("always", category=ResourceWarning) 1195 warnings.filterwarnings("always", category=ResourceWarning)
1028 d.__del__() 1196 del d
1029 self.assertFalse(os.path.exists(d.name), 1197 support.gc_collect()
1030 "TemporaryDirectory %s exists after __del__" % d.name) 1198 self.assertFalse(os.path.exists(name),
1199 "TemporaryDirectory %s exists after __del__" % name)
1031 1200
1032 def test_multiple_close(self): 1201 def test_multiple_close(self):
1033 # Can be cleaned-up many times without error 1202 # Can be cleaned-up many times without error
1034 d = self.do_create() 1203 d = self.do_create()
1035 d.cleanup() 1204 d.cleanup()
1036 d.cleanup() 1205 d.cleanup()
1037 d.cleanup() 1206 d.cleanup()
1038 1207
1039 def test_context_manager(self): 1208 def test_context_manager(self):
1040 # Can be used as a context manager 1209 # Can be used as a context manager
1041 d = self.do_create() 1210 d = self.do_create()
1042 with d as name: 1211 with d as name:
1043 self.assertTrue(os.path.exists(name)) 1212 self.assertTrue(os.path.exists(name))
1044 self.assertEqual(name, d.name) 1213 self.assertEqual(name, d.name)
1045 self.assertFalse(os.path.exists(name)) 1214 self.assertFalse(os.path.exists(name))
1046 1215
1047 1216
1048 def test_main(): 1217 def test_main():
1049 support.run_unittest(__name__) 1218 support.run_unittest(__name__)
1050 1219
1051 if __name__ == "__main__": 1220 if __name__ == "__main__":
1052 test_main() 1221 test_main()
LEFTRIGHT

RSS Feeds Recent Issues | This issue
This is Rietveld 894c83f36cb7+