diff -r 63b66306e416 Doc/library/codecs.rst --- a/Doc/library/codecs.rst Sun Jan 25 03:45:49 2015 +0000 +++ b/Doc/library/codecs.rst Sun Jan 25 04:24:15 2015 +0000 @@ -1205,13 +1205,16 @@ particular, the following variants typic +-----------------+--------------------------------+--------------------------------+ | utf_16_le | UTF-16LE | all languages | +-----------------+--------------------------------+--------------------------------+ -| utf_7 | U7, unicode-1-1-utf-7 | all languages | +| utf_7 [#utf7]_ | U7, unicode-1-1-utf-7 | all languages | +-----------------+--------------------------------+--------------------------------+ | utf_8 | U8, UTF, utf8 | all languages | +-----------------+--------------------------------+--------------------------------+ | utf_8_sig | | all languages | +-----------------+--------------------------------+--------------------------------+ +.. [#utf7] The ``'utf_7'`` stateful encoder may not produce optimal output, + and the stateful decoder may buffer unlimited data for particular inputs. + .. versionchanged:: 3.4 The utf-16\* and utf-32\* encoders no longer allow surrogate code points (``U+D800``--``U+DFFF``) to be encoded. @@ -1258,7 +1261,7 @@ encodings. | | | supported. | +--------------------+---------+---------------------------+ | raw_unicode_escape | | Latin-1 encoding with | -| | | ``\uXXXX`` and | +| [#state-dec]_ | | ``\uXXXX`` and | | | | ``\UXXXXXXXX`` for other | | | | code points. Existing | | | | backslashes are not | @@ -1272,7 +1275,7 @@ encodings. | | | handler is ignored. | +--------------------+---------+---------------------------+ | unicode_escape | | Encoding suitable as the | -| | | contents of a Unicode | +| [#state-dec]_ | | contents of a Unicode | | | | literal in ASCII-encoded | | | | Python source code, | | | | except that quotes are | @@ -1293,6 +1296,9 @@ encodings. | | | :pep:`393`. | +--------------------+---------+---------------------------+ +.. 
[#state-dec] The ``'unicode_escape'`` and ``'raw_unicode_escape'`` codecs + do not support stateful decoding. + .. _binary-transforms: Binary Transforms @@ -1300,13 +1306,14 @@ Binary Transforms The following codecs provide binary transforms: :term:`bytes-like object` to :class:`bytes` mappings. They are not supported by :meth:`bytes.decode` -(which only produces :class:`str` output). +(which only produces :class:`str` output). These codecs do not support +the :class:`StreamReader` API. .. tabularcolumns:: |l|L|L|L| +----------------------+------------------+------------------------------+------------------------------+ -| Codec | Aliases | Purpose | Encoder / decoder | +| Codec | Aliases | Purpose | Stateless encoder / decoder | +======================+==================+==============================+==============================+ | base64_codec [#b64]_ | base64, base_64 | Convert operand to MIME | :meth:`base64.encodebytes` / | | | | base64 (the result always | :meth:`base64.decodebytes` | @@ -1327,7 +1334,7 @@ to :class:`bytes` mappings. They are no | | | representation, with two | | | | | digits per byte | | +----------------------+------------------+------------------------------+------------------------------+ -| quopri_codec | quopri, | Convert operand to MIME | :meth:`quopri.encode` with | +| quopri_codec [#qp]_ | quopri, | Convert operand to MIME | :meth:`quopri.encode` with | | | quotedprintable, | quoted printable | ``quotetabs=True`` / | | | quoted_printable | | :meth:`quopri.decode` | +----------------------+------------------+------------------------------+------------------------------+ @@ -1342,6 +1349,10 @@ to :class:`bytes` mappings. They are no ``'base64_codec'`` also accepts ASCII-only instances of :class:`str` for decoding +.. [#qp] The ``'quopri_codec'`` stateful encoder only handles data + representing text with ``b'\n'`` newlines. No ``b'\r'`` characters + should be present. + .. versionadded:: 3.2 Restoration of the binary transforms. 
diff -r 63b66306e416 Lib/codecs.py --- a/Lib/codecs.py Sun Jan 25 03:45:49 2015 +0000 +++ b/Lib/codecs.py Sun Jan 25 04:24:15 2015 +0000 @@ -337,6 +337,59 @@ class BufferedIncrementalDecoder(Increme # ignore additional state info self.buffer = state[0] +### Conversion helpers for getstate() and setstate() + +def _bytes_to_int(data, min_bytes=0): + r"""Convert byte string to integer using base-256 bijective numeration. + + >>> _bytes_to_int(b'') + 0 + >>> hex(_bytes_to_int(b'\x00')) + '0x1' + >>> hex(_bytes_to_int(b'\xFE')) + '0xff' + >>> hex(_bytes_to_int(b'\xFF')) + '0x100' + >>> hex(_bytes_to_int(b'\x00\x00')) + '0x101' + >>> hex(_bytes_to_int(b'\x00\xA0\xB0')) + '0x1a1b1' + >>> hex(_bytes_to_int(b'\x00', min_bytes=1)) + '0x0' + >>> hex(_bytes_to_int(b'\x00\xA0\xB0', min_bytes=1)) + '0x1a1b0' + """ + offset = b'\x01' * (len(data) - min_bytes) + b'\x00' * min_bytes + return int.from_bytes(data, 'big') + int.from_bytes(offset, 'big') + +def _int_to_bytes(num, min_bytes=0): + r"""Convert integer to byte string using base-256 bijective numeration. + + >>> _int_to_bytes(0) + b'' + >>> _int_to_bytes(1) + b'\x00' + >>> _int_to_bytes(255) + b'\xfe' + >>> _int_to_bytes(256) + b'\xff' + >>> _int_to_bytes(0x0101) + b'\x00\x00' + >>> _int_to_bytes(0x01A1B1) + b'\x00\xa0\xb0' + >>> _int_to_bytes(255, min_bytes=1) + b'\xff' + >>> _int_to_bytes(0x1A1FF, min_bytes=1) + b'\x00\xa0\xff' + """ + byte_length = max(-(-num.bit_length() // 8), min_bytes) + guess_offset = b'\x01' * (byte_length - min_bytes) + b'\x00' * min_bytes + num -= int.from_bytes(guess_offset, 'big') + if num < 0: # Guess (01 01 . . .) 
too high; remove most significant 01 + byte_length -= 1 + num += 0x01 << byte_length * 8 + return num.to_bytes(byte_length, 'big') + # # The StreamWriter and StreamReader class provide generic working # interfaces which can be used to implement new encoding submodules diff -r 63b66306e416 Lib/encodings/base64_codec.py --- a/Lib/encodings/base64_codec.py Sun Jan 25 03:45:49 2015 +0000 +++ b/Lib/encodings/base64_codec.py Sun Jan 25 04:24:15 2015 +0000 @@ -7,6 +7,8 @@ Written by Marc-Andre Lemburg (mal@lembu import codecs import base64 +import binascii +import re ### Codec APIs @@ -25,14 +27,54 @@ class Codec(codecs.Codec): return base64_decode(input, errors) class IncrementalEncoder(codecs.IncrementalEncoder): + def __init__(self, *pos, **kw): + codecs.IncrementalEncoder.__init__(self, *pos, **kw) + assert self.errors == 'strict' + self.reset() + + def reset(self): + self._line_bytes = 0 # Number of bytes already encoded on line + self._pending = b'' # Bytes not yet encoded and returned + def encode(self, input=b'', final=False): - assert self.errors == 'strict' - return base64.encodebytes(input) + input = self._pending + input # Converts awkward bytes-like objects + encoded = bytearray() + start = 0 + stop = base64.MAXBINSIZE - self._line_bytes + while stop <= len(input): + encoded += binascii.b2a_base64(input[start:stop]) + self._line_bytes = 0 + start = stop + stop += base64.MAXBINSIZE + stop = len(input) + if not final: + stop -= len(input) % 3 # Align to 3-byte chunk + last_line = binascii.b2a_base64(input[start:stop]) + if not final: + last_line = last_line.rstrip(b'\n') + encoded += last_line + self._line_bytes += stop - start + self._pending = input[stop:] + return bytes(encoded) -class IncrementalDecoder(codecs.IncrementalDecoder): - def decode(self, input=b'', final=False): - assert self.errors == 'strict' - return base64.decodebytes(input) + def getstate(self): + state = bytes((self._line_bytes,)) + self._pending + return codecs._bytes_to_int(state, 
min_bytes=1) + + def setstate(self, state): + state = codecs._int_to_bytes(state, min_bytes=1) + self._line_bytes = state[0] + self._pending = state[1:] + +class IncrementalDecoder(codecs.BufferedIncrementalDecoder): + def _buffer_decode(self, input, errors, final): + if not final: + input = chunk_re.match(input).group() + return (binascii.a2b_base64(input), len(input)) + +# Without the second level of brackets, this raises a "multiple repeat" error +chunk_re = br'(?: (?: [^A-Za-z0-9+/=]* [A-Za-z0-9+/=] ){4} )*' +chunk_re = re.compile(chunk_re, re.VERBOSE) class StreamReader(Codec, codecs.StreamReader): charbuffertype = bytes diff -r 63b66306e416 Lib/encodings/bz2_codec.py --- a/Lib/encodings/bz2_codec.py Sun Jan 25 03:45:49 2015 +0000 +++ b/Lib/encodings/bz2_codec.py Sun Jan 25 04:24:15 2015 +0000 @@ -52,7 +52,7 @@ class IncrementalDecoder(codecs.Incremen try: return self.decompressobj.decompress(input) except EOFError: - return '' + return b'' def reset(self): self.decompressobj = bz2.BZ2Decompressor() diff -r 63b66306e416 Lib/encodings/hex_codec.py --- a/Lib/encodings/hex_codec.py Sun Jan 25 03:45:49 2015 +0000 +++ b/Lib/encodings/hex_codec.py Sun Jan 25 04:24:15 2015 +0000 @@ -29,10 +29,11 @@ class IncrementalEncoder(codecs.Incremen assert self.errors == 'strict' return binascii.b2a_hex(input) -class IncrementalDecoder(codecs.IncrementalDecoder): - def decode(self, input=b'', final=False): - assert self.errors == 'strict' - return binascii.a2b_hex(input) +class IncrementalDecoder(codecs.BufferedIncrementalDecoder): + def _buffer_decode(self, input, errors, final): + if not final and len(input) % 2: + input = input[:-1] + return (binascii.a2b_hex(input), len(input)) class StreamWriter(Codec, codecs.StreamWriter): charbuffertype = bytes diff -r 63b66306e416 Lib/encodings/quopri_codec.py --- a/Lib/encodings/quopri_codec.py Sun Jan 25 03:45:49 2015 +0000 +++ b/Lib/encodings/quopri_codec.py Sun Jan 25 04:24:15 2015 +0000 @@ -28,12 +28,54 @@ class 
Codec(codecs.Codec): return quopri_decode(input, errors) class IncrementalEncoder(codecs.IncrementalEncoder): + def __init__(self, *pos, **kw): + codecs.IncrementalEncoder.__init__(self, *pos, **kw) + self.reset() + + def reset(self): + self._dummy_line = b'' # String of x's compensating for line pos + self._pending = b'' # Previous byte if encoding depends on next byte + def encode(self, input=b'', final=False): - return quopri_encode(input, self.errors)[0] + # Concatenating to byte string converts arbitrary bytes-like objects + input = self._dummy_line + self._pending + input + encoded = quopri_encode(input, self.errors)[0] + last_line = len(encoded) - encoded.rfind(b'\n') - 1 + encoding_stop = None + # Proper encoding of the last character is not determined if: + # * It is whitespace, which would depend if it is the last character + # on the line + # * Its encoded form (x or =XX) reaches the line limit, in case the + # next character is not a newline, and a soft line break is required + self._pending = b'' + if not final and (input.endswith((b' ', b'\t')) or + last_line >= quopri.MAXLINESIZE): + self._pending = input[-1:] + if encoded[-3:-2] == b'=' and encoded[-2:].isalnum(): + encoding_stop = -3 + else: + encoding_stop = -1 + last_line += encoding_stop + encoded = encoded[len(self._dummy_line):encoding_stop] + self._dummy_line = b'x' * last_line + return encoded -class IncrementalDecoder(codecs.IncrementalDecoder): - def decode(self, input=b'', final=False): - return quopri_decode(input, self.errors)[0] + def getstate(self): + state = bytes((len(self._dummy_line),)) + self._pending + return codecs._bytes_to_int(state, min_bytes=1) + + def setstate(self, state): + state = codecs._int_to_bytes(state, min_bytes=1) + self._dummy_line = b'x' * state[0] + self._pending = state[1:] + +class IncrementalDecoder(codecs.BufferedIncrementalDecoder): + def _buffer_decode(self, input, errors, final): + if not final: + end = input.rfind(b'=', -2) + if end >= 0: + input = 
input[:end] + return (quopri_decode(input, self.errors)[0], len(input)) class StreamReader(Codec, codecs.StreamReader): charbuffertype = bytes diff -r 63b66306e416 Lib/encodings/uu_codec.py --- a/Lib/encodings/uu_codec.py Sun Jan 25 03:45:49 2015 +0000 +++ b/Lib/encodings/uu_codec.py Sun Jan 25 04:24:15 2015 +0000 @@ -10,58 +10,17 @@ modified by Jack Jansen and Fredrik Lund import codecs import binascii from io import BytesIO +from enum import IntEnum ### Codec APIs -def uu_encode(input, errors='strict', filename='', mode=0o666): - assert errors == 'strict' - infile = BytesIO(input) - outfile = BytesIO() - read = infile.read - write = outfile.write - - # Encode - write(('begin %o %s\n' % (mode & 0o777, filename)).encode('ascii')) - chunk = read(45) - while chunk: - write(binascii.b2a_uu(chunk)) - chunk = read(45) - write(b' \nend\n') - - return (outfile.getvalue(), len(input)) +def uu_encode(input, errors='strict'): + encoded = IncrementalEncoder(errors).encode(input, final=True) + return (encoded, len(input)) def uu_decode(input, errors='strict'): - assert errors == 'strict' - infile = BytesIO(input) - outfile = BytesIO() - readline = infile.readline - write = outfile.write - - # Find start of encoded data - while 1: - s = readline() - if not s: - raise ValueError('Missing "begin" line in input data') - if s[:5] == b'begin': - break - - # Decode - while True: - s = readline() - if not s or s == b'end\n': - break - try: - data = binascii.a2b_uu(s) - except binascii.Error as v: - # Workaround for broken uuencoders by /Fredrik Lundh - nbytes = (((s[0]-32) & 63) * 4 + 5) // 3 - data = binascii.a2b_uu(s[:nbytes]) - #sys.stderr.write("Warning: %s\n" % str(v)) - write(data) - if not s: - raise ValueError('Truncated input data') - - return (outfile.getvalue(), len(input)) + decoded = IncrementalDecoder(errors).decode(input, final=True) + return (decoded, len(input)) class Codec(codecs.Codec): def encode(self, input, errors='strict'): @@ -71,15 +30,123 @@ class 
Codec(codecs.Codec): return uu_decode(input, errors) class IncrementalEncoder(codecs.IncrementalEncoder): + def __init__(self, *pos, **kw): + codecs.IncrementalEncoder.__init__(self, *pos, **kw) + self.reset() + + def reset(self): + self._initial = True + self._pending = b'' + def encode(self, input=b'', final=False): - return uu_encode(input, self.errors)[0] + assert self.errors == 'strict' + infile = BytesIO(self._pending + input) + outfile = BytesIO() + read = infile.read + write = outfile.write -class IncrementalDecoder(codecs.IncrementalDecoder): - def decode(self, input=b'', final=False): - return uu_decode(input, self.errors)[0] + # Encode + if self._initial: + write(b'begin 666 \n') + while True: + chunk = read(45) + if not chunk or len(chunk) < 45 and not final: + break + write(binascii.b2a_uu(chunk)) + if final: + write(b' \nend\n') -class StreamWriter(Codec, codecs.StreamWriter): - charbuffertype = bytes + self._initial = False + self._pending = chunk + return outfile.getvalue() + + def getstate(self): + state = bytes((self._initial,)) + self._pending + return codecs._bytes_to_int(state, min_bytes=1) + + def setstate(self, state): + state = codecs._int_to_bytes(state, min_bytes=1) + self._initial = state[0] + self._pending = state[1:] + +class DecodingState(IntEnum): + eol_begin = -3 # Skip rest of line before "begin" line + begin = -2 # Matching start of "begin" line + eol = -1 # Skip rest of line leading up to UU-encoded line + data = 0 # Reading normal UU-encoded data or "end" line + eof = 1 # No more decoded data + +class IncrementalDecoder(codecs.BufferedIncrementalDecoder): + def __init__(self, *pos, **kw): + codecs.BufferedIncrementalDecoder.__init__(self, *pos, **kw) + self.reset() + + def reset(self): + self._state = DecodingState.begin + return codecs.BufferedIncrementalDecoder.reset(self) + + def getstate(self): + undecoded, _ = codecs.BufferedIncrementalDecoder.getstate(self) + return (undecoded, self._state) + + def setstate(self, state): +
_, self._state = state + return codecs.BufferedIncrementalDecoder.setstate(self, state) + + def _buffer_decode(self, input, errors, final): + assert self.errors == 'strict' + outfile = BytesIO() + offset = self._decode_partial(input, 0, outfile.write) + if final and self._state != DecodingState.eof: + if self._state <= DecodingState.begin: + raise ValueError('Missing "begin" line in input data') + else: + raise ValueError('Truncated input data') + return (outfile.getvalue(), offset) + + def _decode_partial(self, input, offset, write): + # Find start of encoded data + while self._state <= DecodingState.begin: + if self._state == DecodingState.eol_begin: + offset = input.find(b'\n', offset) + if offset < 0: + return len(input) + offset += 1 + self._state = DecodingState.begin + if self._state == DecodingState.begin: + if not b'begin'.startswith(input[offset:offset + 5]): + self._state = DecodingState.eol_begin + continue + if offset + 5 > len(input): + return offset + offset += 5 + self._state = DecodingState.eol + + # Decode + while self._state <= DecodingState.data: + if self._state == DecodingState.eol: + offset = input.find(b'\n', offset) + if offset < 0: + return len(input) + offset += 1 + self._state = DecodingState.data + if self._state == DecodingState.data: + end_probe = input[offset:offset + 4] + if b'end\n'.startswith(end_probe): + if end_probe == b"end\n": + offset += 4 + self._state = DecodingState.eof + break + return offset + nbytes = (((input[offset]-32) & 63) * 4 + 5) // 3 + if offset + nbytes > len(input): + return offset + write(binascii.a2b_uu(input[offset:offset + nbytes])) + offset += nbytes + self._state = DecodingState.eol + + # self._state == DecodingState.eof + return len(input) class StreamReader(Codec, codecs.StreamReader): charbuffertype = bytes @@ -94,6 +161,5 @@ def getregentry(): incrementalencoder=IncrementalEncoder, incrementaldecoder=IncrementalDecoder, streamreader=StreamReader, - streamwriter=StreamWriter, _is_text_encoding=False, 
) diff -r 63b66306e416 Lib/test/test_codecs.py --- a/Lib/test/test_codecs.py Sun Jan 25 03:45:49 2015 +0000 +++ b/Lib/test/test_codecs.py Sun Jan 25 04:24:15 2015 +0000 @@ -6,6 +6,7 @@ import sys import unittest import warnings import encodings +import doctest from test import support @@ -1655,7 +1656,6 @@ class CodecsModuleTest(unittest.TestCase iters = tuple(codecs.iterencode(iter(()), encoding)) self.assertLessEqual(len(iters), 1) if encoding == "uu_codec": - continue # Skip due to Issue 20132 minimal = (b"begin\nend\n",) else: minimal = () @@ -1772,6 +1772,10 @@ class CodecsModuleTest(unittest.TestCase self.assertRaises(UnicodeError, codecs.decode, b'abc', 'undefined', errors) + def test_doc(self): + failures, _ = doctest.testmod(codecs) + self.assertEqual(failures, 0) + class StreamReaderTest(unittest.TestCase): def setUp(self): @@ -2614,10 +2618,7 @@ class TransformCodecTest(unittest.TestCa def test_multi_write(self): data = bytes(200) # Long enough to span a base64/quopri/uu line - broken = {"base64_codec", "quopri_codec", "uu_codec"} for encoding in bytes_transform_encodings: - if encoding in broken: # See Issue 20132 - continue with self.subTest(encoding=encoding): expected = codecs.encode(data, encoding) writer = codecs.getwriter(encoding)(io.BytesIO()) @@ -2626,6 +2627,27 @@ class TransformCodecTest(unittest.TestCa writer.reset() self.assertEqual(writer.getvalue(), expected) + def test_inc_decode(self): + data = b"\x80data" + for encoding in bytes_transform_encodings: + with self.subTest(encoding=encoding): + sin = codecs.encode(data, encoding) + decoder = codecs.getincrementaldecoder(encoding)() + self.assertEqual(decoder.decode(sin, final=True), data) + decoder.reset() + sout1 = decoder.decode(sin) + sout2 = decoder.decode(b"", final=True) + self.assertEqual(sout1 + sout2, data) + + if encoding == "quopri_codec": + continue # Broken incremental decoder; see Issue 20132 + decoder.reset() + buffer = bytearray() + for byte in sin: + buffer += 
decoder.decode(bytes((byte,))) + buffer += decoder.decode(b"", final=True) + self.assertEqual(buffer, data) + def test_read(self): for encoding in bytes_transform_encodings: with self.subTest(encoding=encoding): @@ -2704,6 +2726,46 @@ class TransformCodecTest(unittest.TestCa bad_input.decode("rot_13") self.assertIsNone(failure.exception.__cause__) + def test_decode_incomplete(self): + """Test handling of incomplete data + + The stateless and incremental decoders should raise a ValueError. + """ + test_data = dict( + base64_codec=(b"x", -2), # Truncate newline and padding char + quopri_codec=(None, None), # Seems to tolerate any truncation + ) + for encoding in bytes_transform_encodings: + data, offset = test_data.get(encoding, (b"data", -1)) + if data is None: + continue + with self.subTest(encoding=encoding): + truncated = codecs.encode(data, encoding)[:offset] + exception = ValueError + if encoding == "zlib_codec": # zlib doesn't raise ValueError + exception = zlib.error + decoder = codecs.getdecoder(encoding) + self.assertRaises(exception, decoder, truncated) + + if encoding not in {"zlib_codec", "bz2_codec"}: + # Incomplete data not detected for zlib and bz2 + decoder = codecs.getincrementaldecoder(encoding)() + self.assertRaises(ValueError, + decoder.decode, truncated, final=True) + + def test_decode_past_end(self): + """Should not decode a second stream past the end of the first""" + concatenable = {"hex_codec", "base64_codec", "quopri_codec"} + for encoding in set(bytes_transform_encodings) - concatenable: + with self.subTest(encoding=encoding): + encoded = codecs.encode(b"data", encoding) + buffer = bytearray() + decoder = codecs.getincrementaldecoder(encoding)() + d1 = decoder.decode(encoded) + d2 = decoder.decode(encoded) + self.assertEqual(d1 + d2, b"data") + self.assertEqual(decoder.decode(b"", final=True), b"") + @unittest.skipUnless(zlib, "Requires zlib support") def test_custom_zlib_error_is_wrapped(self): # Check zlib codec gives a good error for 
malformed input @@ -2733,6 +2795,92 @@ class TransformCodecTest(unittest.TestCa info = codecs.lookup(alias) self.assertEqual(info.name, expected_name) + def test_iterdecode(self): + """Exercise incremental decoders with a variety of input""" + tests = ( + # (encoding, {"good": (input-tuple-1, . . .), "bad": (. . .)}) + ("hex-codec", { + "good": ( + (b"81",), + (b"0", b"", b"12", b"3"), + ), + "bad": ( + (b"0",), + (b"8", b"14",), + (b"33", b" 44 ", b"55"), + ), + }), + ("base64-codec", { + "good": ( + (b" AZ \n az \r 09 - +/ _ == ",), + (b"AA", b"", b"AAB", b"BBB", b""), + (b"AAA", b"="), + (b"AA", b"=", b"="), + (b"AAAA BBBB CCCC",), + (b"AAAA BBBB CCC", b"C"), + (b"AAAA BBBB CCCC",), + ), + "bad": ( + (b"A",), + (b"AAA",), + (b"A=", b"="), + ), + }), + ("quopri-codec", { + "good": ( + (b"=3D", b"abc"), + (b"=3", b"Dabc"), (b"=3", b"D", b"abc"), + (b"=", b"3Dabc"), (b"=", b"3", b"D", b"abc"), + (b"=\r\n", b"abc"), + (b"=\r", b"\nabc", b"=\r", b"\n", b"abc"), + (b"=", b"\r\nabc", b"=", b"\r", b"\n", b"abc"), + (b"=\n",), (b"=", b"\n"), + (b"abc=3D=\n", b"=3D"), + ), + }), + ) + for encoding, good_bad in tests: + for inputs in good_bad["good"]: + with self.subTest(encoding=encoding, inputs=inputs): + generator = codecs.iterdecode(iter(inputs), encoding) + decoded = b"".join(generator) + expected = codecs.decode(b"".join(inputs), encoding) + self.assertEqual(decoded, expected) + for inputs in good_bad.get("bad", ()): + with self.subTest(encoding=encoding, inputs=inputs), \ + self.assertRaises(ValueError): + for _ in codecs.iterdecode(iter(inputs), encoding): + pass + + def test_base64_inc_encode(self): + encoder = codecs.getincrementalencoder("base64-codec")() + + # Exercise line splitting (57 input bytes per line) + self.assertEqual(encoder.encode(byteslike(1)), b"") + self.assertEqual(encoder.encode(byteslike(2)), b"AAAA") + self.assertEqual(encoder.encode(byteslike(3 + 1)), b"AAAA") + self.assertNotIn(b"\n", encoder.encode(byteslike(56 - 7))) + encoded = 
encoder.encode(byteslike(1 + 57 + 30)) + self.assertEqual(encoded.count(b"\n"), 2) + self.assertEqual(encoder.encode(byteslike(27)).count(b"\n"), 1) + self.assertEqual(encoder.encode(byteslike(1), final=True), b"AA==\n") + self.assertEqual(encoder.encode(byteslike(1), final=True), b"AA==\n") + + # Exercise getstate() and setstate() + encoder.reset() + self.assertEqual(encoder.getstate(), 0) + self.assertEqual(encoder.encode(byteslike(b"1")), b"") + state1 = encoder.getstate() + self.assertEqual(encoder.encode(byteslike(b"2")), b"") + state12 = encoder.getstate() + encoder.setstate(state1) + expected = codecs.encode(b"1", "base64-codec") + self.assertEqual(encoder.encode(final=True), expected) + encoder.setstate(state12) + expected = codecs.encode(b"123", "base64-codec") + encoded = encoder.encode(byteslike(b"3"), final=True) + self.assertEqual(encoded, expected) + def test_quopri_stateless(self): # Should encode with quotetabs=True encoded = codecs.encode(b"space tab\teol \n", "quopri-codec") @@ -2741,9 +2889,183 @@ class TransformCodecTest(unittest.TestCa unescaped = b"space tab eol\n" self.assertEqual(codecs.decode(unescaped, "quopri-codec"), unescaped) + def test_quopri_inc_encode(self): + # Up to 76 encoded bytes per line + tests = ( + ("Full line", b"#" * 76, + b"#" * 76), + ("Soft line break", b"#" * 77, + b"#" * 75 + b"=\n" b"##"), + ("Full line ending with escape", b"#" * 73 + b"=", + b"#" * 73 + b"=3D"), + ("74 characters plus escape", b"#" * 74 + b"=", + b"#" * 74 + b"=\n" b"=3D"), + ("75 characters plus escape", b"#" * 75 + b"=", + b"#" * 75 + b"=\n" b"=3D"), + ("76 characters plus escape", b"#" * 76 + b"=", + b"#" * 75 + b"=\n" b"#=3D"), + ("Multiline", b"abc\n" + b"=" * 77 + b"\n" b"def", + b"abc\n" + (b"=3D" * 25 + b"=\n") * 3 + b"=3D=3D\n" b"def"), + ) + for desc, input, expected in tests: + with self.subTest(desc): + one_chunk = iter((byteslike(input),)) + generator = codecs.iterencode(one_chunk, "quopri-codec") + 
self.assertEqual(b"".join(generator), expected) + byte_per_byte = (byteslike((byte,)) for byte in input) + generator = codecs.iterencode(byte_per_byte, "quopri-codec") + self.assertEqual(b"".join(generator), expected) + + def test_quopri_enc_state(self): + encoder = codecs.getincrementalencoder("quopri-codec")() + self.assertEqual(encoder.getstate(), 0) + self.assertEqual(encoder.encode(b"#" * 73), b"#" * 73) + state_73 = encoder.getstate() + self.assertEqual(encoder.encode(b"###"), b"##") + state_76 = encoder.getstate() + encoder.setstate(state_73) + self.assertEqual(encoder.encode(b"="), b"") + state_76_esc = encoder.getstate() + encoder.setstate(state_76) + self.assertEqual(encoder.encode(b"\n", final=True), b"#\n") + encoder.setstate(state_76) + self.assertEqual(encoder.encode(b"#", final=True), b"=\n##") + encoder.setstate(state_76_esc) + self.assertEqual(encoder.encode(b"\n", final=True), b"=3D\n") + encoder.setstate(state_76_esc) + self.assertEqual(encoder.encode(b"#", final=True), b"=\n=3D#") + + def test_uu_inc_encode(self): + encoder = codecs.getincrementalencoder("uu-codec")() + state_init = encoder.getstate() + self.assertNotEqual(state_init, 0) + encoded = encoder.encode(byteslike()) + self.assertEqual(encoded.count(b"begin"), 1) + self.assertEqual(encoded.count(b"\n"), 1) + self.assertEqual(encoder.getstate(), 0) + self.assertEqual(encoder.encode(byteslike(b"\xFF")), b"") + self.assertEqual(encoder.encode(byteslike(b"\xFF" * 43)), b"") + state_largest = encoder.getstate() + self.assertEqual(encoder.encode(byteslike(b"\xFF")).count(b"\n"), 1) + self.assertEqual(encoder.getstate(), 0) + self.assertEqual(encoder.encode(byteslike(40)).count(b"\n"), 0) + encoded = encoder.encode(byteslike(5 + 45 + 40)) + self.assertEqual(encoded.count(b"\n"), 2) + encoded = encoder.encode(byteslike(5 + 40), final=True) + self.assertEqual(encoded.count(b"\n"), 4) # 2 data, null, end lines + self.assertEqual(encoded.count(b"end"), 1) + + encoder.setstate(state_largest) + 
encoded = encoder.encode(byteslike(1)) + self.assertEqual(encoded.count(b"\n"), 1) + self.assertNotIn(b"begin", encoded) + self.assertEqual(encoder.getstate(), 0) + + encoder.setstate(0) + self.assertEqual(encoder.encode(byteslike(44)), b"") + encoded = encoder.encode(byteslike(1)) + self.assertEqual(encoded.count(b"\n"), 1) + self.assertNotIn(b"begin", encoded) + self.assertEqual(encoder.getstate(), 0) + + encoder.setstate(state_init) + self.assertIn(b"begin", encoder.encode(byteslike(b"123"))) + + def test_uu_decode(self): + tests = ( + # (input-tuple, decoded-data) + (( + b"", b" begin skip\n", + b"\x21_\n", + b"beg", b"in\n", + b"\x23", b"\x35\x35\x35", b"\x35", b"end\n", + b"\x21\x35\x35skip\n", + b"end\n", + b"\x21_\n", + b"skip\n", + ), b"\x55" * 4), + (( + b"begin\n" + b"\x20\n" + b"\x20z\n", + b"\x20zzzz\n" + b"end\n" + ), b""), + (( + b"begin skip", b"\n" + b"\x22", b"55", b"5", b"\n" + b"en", b"d\n" + b"skip" + ), b"UU"), + ) + for inputs, expected in tests: + with self.subTest(repr(inputs)): + decoded = codecs.decode(b"".join(inputs), "uu-codec") + self.assertEqual(decoded, expected) + generator = codecs.iterdecode(iter(inputs), "uu-codec") + self.assertEqual(b"".join(generator), expected) + def test_uu_invalid(self): - # Missing "begin" line - self.assertRaises(ValueError, codecs.decode, b"", "uu-codec") + tests = ( + ((), 'Missing "begin"'), + ((b"be", b"gan"), 'Missing "begin"'), + ((b"be", b"begin"), 'Missing "begin"'), + ((b" begin\n",), 'Missing "begin"'), + ((b"begin",), "Truncated"), + ((b"begin\n" b"end",), "Truncated"), + ((b"begin", b"end\n"), "Truncated"), + ((b"begin\n", b"\x20end\n"), "Truncated"), + ((b"begin\n", b"end \n"), "Truncated"), + ) + for inputs, regex in tests: + with self.subTest(repr(inputs)): + self.assertRaisesRegex(ValueError, regex, + codecs.decode, b"".join(inputs), "uu-codec") + with self.assertRaisesRegex(ValueError, regex): + for _ in codecs.iterdecode(iter(inputs), "uu-codec"): + pass + + def test_uu_dec_state(self): 
+ decoder = codecs.getincrementaldecoder("uu-codec")() + state_init = decoder.getstate() + self.assertEqual(decoder.decode(b"skip"), b"") + state_non_begin = decoder.getstate() + self.assertEqual(decoder.decode(b"\nbe"), b"") + state_be = decoder.getstate() + self.assertEqual(decoder.decode(b"gin"), b"") + state_skip_begin = decoder.getstate() + self.assertEqual(decoder.decode(b"\n"), b"") + self.assertSequenceEqual(decoder.getstate(), (b"", 0)) + self.assertEqual(decoder.decode(b"\x2255"), b"") + self.assertSequenceEqual(decoder.getstate(), (b"\x2255", 0)) + self.assertEqual(decoder.decode(b"5"), b"UU") + state_skip_line = decoder.getstate() + self.assertEqual(decoder.decode(b"\nen"), b"") + state_en = decoder.getstate() + self.assertEqual(decoder.decode(b"d\n"), b"") + state_eof = decoder.getstate() + + decoder.setstate(state_init) + decoded = decoder.decode(b"\x21__\nbegin\nend\n", final=True) + self.assertEqual(decoded, b"") + decoder.setstate(state_non_begin) + decoded = decoder.decode(b"begin\n\x21__\nbegin\nend\n", final=True) + self.assertEqual(decoded, b"") + decoder.setstate(state_be) + self.assertEqual(decoder.decode(b"gin\nend\n", final=True), b"") + decoder.setstate(state_skip_begin) + self.assertEqual(decoder.decode(b"\x21__\nend\n", final=True), b"") + decoder.setstate((b"", 0)) + decoded = decoder.decode(b"\x22555\nend\n", final=True) + self.assertEqual(decoded, b"UU") + decoder.setstate((b"\x2255", 0)) + self.assertEqual(decoder.decode(b"5\nend\n", final=True), b"UU") + decoder.setstate(state_skip_line) + self.assertEqual(decoder.decode(b"\x21__\nend\n", final=True), b"") + decoder.setstate(state_en) + self.assertEqual(decoder.decode(b"d\n", final=True), b"") + decoder.setstate(state_eof) + self.assertEqual(decoder.decode(b"", final=True), b"") # The codec system tries to wrap exceptions in order to ensure the error