diff --git a/Doc/library/codecs.rst b/Doc/library/codecs.rst --- a/Doc/library/codecs.rst +++ b/Doc/library/codecs.rst @@ -85,6 +85,9 @@ It defines the following functions: In case a search function cannot find a given encoding, it should return ``None``. + .. deprecated:: 3.3 + *streamreader* and *streamwriter* are now deprecated. + .. function:: lookup(encoding) @@ -139,6 +142,8 @@ functions which use :func:`lookup` for t Raises a :exc:`LookupError` in case the encoding cannot be found. + .. deprecated:: 3.3 + .. function:: getwriter(encoding) @@ -147,6 +152,8 @@ functions which use :func:`lookup` for t Raises a :exc:`LookupError` in case the encoding cannot be found. + .. deprecated:: 3.3 + .. function:: register_error(name, error_handler) @@ -209,10 +216,10 @@ To simplify working with encoded files o utility functions: -.. function:: open(filename, mode[, encoding[, errors[, buffering]]]) +.. function:: open(filename, mode='rb', encoding=None, errors='strict', buffering=-1) Open an encoded file using the given *mode* and return a wrapped version - providing transparent encoding/decoding. The default file mode is ``'r'`` + providing transparent encoding/decoding. The default file mode is ``'rb'`` meaning to open the file in read mode. .. note:: @@ -235,6 +242,10 @@ utility functions: *buffering* has the same meaning as for the built-in :func:`open` function. It defaults to line buffered. + .. deprecated:: 3.3 + Use the builtin :func:`open` function or the :class:`io.TextIOWrapper` + class. + .. function:: EncodedFile(file, data_encoding, file_encoding=None, errors='strict') @@ -251,6 +262,8 @@ utility functions: ``'strict'``, which causes :exc:`ValueError` to be raised in case an encoding error occurs. + .. deprecated:: 3.3 + .. function:: iterencode(iterator, encoding, errors='strict', **kwargs) @@ -563,6 +576,9 @@ The :class:`StreamWriter` class is a sub following methods which every stream writer must define in order to be compatible with the Python codec registry. +.. deprecated:: 3.3 + Use the builtin the :class:`io.TextIOWrapper` class. + .. class:: StreamWriter(stream[, errors]) @@ -628,6 +644,9 @@ The :class:`StreamReader` class is a sub following methods which every stream reader must define in order to be compatible with the Python codec registry. +.. deprecated:: 3.3 + Use the builtin the :class:`io.TextIOWrapper` class. + .. class:: StreamReader(stream[, errors]) @@ -728,6 +747,9 @@ and write modes. The design is such that one can use the factory functions returned by the :func:`lookup` function to construct the instance. +.. deprecated:: 3.3 + Use the :class:`io.TextIOWrapper` class. + .. class:: StreamReaderWriter(stream, Reader, Writer, errors) @@ -752,6 +774,8 @@ which is sometimes useful when dealing w The design is such that one can use the factory functions returned by the :func:`lookup` function to construct the instance. +.. deprecated:: 3.3 + .. class:: StreamRecoder(stream, encode, decode, Reader, Writer, errors) diff --git a/Lib/codecs.py b/Lib/codecs.py --- a/Lib/codecs.py +++ b/Lib/codecs.py @@ -345,6 +345,8 @@ class StreamWriter(Codec): The set of allowed parameter values can be extended via register_error. """ + import warnings + warnings.warn('use io.TextIOWrapper', DeprecationWarning, stacklevel=2) self.stream = stream self.errors = errors @@ -416,6 +418,8 @@ class StreamReader(Codec): The set of allowed parameter values can be extended via register_error. """ + import warnings + warnings.warn('use io.TextIOWrapper', DeprecationWarning, stacklevel=2) self.stream = stream self.errors = errors self.bytebuffer = b"" @@ -877,6 +881,9 @@ def open(filename, mode='rb', encoding=N parameter. """ + import warnings + warnings.warn('use the builtin open() function', + DeprecationWarning, stacklevel=2) if encoding is not None and \ 'b' not in mode: # Force opening of the file in binary mode @@ -885,7 +892,11 @@ def open(filename, mode='rb', encoding=N if encoding is None: return file info = lookup(encoding) - srw = StreamReaderWriter(file, info.streamreader, info.streamwriter, errors) + with warnings.catch_warnings(): + # Ignore StreamReader and StreamWriter warnings, we already emitted + # one warning + warnings.simplefilter("ignore", DeprecationWarning) + srw = StreamReaderWriter(file, info.streamreader, info.streamwriter, errors) # Add attributes to simplify introspection srw.encoding = encoding return srw diff --git a/Lib/test/test_codecs.py b/Lib/test/test_codecs.py --- a/Lib/test/test_codecs.py +++ b/Lib/test/test_codecs.py @@ -1,7 +1,10 @@ from test import support +import _testcapi +import codecs +import io +import sys import unittest -import codecs -import sys, _testcapi, io +import warnings class Queue(object): """ @@ -63,7 +66,9 @@ class ReadTest(unittest.TestCase, MixInC # the StreamReader and check that the results equal the appropriate # entries from partialresults. q = Queue(b"") - r = codecs.getreader(self.encoding)(q) + with warnings.catch_warnings(): + warnings.simplefilter("ignore", DeprecationWarning) + r = codecs.getreader(self.encoding)(q) result = "" for (c, partialresult) in zip(input.encode(self.encoding), partialresults): q.write(bytes([c])) @@ -106,7 +111,9 @@ class ReadTest(unittest.TestCase, MixInC return codecs.getreader(self.encoding)(stream) def readalllines(input, keepends=True, size=None): - reader = getreader(input) + with warnings.catch_warnings(): + warnings.simplefilter("ignore", DeprecationWarning) + reader = getreader(input) lines = [] while True: line = reader.readline(size=size, keepends=keepends) @@ -215,14 +222,18 @@ class ReadTest(unittest.TestCase, MixInC ' \r\n', ] stream = io.BytesIO("".join(s).encode(self.encoding)) - reader = codecs.getreader(self.encoding)(stream) + with warnings.catch_warnings(): + warnings.simplefilter("ignore", DeprecationWarning) + reader = codecs.getreader(self.encoding)(stream) for (i, line) in enumerate(reader): self.assertEqual(line, s[i]) def test_readlinequeue(self): q = Queue(b"") - writer = codecs.getwriter(self.encoding)(q) - reader = codecs.getreader(self.encoding)(q) + with warnings.catch_warnings(): + warnings.simplefilter("ignore", DeprecationWarning) + writer = codecs.getwriter(self.encoding)(q) + reader = codecs.getreader(self.encoding)(q) # No lineends writer.write("foo\r") @@ -253,7 +264,9 @@ class ReadTest(unittest.TestCase, MixInC s = (s1+s2+s3).encode(self.encoding) stream = io.BytesIO(s) - reader = codecs.getreader(self.encoding)(stream) + with warnings.catch_warnings(): + warnings.simplefilter("ignore", DeprecationWarning) + reader = codecs.getreader(self.encoding)(stream) self.assertEqual(reader.readline(), s1) self.assertEqual(reader.readline(), s2) self.assertEqual(reader.readline(), s3) @@ -268,7 +281,9 @@ class ReadTest(unittest.TestCase, MixInC s = (s1+s2+s3+s4+s5).encode(self.encoding) stream = io.BytesIO(s) - reader = codecs.getreader(self.encoding)(stream) + with warnings.catch_warnings(): + warnings.simplefilter("ignore", DeprecationWarning) + reader = codecs.getreader(self.encoding)(stream) self.assertEqual(reader.readline(), s1) self.assertEqual(reader.readline(), s2) self.assertEqual(reader.readline(), s3) @@ -290,7 +305,9 @@ class UTF32Test(ReadTest): _,_,reader,writer = codecs.lookup(self.encoding) # encode some stream s = io.BytesIO() - f = writer(s) + with warnings.catch_warnings(): + warnings.simplefilter("ignore", DeprecationWarning) + f = writer(s) f.write("spam") f.write("spam") d = s.getvalue() @@ -298,16 +315,22 @@ class UTF32Test(ReadTest): self.assertTrue(d == self.spamle or d == self.spambe) # try to read it back s = io.BytesIO(d) - f = reader(s) + with warnings.catch_warnings(): + warnings.simplefilter("ignore", DeprecationWarning) + f = reader(s) self.assertEqual(f.read(), "spamspam") def test_badbom(self): s = io.BytesIO(4*b"\xff") - f = codecs.getreader(self.encoding)(s) + with warnings.catch_warnings(): + warnings.simplefilter("ignore", DeprecationWarning) + f = codecs.getreader(self.encoding)(s) self.assertRaises(UnicodeError, f.read) s = io.BytesIO(8*b"\xff") - f = codecs.getreader(self.encoding)(s) + with warnings.catch_warnings(): + warnings.simplefilter("ignore", DeprecationWarning) + f = codecs.getreader(self.encoding)(s) self.assertRaises(UnicodeError, f.read) def test_partial(self): @@ -462,16 +485,22 @@ class UTF16Test(ReadTest): self.assertTrue(d == self.spamle or d == self.spambe) # try to read it back s = io.BytesIO(d) - f = reader(s) + with warnings.catch_warnings(): + warnings.simplefilter("ignore", DeprecationWarning) + f = reader(s) self.assertEqual(f.read(), "spamspam") def test_badbom(self): s = io.BytesIO(b"\xff\xff") - f = codecs.getreader(self.encoding)(s) + with warnings.catch_warnings(): + warnings.simplefilter("ignore", DeprecationWarning) + f = codecs.getreader(self.encoding)(s) self.assertRaises(UnicodeError, f.read) s = io.BytesIO(b"\xff\xff\xff\xff") - f = codecs.getreader(self.encoding)(s) + with warnings.catch_warnings(): + warnings.simplefilter("ignore", DeprecationWarning) + f = codecs.getreader(self.encoding)(s) self.assertRaises(UnicodeError, f.read) def test_partial(self): @@ -517,8 +546,11 @@ class UTF16Test(ReadTest): self.addCleanup(support.unlink, support.TESTFN) with open(support.TESTFN, 'wb') as fp: fp.write(s) - with codecs.open(support.TESTFN, 'U', encoding=self.encoding) as reader: - self.assertEqual(reader.read(), s1) + with warnings.catch_warnings(): + warnings.simplefilter("ignore", DeprecationWarning) + with codecs.open(support.TESTFN, 'U', + encoding=self.encoding) as reader: + self.assertEqual(reader.read(), s1) class UTF16LETest(ReadTest): encoding = "utf-16-le" @@ -705,7 +737,9 @@ class UTF8SigTest(ReadTest): reader = codecs.getreader("utf-8-sig") for sizehint in [None] + list(range(1, 11)) + \ [64, 128, 256, 512, 1024]: - istream = reader(io.BytesIO(bytestring)) + with warnings.catch_warnings(): + warnings.simplefilter("ignore", DeprecationWarning) + istream = reader(io.BytesIO(bytestring)) ostream = io.StringIO() while 1: if sizehint is not None: @@ -727,7 +761,9 @@ class UTF8SigTest(ReadTest): reader = codecs.getreader("utf-8-sig") for sizehint in [None] + list(range(1, 11)) + \ [64, 128, 256, 512, 1024]: - istream = reader(io.BytesIO(bytestring)) + with warnings.catch_warnings(): + warnings.simplefilter("ignore", DeprecationWarning) + istream = reader(io.BytesIO(bytestring)) ostream = io.StringIO() while 1: if sizehint is not None: @@ -749,7 +785,9 @@ class EscapeDecodeTest(unittest.TestCase class RecodingTest(unittest.TestCase): def test_recoding(self): f = io.BytesIO() - f2 = codecs.EncodedFile(f, "unicode_internal", "utf-8") + with warnings.catch_warnings(): + warnings.simplefilter("ignore", DeprecationWarning) + f2 = codecs.EncodedFile(f, "unicode_internal", "utf-8") f2.write("a") f2.close() # Python used to crash on this at exit because of a refcount @@ -1126,7 +1164,9 @@ class IDNACodecTest(unittest.TestCase): self.assertEqual("pyth\xf6n.org.".encode("idna"), b"xn--pythn-mua.org.") def test_stream(self): - r = codecs.getreader("idna")(io.BytesIO(b"abc")) + with warnings.catch_warnings(): + warnings.simplefilter("ignore", DeprecationWarning) + r = codecs.getreader("idna")(io.BytesIO(b"abc")) r.read(3) self.assertEqual(r.read(), "") @@ -1233,18 +1273,24 @@ class CodecsModuleTest(unittest.TestCase class StreamReaderTest(unittest.TestCase): def setUp(self): - self.reader = codecs.getreader('utf-8') + with warnings.catch_warnings(): + warnings.simplefilter("ignore", DeprecationWarning) + self.reader = codecs.getreader('utf-8') self.stream = io.BytesIO(b'\xed\x95\x9c\n\xea\xb8\x80') def test_readlines(self): - f = self.reader(self.stream) + with warnings.catch_warnings(): + warnings.simplefilter("ignore", DeprecationWarning) + f = self.reader(self.stream) self.assertEqual(f.readlines(), ['\ud55c\n', '\uae00']) class EncodedFileTest(unittest.TestCase): def test_basic(self): f = io.BytesIO(b'\xed\x95\x9c\n\xea\xb8\x80') - ef = codecs.EncodedFile(f, 'utf-16-le', 'utf-8') + with warnings.catch_warnings(): + warnings.simplefilter("ignore", DeprecationWarning) + ef = codecs.EncodedFile(f, 'utf-16-le', 'utf-8') self.assertEqual(ef.read(), b'\\\xd5\n\x00\x00\xae') f = io.BytesIO() @@ -1388,7 +1434,9 @@ class BasicUnicodeTest(unittest.TestCase if encoding not in broken_unicode_with_streams: # check stream reader/writer q = Queue(b"") - writer = codecs.getwriter(encoding)(q) + with warnings.catch_warnings(): + warnings.simplefilter("ignore", DeprecationWarning) + writer = codecs.getwriter(encoding)(q) encodedresult = b"" for c in s: writer.write(c) @@ -1396,7 +1444,9 @@ class BasicUnicodeTest(unittest.TestCase self.assertTrue(type(chunk) is bytes, type(chunk)) encodedresult += chunk q = Queue(b"") - reader = codecs.getreader(encoding)(q) + with warnings.catch_warnings(): + warnings.simplefilter("ignore", DeprecationWarning) + reader = codecs.getreader(encoding)(q) decodedresult = "" for c in encodedresult: q.write(bytes([c])) @@ -1470,7 +1520,9 @@ class BasicUnicodeTest(unittest.TestCase continue if encoding in broken_unicode_with_streams: continue - reader = codecs.getreader(encoding)(io.BytesIO(s.encode(encoding))) + with warnings.catch_warnings(): + warnings.simplefilter("ignore", DeprecationWarning) + reader = codecs.getreader(encoding)(io.BytesIO(s.encode(encoding))) for t in range(5): # Test that calling seek resets the internal codec state and buffers reader.seek(0, 0) @@ -1539,15 +1591,19 @@ class CharmapTest(unittest.TestCase): class WithStmtTest(unittest.TestCase): def test_encodedfile(self): f = io.BytesIO(b"\xc3\xbc") - with codecs.EncodedFile(f, "latin-1", "utf-8") as ef: - self.assertEqual(ef.read(), b"\xfc") + with warnings.catch_warnings(): + warnings.simplefilter("ignore", DeprecationWarning) + with codecs.EncodedFile(f, "latin-1", "utf-8") as ef: + self.assertEqual(ef.read(), b"\xfc") def test_streamreaderwriter(self): f = io.BytesIO(b"\xc3\xbc") info = codecs.lookup("utf-8") - with codecs.StreamReaderWriter(f, info.streamreader, - info.streamwriter, 'strict') as srw: - self.assertEqual(srw.read(), "\xfc") + with warnings.catch_warnings(): + warnings.simplefilter("ignore", DeprecationWarning) + with codecs.StreamReaderWriter(f, info.streamreader, + info.streamwriter, 'strict') as srw: + self.assertEqual(srw.read(), "\xfc") class TypesTest(unittest.TestCase): def test_decode_unicode(self): @@ -1623,51 +1679,53 @@ class BomTest(unittest.TestCase): "utf-32-le", "utf-32-be") self.addCleanup(support.unlink, support.TESTFN) - for encoding in tests: - # Check if the BOM is written only once - with codecs.open(support.TESTFN, 'w+', encoding=encoding) as f: - f.write(data) - f.write(data) - f.seek(0) - self.assertEqual(f.read(), data * 2) - f.seek(0) - self.assertEqual(f.read(), data * 2) + with warnings.catch_warnings(): + warnings.simplefilter("ignore", DeprecationWarning) + for encoding in tests: + # Check if the BOM is written only once + with codecs.open(support.TESTFN, 'w+', encoding=encoding) as f: + f.write(data) + f.write(data) + f.seek(0) + self.assertEqual(f.read(), data * 2) + f.seek(0) + self.assertEqual(f.read(), data * 2) - # Check that the BOM is written after a seek(0) - with codecs.open(support.TESTFN, 'w+', encoding=encoding) as f: - f.write(data[0]) - self.assertNotEqual(f.tell(), 0) - f.seek(0) - f.write(data) - f.seek(0) - self.assertEqual(f.read(), data) + # Check that the BOM is written after a seek(0) + with codecs.open(support.TESTFN, 'w+', encoding=encoding) as f: + f.write(data[0]) + self.assertNotEqual(f.tell(), 0) + f.seek(0) + f.write(data) + f.seek(0) + self.assertEqual(f.read(), data) - # (StreamWriter) Check that the BOM is written after a seek(0) - with codecs.open(support.TESTFN, 'w+', encoding=encoding) as f: - f.writer.write(data[0]) - self.assertNotEqual(f.writer.tell(), 0) - f.writer.seek(0) - f.writer.write(data) - f.seek(0) - self.assertEqual(f.read(), data) + # (StreamWriter) Check that the BOM is written after a seek(0) + with codecs.open(support.TESTFN, 'w+', encoding=encoding) as f: + f.writer.write(data[0]) + self.assertNotEqual(f.writer.tell(), 0) + f.writer.seek(0) + f.writer.write(data) + f.seek(0) + self.assertEqual(f.read(), data) - # Check that the BOM is not written after a seek() at a position - # different than the start - with codecs.open(support.TESTFN, 'w+', encoding=encoding) as f: - f.write(data) - f.seek(f.tell()) - f.write(data) - f.seek(0) - self.assertEqual(f.read(), data * 2) + # Check that the BOM is not written after a seek() at a + # position different than the start + with codecs.open(support.TESTFN, 'w+', encoding=encoding) as f: + f.write(data) + f.seek(f.tell()) + f.write(data) + f.seek(0) + self.assertEqual(f.read(), data * 2) - # (StreamWriter) Check that the BOM is not written after a seek() - # at a position different than the start - with codecs.open(support.TESTFN, 'w+', encoding=encoding) as f: - f.writer.write(data) - f.writer.seek(f.writer.tell()) - f.writer.write(data) - f.seek(0) - self.assertEqual(f.read(), data * 2) + # (StreamWriter) Check that the BOM is not written after a + # seek() at a position different than the start + with codecs.open(support.TESTFN, 'w+', encoding=encoding) as f: + f.writer.write(data) + f.writer.seek(f.writer.tell()) + f.writer.write(data) + f.seek(0) + self.assertEqual(f.read(), data * 2) bytes_transform_encodings = [ @@ -1704,7 +1762,9 @@ class TransformCodecTest(unittest.TestCa def test_read(self): for encoding in bytes_transform_encodings: sin = codecs.encode(b"\x80", encoding) - reader = codecs.getreader(encoding)(io.BytesIO(sin)) + with warnings.catch_warnings(): + warnings.simplefilter("ignore", DeprecationWarning) + reader = codecs.getreader(encoding)(io.BytesIO(sin)) sout = reader.read() self.assertEqual(sout, b"\x80") @@ -1713,7 +1773,9 @@ class TransformCodecTest(unittest.TestCa if encoding in ['uu_codec', 'zlib_codec']: continue sin = codecs.encode(b"\x80", encoding) - reader = codecs.getreader(encoding)(io.BytesIO(sin)) + with warnings.catch_warnings(): + warnings.simplefilter("ignore", DeprecationWarning) + reader = codecs.getreader(encoding)(io.BytesIO(sin)) sout = reader.readline() self.assertEqual(sout, b"\x80") diff --git a/Misc/NEWS b/Misc/NEWS --- a/Misc/NEWS +++ b/Misc/NEWS @@ -153,6 +153,10 @@ Core and Builtins Library ------- +- Issue #8796: Deprecate open(), StreamReader, StreamWriter, + StreamReaderWriter, StreamRecoder and EncodedFile() of the codec module. Use + the builtin open() function or io.TextIOWrapper instead. + - Issue #12132: Skip test_build_ext in case the xxmodule is not found. - Issue #12105: Add O_CLOEXEC to the os module.