diff -r 19f36a2a34ec Lib/_pyio.py --- a/Lib/_pyio.py Mon Mar 23 01:09:35 2015 +0200 +++ b/Lib/_pyio.py Mon Mar 23 14:30:28 2015 +0200 @@ -7,11 +7,16 @@ import abc import codecs import errno import array +import stat # Import _thread instead of threading to reduce startup cost try: from _thread import allocate_lock as Lock except ImportError: from _dummy_thread import allocate_lock as Lock +if os.name == 'win32': + from msvcrt import setmode as _setmode +else: + _setmode = None import io from io import (__all__, SEEK_SET, SEEK_CUR, SEEK_END) @@ -1376,6 +1381,346 @@ class BufferedRandom(BufferedWriter, Buf return BufferedWriter.write(self, b) +class FileIO(RawIOBase): + _fd = -1 + _created = False + _readable = False + _writable = False + _appending = False + _seekable = None + _closefd = True + + def __init__(self, file, mode='r', closefd=True, opener=None): + """Open a file. The mode can be 'r', 'w', 'x' or 'a' for reading (default), + writing, exclusive creation or appending. The file will be created if it + doesn't exist when opened for writing or appending; it will be truncated + when opened for writing. A `FileExistsError` will be raised if it already + exists when opened for creating. Opening a file for creating implies + writing so this mode behaves in a similar way to 'w'. Add a '+' to the mode + to allow simultaneous reading and writing. A custom opener can be used by + passing a callable as *opener*. The underlying file descriptor for the file + object is then obtained by calling opener with (*name*, *flags*). + *opener* must return an open file descriptor (passing os.open as *opener* + results in functionality similar to passing None).""" + if self._fd >= 0: + # Have to close the existing file first. + try: + if self._closefd: + os.close(self._fd) + finally: + self._fd = -1 + + if isinstance(file, float): + raise TypeError('integer argument expected, got float') + elif isinstance(file, int): + fd = file + if fd < 0: + raise ValueError('Negative filedescriptor') + else: + fd = -1 + + if not isinstance(mode, str): + raise TypeError('invalid mode: %s' % (mode,)) + if not set(mode) <= set('xrwab+'): + raise ValueError('invalid mode: %s' % (mode,)) + if sum(c in 'rwax' for c in mode) != 1 or mode.count('+') > 1: + raise ValueError('Must have exactly one of create/read/write/append ' + 'mode and at most one plus') + + if 'x' in mode: + self._created = True + self._writable = True + flags = os.O_EXCL | os.O_CREAT + elif 'r' in mode: + self._readable = True + flags = 0 + elif 'w' in mode: + self._writable = True + flags = os.O_CREAT | os.O_TRUNC + elif 'a' in mode: + self._writable = True + self._appending = True + flags = os.O_APPEND | os.O_CREAT + + if '+' in mode: + self._readable = True + self._writable = True + + if self._readable and self._writable: + flags |= os.O_RDWR + elif self._readable: + flags |= os.O_RDONLY + else: + flags |= os.O_WRONLY + + binary_flag = getattr(os, 'O_BINARY', 0) + flags |= binary_flag + + noinherit_flag = (getattr(os, 'O_NOINHERIT', 0) or + getattr(os, 'O_CLOEXEC', 0)) + flags |= noinherit_flag + + owned_fd = None + try: + if fd >= 0: + # Check if the fd is valid. + try: + os.fstat(fd) + except OSError as e: + if e.errno == errno.EBADF: + raise + self._closefd = closefd + else: + self._closefd = True + if not closefd: + raise ValueError('Cannot use closefd=False with file name') + if opener is None: + fd = os.open(file, flags, 0o666) + else: + fd = opener(file, flags) + if not isinstance(fd, int): + raise TypeError('expected integer from opener') + if fd < 0: + raise OSError('Negative file descriptor') + owned_fd = fd + if not noinherit_flag: + os.set_inheritable(fd, False) + + self._blksize = DEFAULT_BUFFER_SIZE + fdfstat = os.fstat(fd) + try: + if stat.S_ISDIR(fdfstat.st_mode): + raise IsADirectoryError(errno.EISDIR, + os.strerror(errno.EISDIR), file) + except AttributeError: + # Ignore the AttribueError if stat.S_ISDIR or errno.EISDIR + # don't exist. + pass + blksize = getattr(fdfstat, 'st_blksize', 0) + if blksize > 1: + self._blksize = blksize + + if _setmode: + # don't translate newlines (\r\n <=> \n) + _setmode(fd, os.O_BINARY) + + self.name = file + if self._appending: + # For consistent behaviour, we explicitly seek to the + # end of file (otherwise, it might be done only on the + # first write()). + os.lseek(fd, 0, SEEK_END) + except: + if owned_fd is not None: + os.close(owned_fd) + raise + self._fd = fd + + def __del__(self): + if self._fd >= 0 and self._closefd and not self.closed: + import warnings + warnings.warn('unclosed file %r' % (self,), ResourceWarning, + stacklevel=2) + self.close() + + def __getstate__(self): + raise TypeError("cannot serialize '%s' object", self.__class__.__name__) + + def __repr__(self): + class_name = '%s.%s' % (self.__class__.__module__, + self.__class__.__qualname__) + if self.closed: + return '<%s [closed]>' % class_name + try: + name = self.name + except AttributeError: + return ('<%s fd=%d mode=%r closefd=%r>' % + (class_name, self._fd, self.mode, self._closefd)) + else: + return ('<%s name=%r mode=%r closefd=%r>' % + (class_name, name, self.mode, self._closefd)) + + def _checkReadable(self): + if not self._readable: + raise UnsupportedOperation('File not open for reading') + + def _checkWritable(self, msg=None): + if not self._writable: + raise UnsupportedOperation('File not open for writing') + + def read(self, size=None): + """Read at most size bytes, returned as bytes. + + Only makes one system call, so less data may be returned than requested + In non-blocking mode, returns None if no data is available. + On end-of-file, returns ''.""" + self._checkClosed() + self._checkReadable() + if size is None or size < 0: + return self.readall() + try: + return os.read(self._fd, size) + except BlockingIOError: + return None + + def readall(self): + """Read all data from the file, returned as bytes. + + In non-blocking mode, returns as much as is immediately available, + or None if no data is available. On end-of-file, returns ''.""" + self._checkClosed() + self._checkReadable() + bufsize = DEFAULT_BUFFER_SIZE + try: + pos = os.lseek(self._fd, 0, SEEK_CUR) + end = os.fstat(self._fd).st_size + if end >= pos: + bufsize = end - pos + 1 + except OSError: + pass + + result = bytearray() + while True: + if len(result) >= bufsize: + bufsize = len(result) + bufsize += max(bufsize, DEFAULT_BUFFER_SIZE) + n = bufsize - len(result) + try: + b = os.read(self._fd, n) + except BlockingIOError: + if result: + break + return None + if not b: # reached the end of the file + break + result += b + + return bytes(result) + + def readinto(self, b): + "Same as RawIOBase.readinto()." + m = memoryview(b).cast('B') + data = self.read(len(m)) + n = len(data) + m[:n] = data + return n + + def write(self, b): + """Write bytes b to file, return number written. + + Only makes one system call, so not all of the data may be written. + The number of bytes actually written is returned.""" + self._checkClosed() + self._checkWritable() + try: + return os.write(self._fd, b) + except BlockingIOError: + return None + + def seek(self, pos, whence=SEEK_SET): + """Move to new file position. + + Argument offset is a byte count. Optional argument whence defaults to + 0 (offset from start of file, offset should be >= 0); other values are 1 + (move relative to current position, positive or negative), and 2 (move + relative to end of file, usually negative, although many platforms allow + seeking beyond the end of a file). + Note that not all file objects are seekable.""" + if isinstance(pos, float): + raise TypeError('an integer is required') + self._checkClosed() + return os.lseek(self._fd, pos, whence) + + def tell(self): + "tell() -> int. Current file position" + self._checkClosed() + return os.lseek(self._fd, 0, SEEK_CUR) + + def truncate(self, size=None): + """Truncate the file to at most size bytes. + + Size defaults to the current file position, as returned by tell(). + The current file position is changed to the value of size.""" + self._checkClosed() + self._checkWritable() + if size is None: + size = self.tell() + os.ftruncate(self._fd, size) + return size + + def close(self): + """Close the file. + + A closed file cannot be used for further I/O operations. close() may be + called more than once without error. Changes the fileno to -1.""" + if not self.closed: + try: + if self._closefd: + os.close(self._fd) + finally: + super().close() + + def seekable(self): + "True if file supports random-access." + self._checkClosed() + if self._seekable is None: + try: + self.tell() + except OSError: + self._seekable = False + else: + self._seekable = True + return self._seekable + + def readable(self): + "True if file was opened in a read mode." + self._checkClosed() + return self._readable + + def writable(self): + "writable() -> bool. True if file was opened in a write mode." + self._checkClosed() + return self._writable + + def fileno(self): + """"File descriptor". + + This is needed for lower-level file interfaces, such the fcntl module.""" + self._checkClosed() + return self._fd + + def isatty(self): + "True if the file is connected to a tty device." + self._checkClosed() + return os.isatty(self._fd) + + @property + def closefd(self): + "True if the file descriptor will be closed" + return self._closefd + + @property + def mode(self): + "String giving the file mode" + if self._created: + if self._readable: + return 'xb+' + else: + return 'xb' + elif self._appending: + if self._readable: + return 'ab+' + else: + return 'ab' + elif self._readable: + if self._writable: + return 'rb+' + else: + return 'rb' + else: + return 'wb' + + class TextIOBase(IOBase): """Base class for text I/O. diff -r 19f36a2a34ec Lib/test/test_file_eintr.py --- a/Lib/test/test_file_eintr.py Mon Mar 23 01:09:35 2015 +0200 +++ b/Lib/test/test_file_eintr.py Mon Mar 23 14:30:28 2015 +0200 @@ -18,11 +18,12 @@ import time import unittest # Test import all of the things we're about to try testing up front. -from _io import FileIO +import _io +import _pyio @unittest.skipUnless(os.name == 'posix', 'tests requires a posix system.') -class TestFileIOSignalInterrupt(unittest.TestCase): +class TestFileIOSignalInterrupt: def setUp(self): self._process = None @@ -38,8 +39,9 @@ class TestFileIOSignalInterrupt(unittest subclasseses should override this to test different IO objects. """ - return ('import _io ;' - 'infile = _io.FileIO(sys.stdin.fileno(), "rb")') + return ('import %s as io ;' + 'infile = io.FileIO(sys.stdin.fileno(), "rb")' % + self.modname) def fail_with_process_info(self, why, stdout=b'', stderr=b'', communicate=True): @@ -179,11 +181,19 @@ class TestFileIOSignalInterrupt(unittest expected=b'hello\nworld!\n')) +class CTestFileIOSignalInterrupt(TestFileIOSignalInterrupt, unittest.TestCase): + modname = '_io' + +class PyTestFileIOSignalInterrupt(TestFileIOSignalInterrupt, unittest.TestCase): + modname = '_pyio' + + class TestBufferedIOSignalInterrupt(TestFileIOSignalInterrupt): def _generate_infile_setup_code(self): """Returns the infile = ... line of code to make a BufferedReader.""" - return ('infile = open(sys.stdin.fileno(), "rb") ;' - 'import _io ;assert isinstance(infile, _io.BufferedReader)') + return ('import %s as io ;infile = io.open(sys.stdin.fileno(), "rb") ;' + 'assert isinstance(infile, io.BufferedReader)' % + self.modname) def test_readall(self): """BufferedReader.read() must handle signals and not lose data.""" @@ -193,12 +203,20 @@ class TestBufferedIOSignalInterrupt(Test read_method_name='read', expected=b'hello\nworld!\n')) +class CTestBufferedIOSignalInterrupt(TestBufferedIOSignalInterrupt, unittest.TestCase): + modname = '_io' + +class PyTestBufferedIOSignalInterrupt(TestBufferedIOSignalInterrupt, unittest.TestCase): + modname = '_pyio' + class TestTextIOSignalInterrupt(TestFileIOSignalInterrupt): def _generate_infile_setup_code(self): """Returns the infile = ... line of code to make a TextIOWrapper.""" - return ('infile = open(sys.stdin.fileno(), "rt", newline=None) ;' - 'import _io ;assert isinstance(infile, _io.TextIOWrapper)') + return ('import %s as io ;' + 'infile = io.open(sys.stdin.fileno(), "rt", newline=None) ;' + 'assert isinstance(infile, io.TextIOWrapper)' % + self.modname) def test_readline(self): """readline() must handle signals and not lose data.""" @@ -224,6 +242,12 @@ class TestTextIOSignalInterrupt(TestFile read_method_name='read', expected="hello\nworld!\n")) +class CTestTextIOSignalInterrupt(TestTextIOSignalInterrupt, unittest.TestCase): + modname = '_io' + +class PyTestTextIOSignalInterrupt(TestTextIOSignalInterrupt, unittest.TestCase): + modname = '_pyio' + def test_main(): test_cases = [ diff -r 19f36a2a34ec Lib/test/test_fileio.py --- a/Lib/test/test_fileio.py Mon Mar 23 01:09:35 2015 +0200 +++ b/Lib/test/test_fileio.py Mon Mar 23 14:30:28 2015 +0200 @@ -12,13 +12,15 @@ from functools import wraps from test.support import TESTFN, check_warnings, run_unittest, make_bad_fd, cpython_only from collections import UserList -from _io import FileIO as _FileIO +import _io # C implementation of io +import _pyio # Python implementation of io -class AutoFileTests(unittest.TestCase): + +class AutoFileTests: # file tests for which a test file is automatically set up def setUp(self): - self.f = _FileIO(TESTFN, 'w') + self.f = self.FileIO(TESTFN, 'w') def tearDown(self): if self.f: @@ -69,20 +71,60 @@ class AutoFileTests(unittest.TestCase): blksize = getattr(fst, 'st_blksize', blksize) self.assertEqual(self.f._blksize, blksize) - def testReadinto(self): - # verify readinto - self.f.write(bytes([1, 2])) + # verify readinto + def testReadintoByteArray(self): + self.f.write(bytes([1, 2, 0, 255])) self.f.close() - a = array('b', b'x'*10) - self.f = _FileIO(TESTFN, 'r') - n = self.f.readinto(a) - self.assertEqual(array('b', [1, 2]), a[:n]) + + ba = bytearray(b'abcdefgh') + with self.FileIO(TESTFN, 'r') as f: + n = f.readinto(ba) + self.assertEqual(ba, b'\x01\x02\x00\xffefgh') + self.assertEqual(n, 4) + + def _testReadintoMemoryview(self): + self.f.write(bytes([1, 2, 0, 255])) + self.f.close() + + m = memoryview(bytearray(b'abcdefgh')) + with self.FileIO(TESTFN, 'r') as f: + n = f.readinto(m) + self.assertEqual(m, b'\x01\x02\x00\xffefgh') + self.assertEqual(n, 4) + + m = memoryview(bytearray(b'abcdefgh')).cast('H', shape=[2, 2]) + with self.FileIO(TESTFN, 'r') as f: + n = f.readinto(m) + self.assertEqual(bytes(m), b'\x01\x02\x00\xffefgh') + self.assertEqual(n, 4) + + def _testReadintoArray(self): + self.f.write(bytes([1, 2, 0, 255])) + self.f.close() + + a = array('B', b'abcdefgh') + with self.FileIO(TESTFN, 'r') as f: + n = f.readinto(a) + self.assertEqual(a, array('B', [1, 2, 0, 255, 101, 102, 103, 104])) + self.assertEqual(n, 4) + + a = array('b', b'abcdefgh') + with self.FileIO(TESTFN, 'r') as f: + n = f.readinto(a) + self.assertEqual(a, array('b', [1, 2, 0, -1, 101, 102, 103, 104])) + self.assertEqual(n, 4) + + a = array('I', b'abcdefgh') + with self.FileIO(TESTFN, 'r') as f: + n = f.readinto(a) + self.assertEqual(a, array('I', b'\x01\x02\x00\xffefgh')) + self.assertEqual(n, 4) def testWritelinesList(self): l = [b'123', b'456'] self.f.writelines(l) self.f.close() - self.f = _FileIO(TESTFN, 'rb') + self.f = self.FileIO(TESTFN, 'rb') buf = self.f.read() self.assertEqual(buf, b'123456') @@ -90,7 +132,7 @@ class AutoFileTests(unittest.TestCase): l = UserList([b'123', b'456']) self.f.writelines(l) self.f.close() - self.f = _FileIO(TESTFN, 'rb') + self.f = self.FileIO(TESTFN, 'rb') buf = self.f.read() self.assertEqual(buf, b'123456') @@ -102,7 +144,7 @@ class AutoFileTests(unittest.TestCase): def test_none_args(self): self.f.write(b"hi\nbye\nabc") self.f.close() - self.f = _FileIO(TESTFN, 'r') + self.f = self.FileIO(TESTFN, 'r') self.assertEqual(self.f.read(None), b"hi\nbye\nabc") self.f.seek(0) self.assertEqual(self.f.readline(None), b"hi\n") @@ -112,23 +154,24 @@ class AutoFileTests(unittest.TestCase): self.assertRaises(TypeError, self.f.write, "Hello!") def testRepr(self): - self.assertEqual( - repr(self.f), "<_io.FileIO name=%r mode=%r closefd=True>" - % (self.f.name, self.f.mode)) + self.assertEqual(repr(self.f), + "<%s.FileIO name=%r mode=%r closefd=True>" % + (self.modulename, self.f.name, self.f.mode)) del self.f.name - self.assertEqual( - repr(self.f), "<_io.FileIO fd=%r mode=%r closefd=True>" - % (self.f.fileno(), self.f.mode)) + self.assertEqual(repr(self.f), + "<%s.FileIO fd=%r mode=%r closefd=True>" % + (self.modulename, self.f.fileno(), self.f.mode)) self.f.close() - self.assertEqual(repr(self.f), "<_io.FileIO [closed]>") + self.assertEqual(repr(self.f), + "<%s.FileIO [closed]>" % (self.modulename,)) def testReprNoCloseFD(self): fd = os.open(TESTFN, os.O_RDONLY) try: - with _FileIO(fd, 'r', closefd=False) as f: + with self.FileIO(fd, 'r', closefd=False) as f: self.assertEqual(repr(f), - "<_io.FileIO name=%r mode=%r closefd=False>" - % (f.name, f.mode)) + "<%s.FileIO name=%r mode=%r closefd=False>" % + (self.modulename, f.name, f.mode)) finally: os.close(fd) @@ -140,15 +183,15 @@ class AutoFileTests(unittest.TestCase): self.assertRaises(ValueError, f.read, 10) # Open for reading f.close() self.assertTrue(f.closed) - f = _FileIO(TESTFN, 'r') + f = self.FileIO(TESTFN, 'r') self.assertRaises(TypeError, f.readinto, "") self.assertTrue(not f.closed) f.close() self.assertTrue(f.closed) def testMethods(self): - methods = ['fileno', 'isatty', 'read', 'readinto', - 'seek', 'tell', 'truncate', 'write', 'seekable', + methods = ['fileno', 'isatty', 'read', + 'tell', 'truncate', 'seekable', 'readable', 'writable'] self.f.close() @@ -158,13 +201,16 @@ class AutoFileTests(unittest.TestCase): method = getattr(self.f, methodname) # should raise on closed file self.assertRaises(ValueError, method) + self.assertRaises(ValueError, self.f.readinto, bytearray()) + self.assertRaises(ValueError, self.f.seek, 0, os.SEEK_CUR) + self.assertRaises(ValueError, self.f.write, b'') def testOpendir(self): # Issue 3703: opening a directory should fill the errno # Windows always returns "[Errno 13]: Permission denied # Unix uses fstat and returns "[Errno 21]: Is a directory" try: - _FileIO('.', 'r') + self.FileIO('.', 'r') except OSError as e: self.assertNotEqual(e.errno, 0) self.assertEqual(e.filename, ".") @@ -175,7 +221,7 @@ class AutoFileTests(unittest.TestCase): def testOpenDirFD(self): fd = os.open('.', os.O_RDONLY) with self.assertRaises(OSError) as cm: - _FileIO(fd, 'r') + self.FileIO(fd, 'r') os.close(fd) self.assertEqual(cm.exception.errno, errno.EISDIR) @@ -260,7 +306,7 @@ class AutoFileTests(unittest.TestCase): self.f.close() except OSError: pass - self.f = _FileIO(TESTFN, 'r') + self.f = self.FileIO(TESTFN, 'r') os.close(self.f.fileno()) return self.f @@ -280,23 +326,32 @@ class AutoFileTests(unittest.TestCase): a = array('b', b'x'*10) f.readinto(a) -class OtherFileTests(unittest.TestCase): +class CAutoFileTests(AutoFileTests, unittest.TestCase): + FileIO = _io.FileIO + modulename = '_io' + +class PyAutoFileTests(AutoFileTests, unittest.TestCase): + FileIO = _pyio.FileIO + modulename = '_pyio' + + +class OtherFileTests: def testAbles(self): try: - f = _FileIO(TESTFN, "w") + f = self.FileIO(TESTFN, "w") self.assertEqual(f.readable(), False) self.assertEqual(f.writable(), True) self.assertEqual(f.seekable(), True) f.close() - f = _FileIO(TESTFN, "r") + f = self.FileIO(TESTFN, "r") self.assertEqual(f.readable(), True) self.assertEqual(f.writable(), False) self.assertEqual(f.seekable(), True) f.close() - f = _FileIO(TESTFN, "a+") + f = self.FileIO(TESTFN, "a+") self.assertEqual(f.readable(), True) self.assertEqual(f.writable(), True) self.assertEqual(f.seekable(), True) @@ -305,7 +360,7 @@ class OtherFileTests(unittest.TestCase): if sys.platform != "win32": try: - f = _FileIO("/dev/tty", "a") + f = self.FileIO("/dev/tty", "a") except OSError: # When run in a cron job there just aren't any # ttys, so skip the test. This also handles other @@ -328,7 +383,7 @@ class OtherFileTests(unittest.TestCase): # check invalid mode strings for mode in ("", "aU", "wU+", "rw", "rt"): try: - f = _FileIO(TESTFN, mode) + f = self.FileIO(TESTFN, mode) except ValueError: pass else: @@ -344,7 +399,7 @@ class OtherFileTests(unittest.TestCase): ('ab+', 'ab+'), ('a+b', 'ab+'), ('r', 'rb'), ('rb', 'rb'), ('rb+', 'rb+'), ('r+b', 'rb+')]: # read modes are last so that TESTFN will exist first - with _FileIO(TESTFN, modes[0]) as f: + with self.FileIO(TESTFN, modes[0]) as f: self.assertEqual(f.mode, modes[1]) finally: if os.path.exists(TESTFN): @@ -352,7 +407,7 @@ class OtherFileTests(unittest.TestCase): def testUnicodeOpen(self): # verify repr works for unicode too - f = _FileIO(str(TESTFN), "w") + f = self.FileIO(str(TESTFN), "w") f.close() os.unlink(TESTFN) @@ -362,7 +417,7 @@ class OtherFileTests(unittest.TestCase): fn = TESTFN.encode("ascii") except UnicodeEncodeError: self.skipTest('could not encode %r to ascii' % TESTFN) - f = _FileIO(fn, "w") + f = self.FileIO(fn, "w") try: f.write(b"abc") f.close() @@ -373,28 +428,21 @@ class OtherFileTests(unittest.TestCase): def testConstructorHandlesNULChars(self): fn_with_NUL = 'foo\0bar' - self.assertRaises(ValueError, _FileIO, fn_with_NUL, 'w') - self.assertRaises(ValueError, _FileIO, bytes(fn_with_NUL, 'ascii'), 'w') + self.assertRaises(ValueError, self.FileIO, fn_with_NUL, 'w') + self.assertRaises(ValueError, self.FileIO, bytes(fn_with_NUL, 'ascii'), 'w') def testInvalidFd(self): - self.assertRaises(ValueError, _FileIO, -10) - self.assertRaises(OSError, _FileIO, make_bad_fd()) + self.assertRaises(ValueError, self.FileIO, -10) + self.assertRaises(OSError, self.FileIO, make_bad_fd()) if sys.platform == 'win32': import msvcrt self.assertRaises(OSError, msvcrt.get_osfhandle, make_bad_fd()) - @cpython_only - def testInvalidFd_overflow(self): - # Issue 15989 - import _testcapi - self.assertRaises(TypeError, _FileIO, _testcapi.INT_MAX + 1) - self.assertRaises(TypeError, _FileIO, _testcapi.INT_MIN - 1) - def testBadModeArgument(self): # verify that we get a sensible error message for bad mode argument bad_mode = "qwerty" try: - f = _FileIO(TESTFN, bad_mode) + f = self.FileIO(TESTFN, bad_mode) except ValueError as msg: if msg.args[0] != 0: s = str(msg) @@ -407,7 +455,7 @@ class OtherFileTests(unittest.TestCase): self.fail("no error for invalid mode: %s" % bad_mode) def testTruncate(self): - f = _FileIO(TESTFN, 'w') + f = self.FileIO(TESTFN, 'w') f.write(bytes(bytearray(range(10)))) self.assertEqual(f.tell(), 10) f.truncate(5) @@ -422,11 +470,11 @@ class OtherFileTests(unittest.TestCase): def bug801631(): # SF bug # "file.truncate fault on windows" - f = _FileIO(TESTFN, 'w') + f = self.FileIO(TESTFN, 'w') f.write(bytes(range(11))) f.close() - f = _FileIO(TESTFN,'r+') + f = self.FileIO(TESTFN,'r+') data = f.read(5) if data != bytes(range(5)): self.fail("Read on file opened for update failed %r" % data) @@ -466,19 +514,19 @@ class OtherFileTests(unittest.TestCase): pass def testInvalidInit(self): - self.assertRaises(TypeError, _FileIO, "1", 0, 0) + self.assertRaises(TypeError, self.FileIO, "1", 0, 0) def testWarnings(self): with check_warnings(quiet=True) as w: self.assertEqual(w.warnings, []) - self.assertRaises(TypeError, _FileIO, []) + self.assertRaises(TypeError, self.FileIO, []) self.assertEqual(w.warnings, []) - self.assertRaises(ValueError, _FileIO, "/some/invalid/name", "rt") + self.assertRaises(ValueError, self.FileIO, "/some/invalid/name", "rt") self.assertEqual(w.warnings, []) def testUnclosedFDOnException(self): class MyException(Exception): pass - class MyFileIO(_FileIO): + class MyFileIO(self.FileIO): def __setattr__(self, name, value): if name == "name": raise MyException("blocked setting name") @@ -487,12 +535,28 @@ class OtherFileTests(unittest.TestCase): self.assertRaises(MyException, MyFileIO, fd) os.close(fd) # should not raise OSError(EBADF) +class COtherFileTests(OtherFileTests, unittest.TestCase): + FileIO = _io.FileIO + modulename = '_io' + + @cpython_only + def testInvalidFd_overflow(self): + # Issue 15989 + import _testcapi + self.assertRaises(TypeError, self.FileIO, _testcapi.INT_MAX + 1) + self.assertRaises(TypeError, self.FileIO, _testcapi.INT_MIN - 1) + +class PyOtherFileTests(OtherFileTests, unittest.TestCase): + FileIO = _pyio.FileIO + modulename = '_pyio' + def test_main(): # Historically, these tests have been sloppy about removing TESTFN. # So get rid of it no matter what. try: - run_unittest(AutoFileTests, OtherFileTests) + run_unittest(CAutoFileTests, PyAutoFileTests, + COtherFileTests, PyOtherFileTests) finally: if os.path.exists(TESTFN): os.unlink(TESTFN)