Index: Misc/NEWS =================================================================== --- Misc/NEWS (revision 69412) +++ Misc/NEWS (working copy) @@ -413,6 +413,9 @@ - Issue #999042: The Python compiler now handles explict global statements correctly (should be assigned using STORE_GLOBAL opcode). +- Issue #5178: Added tempfile.TemporaryDirectory class that can be used + as a context manager. + Tools/Demos ----------- Index: Doc/library/tempfile.rst =================================================================== --- Doc/library/tempfile.rst (revision 69412) +++ Doc/library/tempfile.rst (working copy) @@ -29,7 +29,7 @@ To maintain backward compatibility, the argument order is somewhat odd; it is recommended to use keyword arguments for clarity. -The module defines the following user-callable functions: +The module defines the following user-callable items: .. function:: TemporaryFile([mode='w+b'[, bufsize=-1[, suffix=''[, prefix='tmp'[, dir=None]]]]]) @@ -153,6 +153,18 @@ .. versionadded:: 2.3 +.. class:: TemporaryDirectory([suffix=''[, prefix='tmp'[, dir=None]]]) + + Creates a temporary directory using :func:`mkdtemp`. The class can + used as a context manager (see :ref:`context-managers`). On exiting + of the context, the temporary directory and all its contents are + removed. The directory can be manually be cleaned up by calling the + :func:`cleanup` method. The directory name can be retrieved from the + :attr:`name` member of the object. + + .. versionadded:: 2.7 + + .. function:: mktemp([suffix=''[, prefix='tmp'[, dir=None]]]) .. deprecated:: 2.3 @@ -250,3 +262,36 @@ .. versionadded:: 1.5.2 + +Examples +-------- + +Here are some examples of typical usage of the :mod:`tempfile` module:: + + >>> import tempfile + + # create a temporary file and write some data to it + >>> fp = tempfile.TemporaryFile() + >>> fp.write('Hello world!') + # read data from file + >>> fp.seek(0) + >>> fp.read() + 'Hello world!' + # close the file, it will be removed + >>> fp.close() + + # create a temporary file using a context manager + >>> with tempfile.TemporaryFile() as fp: + ... fp.write('Hello world!') + ... fp.seek(0) + ... fp.read() + 'Hello world!' + >>> + # file is now closed and removed + + # create a temporary directory using the context manager + >>> with tempfile.TemporaryDirectory() as tmpdirname: + ... print 'created temporary directory', tmpdirname + >>> + # directory and contents have been removed + Index: Lib/tempfile.py =================================================================== --- Lib/tempfile.py (revision 69412) +++ Lib/tempfile.py (working copy) @@ -601,3 +601,59 @@ def xreadlines(self, *args): return self._file.xreadlines(*args) + + +class TemporaryDirectory(object): + """Create and return a temporary directory. This has the same + behavior as mkdtemp but can be used as a context manager. For + example: + + with TemporaryDirectory() as tmpdir: + ... + + Upon exiting the context, the directory and everthing contained + in it are removed. + """ + + def __init__(self, suffix="", prefix=template, dir=None): + self.name = mkdtemp(suffix, prefix, dir) + self._closed = False + + def __enter__(self): + return self.name + + def cleanup(self): + if not self._closed: + self._rmtree(self.name) + self._closed = True + + def __exit__(self, exc, value, tb): + self.cleanup() + + _listdir = _os.listdir + _path_join = _os.path.join + _isdir = _os.path.isdir + _remove = _os.remove + _rmdir = _os.rmdir + _os_error = _os.error + + def _rmtree(self, path): + # Essentially a stripped down version of shutil.rmtree. We can't + # use globals because they may be None'ed out at shutdown. + for name in self._listdir(path): + fullname = self._path_join(path, name) + try: + isdir = self._isdir(fullname) + except self._os_error: + isdir = False + if isdir: + self._rmtree(fullname) + else: + try: + self._remove(fullname) + except self._os_error: + pass + try: + self._rmdir(path) + except self._os_error: + pass Index: Lib/test/test_bsddb185.py =================================================================== --- Lib/test/test_bsddb185.py (revision 69412) +++ Lib/test/test_bsddb185.py (working copy) @@ -11,7 +11,6 @@ import whichdb import os import tempfile -import shutil class Bsddb185Tests(unittest.TestCase): @@ -27,14 +26,11 @@ def test_anydbm_create(self): # Verify that anydbm.open does *not* create a bsddb185 file - tmpdir = tempfile.mkdtemp() - try: + with tempfile.TemporaryDirectory() as tmpdir: dbfile = os.path.join(tmpdir, "foo.db") anydbm.open(dbfile, "c").close() ftype = whichdb.whichdb(dbfile) self.assertNotEqual(ftype, "bsddb185") - finally: - shutil.rmtree(tmpdir) def test_main(): run_unittest(Bsddb185Tests) Index: Lib/test/test_bsddb3.py =================================================================== --- Lib/test/test_bsddb3.py (revision 69412) +++ Lib/test/test_bsddb3.py (working copy) @@ -7,7 +7,7 @@ import tempfile import time import unittest -from test.test_support import requires, verbose, run_unittest, unlink, rmtree +from test.test_support import requires, verbose, run_unittest # When running as a script instead of within the regrtest framework, skip the # requires test, since it's obvious we want to run them. Index: Lib/test/test_shelve.py =================================================================== --- Lib/test/test_shelve.py (revision 69412) +++ Lib/test/test_shelve.py (working copy) @@ -2,6 +2,7 @@ import unittest import shelve import glob +import tempfile from test import test_support class TestCase(unittest.TestCase): @@ -24,34 +25,25 @@ self.fail('Closed shelf should not find a key') def test_ascii_file_shelf(self): - try: - s = shelve.open(self.fn, protocol=0) + with tempfile.TemporaryDirectory() as tmpdir: + s = shelve.open(os.path.join(tmpdir, self.fn), protocol=0) s['key1'] = (1,2,3,4) self.assertEqual(s['key1'], (1,2,3,4)) s.close() - finally: - for f in glob.glob(self.fn+"*"): - os.unlink(f) def test_binary_file_shelf(self): - try: - s = shelve.open(self.fn, protocol=1) + with tempfile.TemporaryDirectory() as tmpdir: + s = shelve.open(os.path.join(tmpdir, self.fn), protocol=1) s['key1'] = (1,2,3,4) self.assertEqual(s['key1'], (1,2,3,4)) s.close() - finally: - for f in glob.glob(self.fn+"*"): - os.unlink(f) def test_proto2_file_shelf(self): - try: - s = shelve.open(self.fn, protocol=2) + with tempfile.TemporaryDirectory() as tmpdir: + s = shelve.open(os.path.join(tmpdir, self.fn), protocol=2) s['key1'] = (1,2,3,4) self.assertEqual(s['key1'], (1,2,3,4)) s.close() - finally: - for f in glob.glob(self.fn+"*"): - os.unlink(f) def test_in_memory_shelf(self): d1 = {} @@ -93,9 +85,9 @@ class TestShelveBase(mapping_tests.BasicTestMappingProtocol): fn = "shelftemp.db" - counter = 0 def __init__(self, *args, **kw): self._db = [] + self._dirnames = [] mapping_tests.BasicTestMappingProtocol.__init__(self, *args, **kw) type2test = shelve.Shelf def _reference(self): @@ -104,17 +96,18 @@ if self._in_mem: x= shelve.Shelf({}, **self._args) else: - self.counter+=1 - x= shelve.open(self.fn+str(self.counter), **self._args) + tmpdir = tempfile.mkdtemp() + x= shelve.open(os.path.join(tmpdir, self.fn), **self._args) + self._dirnames.append(tmpdir) self._db.append(x) return x def tearDown(self): for db in self._db: db.close() self._db = [] - if not self._in_mem: - for f in glob.glob(self.fn+"*"): - test_support.unlink(f) + for tmpdir in self._dirnames: + test_support.rmtree(tmpdir) + self._dirnames = [] class TestAsciiFileShelve(TestShelveBase): _args={'protocol':0} Index: Lib/test/test_commands.py =================================================================== --- Lib/test/test_commands.py (revision 69412) +++ Lib/test/test_commands.py (working copy) @@ -28,19 +28,13 @@ self.assertEquals(getoutput('echo xyzzy'), 'xyzzy') self.assertEquals(getstatusoutput('echo xyzzy'), (0, 'xyzzy')) - # we use mkdtemp in the next line to create an empty directory - # under our exclusive control; from that, we can invent a pathname - # that we _know_ won't exist. This is guaranteed to fail. - dir = None - try: - dir = tempfile.mkdtemp() - name = os.path.join(dir, "foo") - + # we make an empty directory under our exclusive control; + # from that, we can invent a pathname that we _know_ won't + # exist. This is guaranteed to fail. + with tempfile.TemporaryDirectory() as tmpdir: + name = os.path.join(tmpdir, "foo") status, output = getstatusoutput('cat ' + name) self.assertNotEquals(status, 0) - finally: - if dir is not None: - os.rmdir(dir) def test_getstatus(self): # This pattern should match 'ls -ld /.' on any posix Index: Lib/test/test_tempfile.py =================================================================== --- Lib/test/test_tempfile.py (revision 69412) +++ Lib/test/test_tempfile.py (working copy) @@ -81,7 +81,8 @@ "gettempdir" : 1, "tempdir" : 1, "template" : 1, - "SpooledTemporaryFile" : 1 + "SpooledTemporaryFile" : 1, + "TemporaryDirectory" : 1, } unexp = [] @@ -841,6 +842,55 @@ if tempfile.NamedTemporaryFile is not tempfile.TemporaryFile: test_classes.append(test_TemporaryFile) + +class test_TemporaryDirectory(TC): + """Test TemporaryDirectory().""" + + def do_create(self, dir=None, pre="", suf=""): + if dir is None: + dir = tempfile.gettempdir() + try: + tmp = tempfile.TemporaryDirectory(dir=dir, prefix=pre, suffix=suf) + except: + self.failOnException("TemporaryDirectory") + self.nameCheck(tmp.name, dir, pre, suf) + return tmp + + def test_creates_named(self): + d = tempfile.TemporaryDirectory() + self.failUnless(os.path.exists(d.name), + "TemporaryDirectory %s does not exist" % d.name) + + def test_del_on_close(self): + # A TemporaryDirectory is deleted when closed + dir = tempfile.mkdtemp() + try: + d = tempfile.TemporaryDirectory(dir=dir) + d.cleanup() + self.failIf(os.path.exists(d.name), + "TemporaryDirectory %s exists after cleanup" % d.name) + finally: + os.rmdir(dir) + + def test_multiple_close(self): + # Can be cleaned-up many times without error + d = tempfile.TemporaryDirectory() + d.cleanup() + try: + d.cleanup() + d.cleanup() + except: + self.failOnException("cleanup") + + def test_context_manager(self): + # Can be used as a context manager + with tempfile.TemporaryDirectory() as name: + self.failUnless(os.path.exists(name)) + self.failIf(os.path.exists(name)) + + +test_classes.append(test_TemporaryDirectory) + def test_main(): test_support.run_unittest(*test_classes)