diff -r e74f4d6c0c81 Doc/library/shutil.rst --- a/Doc/library/shutil.rst Wed May 15 16:18:51 2013 -0500 +++ b/Doc/library/shutil.rst Thu May 16 01:07:44 2013 +0200 @@ -36,6 +36,16 @@ copying and removal. For operations on i Directory and files operations ------------------------------ +.. function:: atomic_write(filename, mode='w+b', buffering=-1, encoding=None, newline=None) + + Atomic context manager to write into a file. + + If the file already exists, it is replaced, otherwise it is created. + On error, the original file is leaved unchanged. + + .. versionadded:: 3.4 + + .. function:: copyfileobj(fsrc, fdst[, length]) Copy the contents of the file-like object *fsrc* to the file-like object *fdst*. diff -r e74f4d6c0c81 Lib/shutil.py --- a/Lib/shutil.py Wed May 15 16:18:51 2013 -0500 +++ b/Lib/shutil.py Thu May 16 01:07:44 2013 +0200 @@ -4,14 +4,16 @@ XXX The functions here don't copy the re """ +import collections +import contextlib +import errno +import fnmatch import os +import stat import sys -import stat +import tarfile +import tempfile from os.path import abspath -import fnmatch -import collections -import errno -import tarfile try: import bz2 @@ -1126,3 +1128,39 @@ def which(cmd, mode=os.F_OK | os.X_OK, p if _access_check(name, mode): return name return None + +@contextlib.contextmanager +def atomic_write(filename, mode='w+b', buffering=-1, + encoding=None, newline=None): + """Atomic context manager to write into a file. + + If the file already exists, it is replaced, otherwise it is created. + On error, the original file is leaved unchanged. + """ + directory = os.path.dirname(filename) + + # Create a temporary file to leave the original file unchanged on error + fileobj = tempfile.NamedTemporaryFile(mode=mode, buffering=buffering, + encoding=encoding, newline=newline, + dir=directory) + + # The temporary file is removed on error + with fileobj: + # Pass the file object to the caller, + # the caller write data into the file + yield fileobj + + # Flush Python buffers to system buffers + fileobj.flush() + + if hasattr(os, 'fsync'): + # Flush system buffers to disk + fd = fileobj.fileno() + os.fsync(fd) + + # Rename the file to the final name + os.replace(fileobj.name, filename) + + # The temporary file does not exist anymore + fileobj.delete = False + diff -r e74f4d6c0c81 Lib/test/test_shutil.py --- a/Lib/test/test_shutil.py Wed May 15 16:18:51 2013 -0500 +++ b/Lib/test/test_shutil.py Thu May 16 01:07:44 2013 +0200 @@ -1,15 +1,16 @@ # Copyright (C) 2003 Python Software Foundation -import unittest -import shutil -import tempfile -import sys -import stat +import errno +import functools import os import os.path -import errno -import functools +import shutil +import stat import subprocess +import sys +import tempfile +import unittest +import unittest.mock from test import support from test.support import TESTFN from os.path import splitdrive @@ -1715,9 +1716,60 @@ class TermsizeTests(unittest.TestCase): self.assertEqual(expected, actual) +class TestAtomicWrite(unittest.TestCase): + def test_write_binary(self): + self.addCleanup(support.unlink, TESTFN) + + with shutil.atomic_write(TESTFN) as fp: + fp.write(b"test") + + def check_atomic_write(self, write): + # original file does not exist + self.assertFalse(os.path.exists(TESTFN)) + with self.assertRaises(OSError): + with shutil.atomic_write(TESTFN) as fp: + write(fp) + self.assertFalse(os.path.exists(TESTFN)) + + # original file does exist + content = b"original" + with open(TESTFN, "wb") as fp: + fp.write(content) + with self.assertRaises(OSError): + with shutil.atomic_write(TESTFN) as fp: + write(fp) + with open(TESTFN, "rb") as fp: + self.assertEqual(fp.read(), content) + + def test_error_before_replace(self): + self.addCleanup(support.unlink, TESTFN) + + def write(fp): + raise OSError() + + self.check_atomic_write(write) + + def test_error_on_replace(self): + def failing_replace(oldname, newname): + raise OSError() + + def write(fp): + fp.write(b'test') + + self.addCleanup(support.unlink, TESTFN) + + with unittest.mock.patch('shutil.os.replace', failing_replace): + self.check_atomic_write(write) + + def test_main(): - support.run_unittest(TestShutil, TestMove, TestCopyFile, - TermsizeTests, TestWhich) + support.run_unittest( + TestShutil, + TestMove, + TestCopyFile, + TermsizeTests, + TestWhich, + TestAtomicWrite) if __name__ == '__main__': test_main()