Index: Doc/lib/libfileinput.tex =================================================================== --- Doc/lib/libfileinput.tex (revision 52964) +++ Doc/lib/libfileinput.tex (working copy) @@ -52,7 +52,7 @@ The following function is the primary interface of this module: \begin{funcdesc}{input}{\optional{files\optional{, inplace\optional{, - backup\optional{, mode\optional{, openhook}}}}}} + backup\optional{, mode\optional{, openhook\optional{, write_mode}}}}}}} Create an instance of the \class{FileInput} class. The instance will be used as global state for the functions of this module, and is also returned to use during iteration. The parameters to this @@ -123,7 +123,7 @@ \begin{classdesc}{FileInput}{\optional{files\optional{, inplace\optional{, backup\optional{, - mode\optional{, openhook}}}}}} + mode\optional{, openhook\optional{, write_mode}}}}}}} Class \class{FileInput} is the implementation; its methods \method{filename()}, \method{fileno()}, \method{lineno()}, \method{fileline()}, \method{isfirstline()}, \method{isstdin()}, @@ -139,6 +139,10 @@ \function{open()}. It must be one of \code{'r'}, \code{'rU'}, \code{'U'} and \code{'rb'}. + With \var{write_mode} you can specify the mode that is passed to + \function{open()} when the input file is rewritten in in-place mode. It must + be either \code{'w'} (the default) or \code{'wb'}. + The \var{openhook}, when given, must be a function that takes two arguments, \var{filename} and \var{mode}, and returns an accordingly opened file-like object. @@ -156,8 +160,9 @@ in place. If the keyword argument \code{\var{backup}='.<some extension>'} is also given, it specifies the extension for the backup file, and the backup file remains around; by default, the extension is -\code{'.bak'} and it is deleted when the output file is closed. In-place -filtering is disabled when standard input is read. +\code{'.bak'} and it is deleted when the output file is closed.
The write +mode for in-place filtering can be set using the \code{\var{write_mode}} +keyword argument. In-place filtering is disabled when standard input is read. \strong{Caveat:} The current implementation does not work for MS-DOS 8+3 filesystems. Index: Lib/fileinput.py =================================================================== --- Lib/fileinput.py (revision 52964) +++ Lib/fileinput.py (working copy) @@ -60,9 +60,11 @@ in place. If the keyword argument backup=".<some extension>" is also given, it specifies the extension for the backup file, and the backup file remains around; by default, the extension is ".bak" and it is -deleted when the output file is closed. In-place filtering is -disabled when standard input is read. XXX The current implementation -does not work for MS-DOS 8+3 filesystems. +deleted when the output file is closed. The input file is by default +written to in 'w' mode; this can be changed using the keyword argument +write_mode="<mode>", where <mode> must be one of 'w' or 'wb'. In-place +filtering is disabled when standard input is read. XXX The current +implementation does not work for MS-DOS 8+3 filesystems. Performance: this module is unfortunately one of the slower ways of processing large numbers of input lines. Nevertheless, a significant @@ -89,8 +91,8 @@ DEFAULT_BUFSIZE = 8*1024 def input(files=None, inplace=0, backup="", bufsize=0, - mode="r", openhook=None): - """input([files[, inplace[, backup[, mode[, openhook]]]]]) + mode="r", openhook=None, write_mode='w'): + """input([files[, inplace[, backup[, mode[, openhook[, write_mode]]]]]]) Create an instance of the FileInput class.
The instance will be used as global state for the functions of this module, and is also returned @@ -100,7 +102,7 @@ global _state if _state and _state._file: raise RuntimeError, "input() already active" - _state = FileInput(files, inplace, backup, bufsize, mode, openhook) + _state = FileInput(files, inplace, backup, bufsize, mode, openhook, write_mode) return _state def close(): @@ -182,7 +184,7 @@ return _state.isstdin() class FileInput: - """class FileInput([files[, inplace[, backup[, mode[, openhook]]]]]) + """class FileInput([files[, inplace[, backup[, mode[, openhook[, write_mode]]]]]]) Class FileInput is the implementation of the module; its methods filename(), lineno(), fileline(), isfirstline(), isstdin(), fileno(), @@ -195,7 +197,7 @@ """ def __init__(self, files=None, inplace=0, backup="", bufsize=0, - mode="r", openhook=None): + mode="r", openhook=None, write_mode="w"): if isinstance(files, basestring): files = (files,) else: @@ -224,6 +226,12 @@ raise ValueError("FileInput opening mode must be one of " "'r', 'rU', 'U' and 'rb'") self._mode = mode + # restrict write_mode argument to writing modes + if inplace and write_mode not in ('w', 'wb'): + raise ValueError("FileInput write mode must be one of " + "'w', 'wb'") + self._write_mode = write_mode + if inplace and openhook: raise ValueError("FileInput cannot use an opening hook in inplace mode") elif openhook and not callable(openhook): @@ -324,12 +332,15 @@ try: perm = os.fstat(self._file.fileno()).st_mode except OSError: - self._output = open(self._filename, "w") + self._output = open(self._filename, self._write_mode) else: + write_flags = os.O_CREAT | os.O_WRONLY | os.O_TRUNC + if "b" in self._write_mode: + write_flags = write_flags | getattr(os, "O_BINARY", 0) # os.O_BINARY only exists on Windows fd = os.open(self._filename, - os.O_CREAT | os.O_WRONLY | os.O_TRUNC, + write_flags, perm) - self._output = os.fdopen(fd, "w") + self._output = os.fdopen(fd, self._write_mode) try: if hasattr(os, 'chmod'): os.chmod(self._filename, perm) Index: 
Lib/test/test_fileinput.py =================================================================== --- Lib/test/test_fileinput.py (revision 52964) +++ Lib/test/test_fileinput.py (working copy) @@ -3,8 +3,8 @@ Nick Mathewson ''' -from test.test_support import verify, verbose, TESTFN, TestFailed -import sys, os, re +from test.test_support import verify, verbose, TESTFN, TestFailed, run_unittest +import sys, os, re, unittest from StringIO import StringIO from fileinput import FileInput, hook_encoded @@ -15,6 +15,7 @@ # Write lines (a list of lines) to temp file number i, and return the # temp file's name. + def writeTmp(i, lines, mode='w'): # opening in text mode is the default name = TESTFN + str(i) f = open(name, mode) @@ -22,7 +23,6 @@ f.close() return name -pat = re.compile(r'LINE (\d+) OF FILE (\d+)') def remove_tempfiles(*names): for name in names: @@ -31,197 +31,281 @@ except: pass -def runTests(t1, t2, t3, t4, bs=0, round=0): - start = 1 + round*6 - if verbose: - print '%s. Simple iteration (bs=%s)' % (start+0, bs) - fi = FileInput(files=(t1, t2, t3, t4), bufsize=bs) - lines = list(fi) - fi.close() - verify(len(lines) == 31) - verify(lines[4] == 'Line 5 of file 1\n') - verify(lines[30] == 'Line 1 of file 4\n') - verify(fi.lineno() == 31) - verify(fi.filename() == t4) +def writeFiles(): + t1 = writeTmp(1, ["Line %s of file 1\n" % (i+1) for i in range(15)]) + t2 = writeTmp(2, ["Line %s of file 2\n" % (i+1) for i in range(10)]) + t3 = writeTmp(3, ["Line %s of file 3\n" % (i+1) for i in range(5)]) + t4 = writeTmp(4, ["Line %s of file 4\n" % (i+1) for i in range(1)]) - if verbose: - print '%s. Status variables (bs=%s)' % (start+1, bs) - fi = FileInput(files=(t1, t2, t3, t4), bufsize=bs) - s = "x" - while s and s != 'Line 6 of file 2\n': - s = fi.readline() - verify(fi.filename() == t2) - verify(fi.lineno() == 21) - verify(fi.filelineno() == 6) - verify(not fi.isfirstline()) - verify(not fi.isstdin()) + return (t1, t2, t3, t4) - if verbose: - print '%s. 
Nextfile (bs=%s)' % (start+2, bs) - fi.nextfile() - verify(fi.readline() == 'Line 1 of file 3\n') - verify(fi.lineno() == 22) - fi.close() +class FileInputTests(unittest.TestCase): + + def testInplaceBinaryMode(self): + '''Check that the binary write mode works correctly. + Write LF characters and ensure that they are not translated to + CRLF (only meaningful on Windows; on Unix-type systems binary + and text mode are the same).''' + test_files = () + try: + test_files = writeFiles() + fi = FileInput(test_files, inplace=True, write_mode="wb") + + for line in fi: + sys.stdout.write(line.strip() + "\n") # exactly one LF; print would add a soft space and a second newline - if verbose: - print '%s. Stdin (bs=%s)' % (start+3, bs) - fi = FileInput(files=(t1, t2, t3, t4, '-'), bufsize=bs) - savestdin = sys.stdin - try: - sys.stdin = StringIO("Line 1 of stdin\nLine 2 of stdin\n") + fi.close() + + fi = FileInput(test_files, mode="rb") # read back untranslated so a stray '\r' stays visible + + for line in fi: + self.assert_('\r' not in line) + fi.close() + finally: + remove_tempfiles(*test_files) + + def testZeroLengthFiles(self): + '''Check for proper behavior with 0-byte files.''' + try: + t1 = writeTmp(1, [""]) + t2 = writeTmp(2, [""]) + t3 = writeTmp(3, ["The only line there is.\n"]) + t4 = writeTmp(4, [""]) + fi = FileInput(files=(t1, t2, t3, t4)) + line = fi.readline() + self.assert_(line == 'The only line there is.\n') + self.assert_(fi.lineno() == 1) + self.assert_(fi.filelineno() == 1) + self.assert_(fi.filename() == t3) + line = fi.readline() + self.assert_(not line) + self.assert_(fi.lineno() == 1) + self.assert_(fi.filelineno() == 0) + self.assert_(fi.filename() == t4) + fi.close() + finally: + remove_tempfiles(t1, t2, t3, t4) + + def testNoNewlinesAtEnd(self): + "Files that don't end with newline" + try: + t1 = writeTmp(1, ["A\nB\nC"]) + t2 = writeTmp(2, ["D\nE\nF"]) + fi = FileInput(files=(t1, t2)) + lines = list(fi) + self.assert_(lines == ["A\n", "B\n", "C", "D\n", "E\n", "F"]) + self.assert_(fi.filelineno() == 3) + self.assert_(fi.lineno() == 6) + finally: + remove_tempfiles(t1, t2) + + def 
testUnicodeFilenames(self): + "Unicode filenames" + try: + t1 = writeTmp(1, ["A\nB"]) + encoding = sys.getfilesystemencoding() + if encoding is None: + encoding = 'ascii' + fi = FileInput(files=unicode(t1, encoding)) + lines = list(fi) + self.assert_(lines == ["A\n", "B"]) + finally: + remove_tempfiles(t1) + + def testFileno(self): + "Test correct fileno() operation." + try: + t1 = writeTmp(1, ["A\nB"]) + t2 = writeTmp(2, ["C\nD"]) + fi = FileInput(files=(t1, t2)) + self.assert_(fi.fileno() == -1) + line = fi.next() + self.assert_(fi.fileno() != -1) + fi.nextfile() + self.assert_(fi.fileno() == -1) + line = list(fi) + self.assert_(fi.fileno() == -1) + finally: + remove_tempfiles(t1, t2) + + def testFileOpeningHook(self): + "Test file opening hook" + try: + # cannot use openhook and inplace mode + fi = FileInput(inplace=1, openhook=lambda f,m: None) + self.fail("FileInput should raise if both inplace " + "and openhook arguments are given") + except ValueError: + pass + try: + fi = FileInput(openhook=1) + self.fail("FileInput should check openhook for being callable") + except ValueError: + pass + try: + t1 = writeTmp(1, ["A\nB"], mode="wb") + fi = FileInput(files=t1, openhook=hook_encoded("rot13")) + lines = list(fi) + self.assert_(lines == ["N\n", "O"]) + finally: + remove_tempfiles(t1) + +class DefaultBufferSizeTests(unittest.TestCase): + def setUp(self): + self.test_files = writeFiles() + self.bs = 0 + + def tearDown(self): + remove_tempfiles(*self.test_files) + + def testSimpleIteration(self): + "Simple iteration (bs=0)" + + fi = FileInput(files=self.test_files, bufsize=self.bs) lines = list(fi) - verify(len(lines) == 33) - verify(lines[32] == 'Line 2 of stdin\n') - verify(fi.filename() == '<stdin>') + fi.close() + self.assert_(len(lines) == 31) + self.assert_(lines[4] == 'Line 5 of file 1\n') + self.assert_(lines[30] == 'Line 1 of file 4\n') + self.assert_(fi.lineno() == 31) + self.assert_(fi.filename() == self.test_files[3]) + + def testStatusVariables(self): + 
'Status variables (bs=0)' + + fi = FileInput(files=self.test_files, bufsize=self.bs) + s = "x" + while s and s != 'Line 6 of file 2\n': + s = fi.readline() + self.assert_(fi.filename() == self.test_files[1]) + self.assert_(fi.lineno() == 21) + self.assert_(fi.filelineno() == 6) + self.assert_(not fi.isfirstline()) + self.assert_(not fi.isstdin()) + fi.close() + + def testNextFile(self): + 'Nextfile (bs=0)' + + fi = FileInput(files=self.test_files, bufsize=self.bs) + + for i in range(1,4): + fi.nextfile() # Does nothing on the first iteration, as no lines have been read. + line = fi.readline() + self.assert_(line == "Line 1 of file %d\n" % i) + + # Three lines have been read - lines skipped do not count toward the cumulative total. + self.assert_(fi.lineno() == 3) + fi.close() + + def testStdIn(self): + 'Stdin (bs=0)' + file_list = list(self.test_files) + file_list.append('-') + + fi = FileInput(files=file_list, bufsize=self.bs) + savestdin = sys.stdin + try: + sys.stdin = StringIO("Line 1 of stdin\nLine 2 of stdin\n") + lines = list(fi) + self.assert_(len(lines) == 33) + self.assert_(lines[32] == 'Line 2 of stdin\n') + self.assert_(fi.filename() == '<stdin>') + fi.nextfile() + finally: + sys.stdin = savestdin + fi.close() + + def testBoundaryConditions(self): + 'Boundary conditions (bs=0)' + + fi = FileInput(files=self.test_files, bufsize=self.bs) + self.assert_(fi.lineno() == 0) + self.assert_(fi.filename() == None) fi.nextfile() - finally: - sys.stdin = savestdin + self.assert_(fi.lineno() == 0) + self.assert_(fi.filename() == None) + fi.close() - if verbose: - print '%s. Boundary conditions (bs=%s)' % (start+4, bs) - fi = FileInput(files=(t1, t2, t3, t4), bufsize=bs) - verify(fi.lineno() == 0) - verify(fi.filename() == None) - fi.nextfile() - verify(fi.lineno() == 0) - verify(fi.filename() == None) + def testInplace(self): + 'Inplace (bs=0)' - if verbose: - print '%s. 
Inplace (bs=%s)' % (start+5, bs) - savestdout = sys.stdout - try: - fi = FileInput(files=(t1, t2, t3, t4), inplace=1, bufsize=bs) + savestdout = sys.stdout + try: + fi = FileInput(files=self.test_files, inplace=1, bufsize=self.bs) + for line in fi: + line = line[:-1].upper() + print line + fi.close() + finally: + sys.stdout = savestdout + + pat = re.compile(r'LINE (\d+) OF FILE (\d+)') + + fi = FileInput(files=self.test_files, bufsize=self.bs) for line in fi: - line = line[:-1].upper() - print line + self.assert_(line[-1] == '\n') + m = pat.match(line[:-1]) + self.assert_(m != None) + self.assert_(int(m.group(1)) == fi.filelineno()) fi.close() - finally: - sys.stdout = savestdout - fi = FileInput(files=(t1, t2, t3, t4), bufsize=bs) - for line in fi: - verify(line[-1] == '\n') - m = pat.match(line[:-1]) - verify(m != None) - verify(int(m.group(1)) == fi.filelineno()) - fi.close() + def testOpeningMode(self): + "Specify opening mode (bs=0)" + try: + # invalid mode, should raise ValueError + fi = FileInput(mode="w") + self.fail("FileInput should reject invalid mode argument") + except ValueError: + pass + try: + # try opening in universal newline mode + t1 = writeTmp(1, ["A\nB\r\nC\rD"], mode="wb") + fi = FileInput(files=t1, mode="U") + lines = list(fi) + self.assert_(lines == ["A\n", "B\n", "C\n", "D"]) + finally: + remove_tempfiles(t1) -def writeFiles(): - global t1, t2, t3, t4 - t1 = writeTmp(1, ["Line %s of file 1\n" % (i+1) for i in range(15)]) - t2 = writeTmp(2, ["Line %s of file 2\n" % (i+1) for i in range(10)]) - t3 = writeTmp(3, ["Line %s of file 3\n" % (i+1) for i in range(5)]) - t4 = writeTmp(4, ["Line %s of file 4\n" % (i+1) for i in range(1)]) +class SmallBufferSizeTests(DefaultBufferSizeTests): + def setUp(self): + self.test_files = writeFiles() + self.bs = 30 -# First, run the tests with default and teeny buffer size. 
-for round, bs in (0, 0), (1, 30): - try: - writeFiles() - runTests(t1, t2, t3, t4, bs, round) - finally: - remove_tempfiles(t1, t2, t3, t4) + def testSimpleIteration(self): + "Simple iteration (bs=30)" + DefaultBufferSizeTests.testSimpleIteration(self) -# Next, check for proper behavior with 0-byte files. -if verbose: - print "13. 0-byte files" -try: - t1 = writeTmp(1, [""]) - t2 = writeTmp(2, [""]) - t3 = writeTmp(3, ["The only line there is.\n"]) - t4 = writeTmp(4, [""]) - fi = FileInput(files=(t1, t2, t3, t4)) - line = fi.readline() - verify(line == 'The only line there is.\n') - verify(fi.lineno() == 1) - verify(fi.filelineno() == 1) - verify(fi.filename() == t3) - line = fi.readline() - verify(not line) - verify(fi.lineno() == 1) - verify(fi.filelineno() == 0) - verify(fi.filename() == t4) - fi.close() -finally: - remove_tempfiles(t1, t2, t3, t4) + def testStatusVariables(self): + 'Status variables (bs=30)' + DefaultBufferSizeTests.testStatusVariables(self) -if verbose: - print "14. Files that don't end with newline" -try: - t1 = writeTmp(1, ["A\nB\nC"]) - t2 = writeTmp(2, ["D\nE\nF"]) - fi = FileInput(files=(t1, t2)) - lines = list(fi) - verify(lines == ["A\n", "B\n", "C", "D\n", "E\n", "F"]) - verify(fi.filelineno() == 3) - verify(fi.lineno() == 6) -finally: - remove_tempfiles(t1, t2) + def testNextFile(self): + 'Nextfile (bs=30)' + DefaultBufferSizeTests.testNextFile(self) -if verbose: - print "15. Unicode filenames" -try: - t1 = writeTmp(1, ["A\nB"]) - encoding = sys.getfilesystemencoding() - if encoding is None: - encoding = 'ascii' - fi = FileInput(files=unicode(t1, encoding)) - lines = list(fi) - verify(lines == ["A\n", "B"]) -finally: - remove_tempfiles(t1) + def testStdIn(self): + 'Stdin (bs=30)' + DefaultBufferSizeTests.testStdIn(self) -if verbose: - print "16. 
fileno()" -try: - t1 = writeTmp(1, ["A\nB"]) - t2 = writeTmp(2, ["C\nD"]) - fi = FileInput(files=(t1, t2)) - verify(fi.fileno() == -1) - line = fi.next() - verify(fi.fileno() != -1) - fi.nextfile() - verify(fi.fileno() == -1) - line = list(fi) - verify(fi.fileno() == -1) -finally: - remove_tempfiles(t1, t2) + def testBoundaryConditions(self): + 'Boundary conditions (bs=30)' + DefaultBufferSizeTests.testBoundaryConditions(self) -if verbose: - print "17. Specify opening mode" -try: - # invalid mode, should raise ValueError - fi = FileInput(mode="w") - raise TestFailed("FileInput should reject invalid mode argument") -except ValueError: - pass -try: - # try opening in universal newline mode - t1 = writeTmp(1, ["A\nB\r\nC\rD"], mode="wb") - fi = FileInput(files=t1, mode="U") - lines = list(fi) - verify(lines == ["A\n", "B\n", "C\n", "D"]) -finally: - remove_tempfiles(t1) + def testInplace(self): + 'Inplace (bs=30)' + DefaultBufferSizeTests.testInplace(self) -if verbose: - print "18. Test file opening hook" -try: - # cannot use openhook and inplace mode - fi = FileInput(inplace=1, openhook=lambda f,m: None) - raise TestFailed("FileInput should raise if both inplace " - "and openhook arguments are given") -except ValueError: - pass -try: - fi = FileInput(openhook=1) - raise TestFailed("FileInput should check openhook for being callable") -except ValueError: - pass -try: - t1 = writeTmp(1, ["A\nB"], mode="wb") - fi = FileInput(files=t1, openhook=hook_encoded("rot13")) - lines = list(fi) - verify(lines == ["N\n", "O"]) -finally: - remove_tempfiles(t1) + def testOpeningMode(self): + "Specify opening mode (bs=30)" + DefaultBufferSizeTests.testOpeningMode(self) + +def test_main(): + print "Running Unit Tests" + run_unittest(FileInputTests, DefaultBufferSizeTests, SmallBufferSizeTests) + + +if __name__ == "__main__": + test_main() +