diff -r b09d07d1f696 Lib/codecs.py --- a/Lib/codecs.py Sat Aug 20 03:19:34 2011 +0200 +++ b/Lib/codecs.py Mon Aug 22 13:09:06 2011 +1000 @@ -308,6 +308,7 @@ def getstate(self): # additional state info is always 0 + # After a reset, the state must be buffer == b"" return (self.buffer, 0) def setstate(self, state): @@ -957,7 +958,7 @@ or the codecs doesn't provide an incremental encoder. """ - encoder = lookup(encoding).incrementalencoder + encoder = lookup(encoding).incrementalencoder if encoder is None: raise LookupError(encoding) return encoder diff -r b09d07d1f696 Lib/test/test_codecs.py --- a/Lib/test/test_codecs.py Sat Aug 20 03:19:34 2011 +0200 +++ b/Lib/test/test_codecs.py Mon Aug 22 13:09:06 2011 +1000 @@ -2,7 +2,10 @@ import unittest import codecs import locale -import sys, _testcapi, io +import sys +import _testcapi +import io + class Queue(object): """ @@ -15,7 +18,7 @@ self._buffer += chars def read(self, size=-1): - if size<0: + if size < 0: s = self._buffer self._buffer = self._buffer[:0] # make empty return s @@ -24,9 +27,48 @@ self._buffer = self._buffer[size:] return s + +class BufferedCodecsTest(unittest.TestCase): + ''' + Note, this does not exhaustively test the code. The code is exercised + during unit testing by other modules which are subclassing Buffered + coders. It tests some of the non-abstract logic included in this base + class. + ''' + + def test_BufferendIncrementalDecoder(self): + with self.assertRaises(NotImplementedError): + dec = codecs.BufferedIncrementalDecoder() + dec._buffer_decode(1, 2, 3) + + def test_BufferedIncrementalEncoder(self): + + class MockBufferedIncrementalEncoder( + codecs.BufferedIncrementalEncoder): + + def _buffer_encode(self, input, errors, final): + ''' + ''' + return 0, 7 + + enc = MockBufferedIncrementalEncoder() + + enc.encode("Hello, world!") + theState = enc.getstate() + assert theState == "world!" + + enc.setstate("Hello, world!") + theState = enc.getstate() + assert theState == "Hello, world!" + + enc.reset() + state = enc.getstate() + assert state == 0 + + class MixInCheckStateHandling: def check_state_handling_decode(self, encoding, u, s): - for i in range(len(s)+1): + for i in range(len(s) + 1): d = codecs.getincrementaldecoder(encoding)() part1 = d.decode(s[:i]) state = d.getstate() @@ -45,17 +87,18 @@ d = codecs.getincrementaldecoder(encoding)() d.setstate(state) part2 = d.decode(s[i:], True) - self.assertEqual(u, part1+part2) + self.assertEqual(u, part1 + part2) def check_state_handling_encode(self, encoding, u, s): - for i in range(len(u)+1): + for i in range(len(u) + 1): d = codecs.getincrementalencoder(encoding)() part1 = d.encode(u[:i]) state = d.getstate() d = codecs.getincrementalencoder(encoding)() d.setstate(state) part2 = d.encode(u[i:], True) - self.assertEqual(s, part1+part2) + self.assertEqual(s, part1 + part2) + class ReadTest(unittest.TestCase, MixInCheckStateHandling): def check_partial(self, input, partialresults): @@ -66,7 +109,8 @@ q = Queue(b"") r = codecs.getreader(self.encoding)(q) result = "" - for (c, partialresult) in zip(input.encode(self.encoding), partialresults): + for (c, partialresult) in zip(input.encode(self.encoding), + partialresults): q.write(bytes([c])) result += r.read() self.assertEqual(result, partialresult) @@ -77,7 +121,8 @@ # do the check again, this time using a incremental decoder d = codecs.getincrementaldecoder(self.encoding)() result = "" - for (c, partialresult) in zip(input.encode(self.encoding), partialresults): + for (c, partialresult) in zip(input.encode(self.encoding), + partialresults): result += d.decode(bytes([c])) self.assertEqual(result, partialresult) # check that there's nothing left in the buffers @@ -87,7 +132,8 @@ # Check whether the reset method works properly d.reset() result = "" - for (c, partialresult) in zip(input.encode(self.encoding), partialresults): + for (c, partialresult) in zip(input.encode(self.encoding), + partialresults): result += d.decode(bytes([c])) self.assertEqual(result, partialresult) # check that there's nothing left in the buffers @@ -96,10 +142,9 @@ # check iterdecode() encoded = input.encode(self.encoding) - self.assertEqual( - input, - "".join(codecs.iterdecode([bytes([c]) for c in encoded], self.encoding)) - ) + self.assertEqual(input, + "".join(codecs.iterdecode([bytes([c]) for c in encoded], + self.encoding))) def test_readline(self): def getreader(input): @@ -128,27 +173,28 @@ vw = [] vwo = [] for (i, lineend) in enumerate("\n \r\n \r \u2028".split()): - vw.append((i*200)*"\3042" + lineend) - vwo.append((i*200)*"\3042") + vw.append((i * 200) * "\3042" + lineend) + vwo.append((i * 200) * "\3042") self.assertEqual(readalllines("".join(vw), True), "".join(vw)) - self.assertEqual(readalllines("".join(vw), False),"".join(vwo)) + self.assertEqual(readalllines("".join(vw), False), "".join(vwo)) # Test lines where the first read might end with \r, so the # reader has to look ahead whether this is a lone \r or a \r\n for size in range(80): for lineend in "\n \r\n \r \u2028".split(): - s = 10*(size*"a" + lineend + "xxx\n") + s = 10 * (size * "a" + lineend + "xxx\n") reader = getreader(s) for i in range(10): self.assertEqual( reader.readline(keepends=True), - size*"a" + lineend, + size * "a" + lineend, ) + reader = getreader(s) for i in range(10): self.assertEqual( reader.readline(keepends=False), - size*"a", + size * "a", ) def test_bug1175396(self): @@ -1760,6 +1806,7 @@ SurrogateEscapeTest, BomTest, TransformCodecTest, + BufferedCodecsTest, )