diff -r c64216addd7f Lib/codecs.py --- a/Lib/codecs.py Mon Aug 22 11:55:57 2011 +1000 +++ b/Lib/codecs.py Mon Aug 22 15:22:49 2011 +1000 @@ -308,6 +308,7 @@ def getstate(self): # additional state info is always 0 + # After a reset, the state must be buffer == b"" return (self.buffer, 0) def setstate(self, state): diff -r c64216addd7f Lib/test/test_codecs.py --- a/Lib/test/test_codecs.py Mon Aug 22 11:55:57 2011 +1000 +++ b/Lib/test/test_codecs.py Mon Aug 22 15:22:49 2011 +1000 @@ -2,7 +2,10 @@ import unittest import codecs import locale -import sys, _testcapi, io +import sys +import _testcapi +import io + class Queue(object): """ @@ -15,7 +18,7 @@ self._buffer += chars def read(self, size=-1): - if size<0: + if size < 0: s = self._buffer self._buffer = self._buffer[:0] # make empty return s @@ -24,9 +27,39 @@ self._buffer = self._buffer[size:] return s + +class BufferedCodecsTest(unittest.TestCase): + # Note, this does not exhaustively test the code. The code is exercised + # during unit testing by other modules which are subclassing Buffered + # coders. It tests some of the non-abstract logic included in this base + # class. + + def test_base_decoder(self): + dec = codecs.BufferedIncrementalDecoder() + with self.assertRaises(NotImplementedError): + dec._buffer_decode(1, 2, 3) + + def test_base_encoder(self): + + class TestEncoder(codecs.BufferedIncrementalEncoder): + def _buffer_encode(self, input, errors, final): + return 0, 7 + + enc = TestEncoder() + + enc.encode("Hello, world!") + self.assertEqual(enc.getstate(), "world!") + + enc.setstate("Hello, world!") + self.assertEqual(enc.getstate(), "Hello, world!") + + enc.reset() + self.assertEqual(enc.getstate(), 0) + + class MixInCheckStateHandling: def check_state_handling_decode(self, encoding, u, s): - for i in range(len(s)+1): + for i in range(len(s) + 1): d = codecs.getincrementaldecoder(encoding)() part1 = d.decode(s[:i]) state = d.getstate() @@ -45,17 +78,18 @@ d = codecs.getincrementaldecoder(encoding)() d.setstate(state) part2 = d.decode(s[i:], True) - self.assertEqual(u, part1+part2) + self.assertEqual(u, part1 + part2) def check_state_handling_encode(self, encoding, u, s): - for i in range(len(u)+1): + for i in range(len(u) + 1): d = codecs.getincrementalencoder(encoding)() part1 = d.encode(u[:i]) state = d.getstate() d = codecs.getincrementalencoder(encoding)() d.setstate(state) part2 = d.encode(u[i:], True) - self.assertEqual(s, part1+part2) + self.assertEqual(s, part1 + part2) + class ReadTest(unittest.TestCase, MixInCheckStateHandling): def check_partial(self, input, partialresults): @@ -66,7 +100,8 @@ q = Queue(b"") r = codecs.getreader(self.encoding)(q) result = "" - for (c, partialresult) in zip(input.encode(self.encoding), partialresults): + for (c, partialresult) in zip(input.encode(self.encoding), + partialresults): q.write(bytes([c])) result += r.read() self.assertEqual(result, partialresult) @@ -77,7 +112,8 @@ # do the check again, this time using a incremental decoder d = codecs.getincrementaldecoder(self.encoding)() result = "" - for (c, partialresult) in zip(input.encode(self.encoding), partialresults): + for (c, partialresult) in zip(input.encode(self.encoding), + partialresults): result += d.decode(bytes([c])) self.assertEqual(result, partialresult) # check that there's nothing left in the buffers @@ -87,7 +123,8 @@ # Check whether the reset method works properly d.reset() result = "" - for (c, partialresult) in zip(input.encode(self.encoding), partialresults): + for (c, partialresult) in zip(input.encode(self.encoding), + partialresults): result += d.decode(bytes([c])) self.assertEqual(result, partialresult) # check that there's nothing left in the buffers @@ -96,10 +133,9 @@ # check iterdecode() encoded = input.encode(self.encoding) - self.assertEqual( - input, - "".join(codecs.iterdecode([bytes([c]) for c in encoded], self.encoding)) - ) + self.assertEqual(input, + "".join(codecs.iterdecode([bytes([c]) for c in encoded], + self.encoding))) def test_readline(self): def getreader(input): @@ -128,27 +164,28 @@ vw = [] vwo = [] for (i, lineend) in enumerate("\n \r\n \r \u2028".split()): - vw.append((i*200)*"\3042" + lineend) - vwo.append((i*200)*"\3042") + vw.append((i * 200) * "\3042" + lineend) + vwo.append((i * 200) * "\3042") self.assertEqual(readalllines("".join(vw), True), "".join(vw)) - self.assertEqual(readalllines("".join(vw), False),"".join(vwo)) + self.assertEqual(readalllines("".join(vw), False), "".join(vwo)) # Test lines where the first read might end with \r, so the # reader has to look ahead whether this is a lone \r or a \r\n for size in range(80): for lineend in "\n \r\n \r \u2028".split(): - s = 10*(size*"a" + lineend + "xxx\n") + s = 10 * (size * "a" + lineend + "xxx\n") reader = getreader(s) for i in range(10): self.assertEqual( reader.readline(keepends=True), - size*"a" + lineend, + size * "a" + lineend, ) + reader = getreader(s) for i in range(10): self.assertEqual( reader.readline(keepends=False), - size*"a", + size * "a", ) def test_bug1175396(self): @@ -1760,6 +1797,7 @@ SurrogateEscapeTest, BomTest, TransformCodecTest, + BufferedCodecsTest, ) diff -r c64216addd7f Misc/NEWS --- a/Misc/NEWS Mon Aug 22 11:55:57 2011 +1000 +++ b/Misc/NEWS Mon Aug 22 15:22:49 2011 +1000 @@ -1209,6 +1209,9 @@ Tests ----- +- Issue #12808: Add unit tests for the BufferedIncrementalEncoder/Decoder + base classes (initial patch by Tennessee Leeuwenburg) + - Issue #11651: Improve the Makefile test targets to run more of the test suite more quickly. The --multiprocess option is now enabled by default, reducing the amount of time needed to run the tests. "make test" and "make quicktest"