diff -r 0eeffeadaa1e Lib/_pyio.py
--- a/Lib/_pyio.py	Mon Aug 06 17:53:19 2012 -0700
+++ b/Lib/_pyio.py	Tue Aug 07 03:49:56 2012 +0200
@@ -1542,6 +1542,71 @@ class TextIOWrapper(TextIOBase):
             result += " mode={0!r}".format(mode)
         return result + " encoding={0!r}>".format(self.encoding)
 
+    def set_encoding(self, encoding, errors=None):
+        """Change the encoding (and optionally the error handler) in place.
+
+        encoding must be a str naming a registered codec; errors, when
+        given, must be a str and replaces the current error handler.
+        The write buffer is flushed before the switch takes effect.
+
+        Raises ValueError if encoding or errors is not a str, LookupError
+        if the codec is unknown, and UnsupportedOperation if decoded text
+        is already buffered and the stream is not seekable.
+        """
+        if not isinstance(encoding, str):
+            raise ValueError("invalid encoding: %r" % encoding)
+        if errors is not None and not isinstance(errors, str):
+            raise ValueError("invalid errors: %r" % errors)
+
+        # Compare normalized codec names so that an alias of the current
+        # encoding (e.g. "latin1" vs "iso-8859-1") counts as "no change".
+        current_encoding = codecs.lookup(self._encoding).name
+        encoding = codecs.lookup(encoding).name
+        if (encoding == current_encoding
+            and (errors is None or errors == self._errors)):
+            # no change
+            return
+
+        # flush read buffer, may require to seek backward in the underlying
+        # file object
+        if self._decoded_chars:
+            if not self.seekable():
+                raise UnsupportedOperation(
+                    "It is not possible to set the encoding "
+                    "of a non seekable file after the first read")
+            assert self._snapshot is not None
+            dec_flags, next_input = self._snapshot
+            # NOTE(review): this subtracts a byte count from what looks
+            # like a character count; the two only agree for single-byte
+            # encodings -- confirm before relying on it with e.g. UTF-8.
+            offset = self._decoded_chars_used - len(next_input)
+            if offset:
+                self.buffer.seek(offset, SEEK_CUR)
+
+        # flush write buffer
+        self.flush()
+
+        # reset attributes
+        self._encoding = encoding
+        if errors is not None:
+            self._errors = errors
+        self._encoder = None
+        self._decoder = None
+        self._decoded_chars = ''
+        self._decoded_chars_used = 0
+        self._snapshot = None
+        self._b2cratio = 0.0
+
+        # don't write a BOM in the middle of a file
+        if self._seekable and self.writable():
+            position = self.buffer.tell()
+            if position != 0:
+                try:
+                    self._get_encoder().setstate(0)
+                except LookupError:
+                    # Sometimes the encoder doesn't exist
+                    pass
+
     @property
     def encoding(self):
         return self._encoding
diff -r 0eeffeadaa1e Lib/test/test_io.py
--- a/Lib/test/test_io.py	Mon Aug 06 17:53:19 2012 -0700
+++ b/Lib/test/test_io.py	Tue Aug 07 03:49:56 2012 +0200
@@ -2361,7 +2361,6 @@ class TextIOWrapperTest(unittest.TestCas
         for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
             with self.open(filename, 'w', encoding=charset) as f:
                 f.write('aaa')
-                pos = f.tell()
             with self.open(filename, 'rb') as f:
                 self.assertEqual(f.read(), 'aaa'.encode(charset))
 
@@ -2460,6 +2459,54 @@ class TextIOWrapperTest(unittest.TestCas
         txt.write('5')
         self.assertEqual(b''.join(raw._write_stack), b'123\n45')
 
+    def test_set_encoding(self):
+        # read
+        data = 'abc\xe9\n'.encode('latin1') + 'd\xe9f\n'.encode('utf8')
+        raw = self.BytesIO(data)
+        txt = self.TextIOWrapper(raw, encoding='latin1', newline='\n')
+        self.assertEqual(txt.readline(), 'abc\xe9\n')
+        txt.set_encoding('utf-8')
+        self.assertEqual(txt.readline(), 'd\xe9f\n')
+
+        # On non-seekable stream, set_encoding() can be called before the first
+        # read
+        data = 'abc\xe9'.encode('latin1')
+        raw = self.MockUnseekableIO(data)
+        txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
+        txt.set_encoding('latin1')
+        self.assertEqual(txt.readline(), 'abc\xe9')
+
+        # On non-seekable stream, set_encoding() cannot be called after the
+        # first read
+        data = 'xabc\xe9\n'.encode('latin1') + 'yd\xe9f\n'.encode('utf8')
+        raw = self.MockUnseekableIO(data)
+        txt = self.TextIOWrapper(raw, encoding='latin1', newline='\n')
+        self.assertEqual(txt.readline(), 'xabc\xe9\n')
+        self.assertRaises(io.UnsupportedOperation, txt.set_encoding, 'utf-8')
+        # ... but an alias of the current encoding is not a change, so it
+        # must be accepted (as a no-op) even after the first read
+        txt.set_encoding('iso-8859-1')
+
+        # write
+        raw = self.BytesIO()
+        txt = self.TextIOWrapper(raw, encoding='latin1', newline='\n')
+        txt.write('abc\xe9\n')
+        txt.set_encoding('utf-8')
+        self.assertEqual(raw.getvalue(), b'abc\xe9\n')
+        txt.write('d\xe9f\n')
+        txt.flush()
+        self.assertEqual(raw.getvalue(), b'abc\xe9\nd\xc3\xa9f\n')
+
+        # write with BOM: don't write the BOM in the middle of the file
+        raw = self.BytesIO()
+        txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
+        txt.write('abc\n')
+        txt.set_encoding('utf-8-sig')
+        txt.write('d\xe9f\n')
+        txt.flush()
+        self.assertEqual(raw.getvalue(), b'abc\nd\xc3\xa9f\n')
+
+
 class CTextIOWrapperTest(TextIOWrapperTest):
 
     def test_initialization(self):