Rietveld Code Review Tool
Help | Bug tracker | Discussion group | Source code | Sign in
(10441)

Side by Side Diff: Lib/test/test_codecs.py

Issue 20132: Many incremental codecs don’t handle fragmented data
Patch Set: Created 5 years, 5 months ago
Left:
Right:
Use n/p to move between diff chunks; N/P to move between comments. Please Sign in to add in-line comments.
Jump to:
View unified diff | Download patch
« no previous file with comments | « Lib/encodings/zlib_codec.py ('k') | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments | Hide Comments ('s')
OLD | NEW
1 import codecs 1 import codecs
2 import contextlib 2 import contextlib
3 import io 3 import io
4 import locale 4 import locale
5 import sys 5 import sys
6 import unittest 6 import unittest
7 import warnings 7 import warnings
8 import encodings 8 import encodings
9 9
10 from test import support 10 from test import support
(...skipping 1703 matching lines...) Expand 10 before | Expand all | Expand 10 after
1714 1714
1715 class StreamReaderTest(unittest.TestCase): 1715 class StreamReaderTest(unittest.TestCase):
1716 1716
1717 def setUp(self): 1717 def setUp(self):
1718 self.reader = codecs.getreader('utf-8') 1718 self.reader = codecs.getreader('utf-8')
1719 self.stream = io.BytesIO(b'\xed\x95\x9c\n\xea\xb8\x80') 1719 self.stream = io.BytesIO(b'\xed\x95\x9c\n\xea\xb8\x80')
1720 1720
1721 def test_readlines(self): 1721 def test_readlines(self):
1722 f = self.reader(self.stream) 1722 f = self.reader(self.stream)
1723 self.assertEqual(f.readlines(), ['\ud55c\n', '\uae00']) 1723 self.assertEqual(f.readlines(), ['\ud55c\n', '\uae00'])
1724
1725 def test_read_0(self):
1726 broken_multibyte = {
1727 "big5", "big5hkscs", "cp932", "cp949", "cp950",
1728 "euc_jp", "euc_jis_2004", "euc_jisx0213", "euc_kr",
1729 "gb2312", "gbk", "gb18030", "hz",
1730 "iso2022_jp", "iso2022_jp_1", "iso2022_jp_2", "iso2022_jp_2004",
1731 "iso2022_jp_3", "iso2022_jp_ext", "iso2022_kr",
1732 "johab", "shift_jis", "shift_jis_2004", "shift_jisx0213",
1733 }
1734 for encoding in all_unicode_encodings:
1735 if encoding in broken_multibyte: # read() rejects 2nd parameter
1736 continue
1737 with self.subTest(encoding=encoding):
1738 encoded = codecs.encode("characters", encoding)
1739 reader = codecs.getreader(encoding)(io.BytesIO(encoded))
1740 self.assertEqual("", reader.read(-1, 0))
1741 self.assertEqual("", reader.read(100, 0))
1742 self.assertEqual("", reader.read(0, 0))
1743 for encoding in bytes_transform_encodings:
1744 with self.subTest(encoding=encoding):
1745 encoded = codecs.encode(b"bytes", encoding)
1746 reader = codecs.getreader(encoding)(io.BytesIO(encoded))
1747 self.assertEqual(b"", reader.read(-1, 0))
1748 self.assertEqual(b"", reader.read(100, 0))
1749 self.assertEqual(b"", reader.read(0, 0))
1750 # TODO: rot-13 StreamReader seems confused between bytes and text
1724 1751
1725 class EncodedFileTest(unittest.TestCase): 1752 class EncodedFileTest(unittest.TestCase):
1726 1753
1727 def test_basic(self): 1754 def test_basic(self):
1728 f = io.BytesIO(b'\xed\x95\x9c\n\xea\xb8\x80') 1755 f = io.BytesIO(b'\xed\x95\x9c\n\xea\xb8\x80')
1729 ef = codecs.EncodedFile(f, 'utf-16-le', 'utf-8') 1756 ef = codecs.EncodedFile(f, 'utf-16-le', 'utf-8')
1730 self.assertEqual(ef.read(), b'\\\xd5\n\x00\x00\xae') 1757 self.assertEqual(ef.read(), b'\\\xd5\n\x00\x00\xae')
1731 1758
1732 f = io.BytesIO() 1759 f = io.BytesIO()
1733 ef = codecs.EncodedFile(f, 'utf-8', 'latin-1') 1760 ef = codecs.EncodedFile(f, 'utf-8', 'latin-1')
(...skipping 780 matching lines...) Expand 10 before | Expand all | Expand 10 after
2514 for encoding in bytes_transform_encodings: 2541 for encoding in bytes_transform_encodings:
2515 with self.subTest(encoding=encoding): 2542 with self.subTest(encoding=encoding):
2516 # generic codecs interface 2543 # generic codecs interface
2517 (o, size) = codecs.getencoder(encoding)(binput) 2544 (o, size) = codecs.getencoder(encoding)(binput)
2518 self.assertEqual(size, len(binput)) 2545 self.assertEqual(size, len(binput))
2519 (i, size) = codecs.getdecoder(encoding)(o) 2546 (i, size) = codecs.getdecoder(encoding)(o)
2520 self.assertEqual(size, len(o)) 2547 self.assertEqual(size, len(o))
2521 self.assertEqual(i, binput) 2548 self.assertEqual(i, binput)
2522 2549
2523 def test_read(self): 2550 def test_read(self):
2551 data = b"\x80data"
2552 broken_stateful = { # See Issue 20132
2553 "hex_codec", "base64_codec", "quopri_codec", "uu_codec",
2554 "bz2_codec",
2555 }
2524 for encoding in bytes_transform_encodings: 2556 for encoding in bytes_transform_encodings:
2525 with self.subTest(encoding=encoding): 2557 with self.subTest(encoding=encoding):
2526 sin = codecs.encode(b"\x80", encoding) 2558 sin = codecs.encode(data, encoding)
2527 reader = codecs.getreader(encoding)(io.BytesIO(sin)) 2559 reader = codecs.getreader(encoding)(io.BytesIO(sin))
2560
2528 sout = reader.read() 2561 sout = reader.read()
2529 self.assertEqual(sout, b"\x80") 2562 self.assertEqual(sout, data)
2563 reader.reset()
2564 reader.seek(0)
2565 sout = reader.read(-1)
2566 self.assertEqual(sout, data)
2567
2568 if encoding not in broken_stateful:
2569 for size in (1, 100):
2570 with self.subTest(size=size):
2571 reader.reset()
2572 reader.seek(0)
2573 for byte in data:
2574 sout = reader.read(size, 1)
2575 self.assertEqual(bytes((byte,)), sout)
2576 self.assertEqual(b"", reader.read(size, 1))
2577 self.assertEqual(b"", reader.read(size, 1))
2578
2579 reader.reset()
2580 reader.seek(0)
2581 buffer = bytearray()
2582 while True:
2583 sout = reader.read(size)
2584 if not len(sout):
2585 break
2586 buffer += sout
2587 self.assertEqual(data, buffer)
2588 self.assertEqual(b"", reader.read(size))
2589
2590 reader.reset()
2591 reader.seek(0)
2592 for byte in data:
2593 sout = reader.read(-1, 1)
2594 self.assertEqual(bytes((byte,)), sout)
2595 self.assertEqual(b"", reader.read(-1, 1))
2596 self.assertEqual(b"", reader.read(-1, 1))
2530 2597
2531 def test_readline(self): 2598 def test_readline(self):
2532 for encoding in bytes_transform_encodings: 2599 for encoding in bytes_transform_encodings:
2533 with self.subTest(encoding=encoding): 2600 with self.subTest(encoding=encoding):
2534 sin = codecs.encode(b"\x80", encoding) 2601 sin = codecs.encode(b"\x80", encoding)
2535 reader = codecs.getreader(encoding)(io.BytesIO(sin)) 2602 reader = codecs.getreader(encoding)(io.BytesIO(sin))
2536 sout = reader.readline() 2603 sout = reader.readline()
2537 self.assertEqual(sout, b"\x80") 2604 self.assertEqual(sout, b"\x80")
2538 2605
2539 def test_buffer_api_usage(self): 2606 def test_buffer_api_usage(self):
(...skipping 50 matching lines...) Expand 10 before | Expand all | Expand 10 after
2590 2657
2591 def test_binary_to_text_blacklists_text_transforms(self): 2658 def test_binary_to_text_blacklists_text_transforms(self):
2592 # Check str -> str codec gives a good error for binary input 2659 # Check str -> str codec gives a good error for binary input
2593 for bad_input in (b"immutable", bytearray(b"mutable")): 2660 for bad_input in (b"immutable", bytearray(b"mutable")):
2594 with self.subTest(bad_input=bad_input): 2661 with self.subTest(bad_input=bad_input):
2595 msg = (r"^'rot_13' is not a text encoding; " 2662 msg = (r"^'rot_13' is not a text encoding; "
2596 "use codecs.decode\(\) to handle arbitrary codecs") 2663 "use codecs.decode\(\) to handle arbitrary codecs")
2597 with self.assertRaisesRegex(LookupError, msg) as failure: 2664 with self.assertRaisesRegex(LookupError, msg) as failure:
2598 bad_input.decode("rot_13") 2665 bad_input.decode("rot_13")
2599 self.assertIsNone(failure.exception.__cause__) 2666 self.assertIsNone(failure.exception.__cause__)
2667
2668 def test_decode_past_end(self):
2669 """Should not decode a second stream past the end of the first"""
2670 concatenable = {"hex_codec", "base64_codec", "quopri_codec"}
2671 for encoding in set(bytes_transform_encodings) - concatenable:
2672 with self.subTest(encoding=encoding):
2673 encoded = codecs.encode(b"data", encoding)
2674
2675 if encoding != "uu_codec": # Broken; see Issue 20132
2676 buffer = bytearray()
2677 decoder = codecs.getincrementaldecoder(encoding)()
2678 d1 = decoder.decode(encoded)
2679 d2 = decoder.decode(encoded)
2680 self.assertEqual(b"data", d1 + d2)
2681 self.assertEqual(b"", decoder.decode(b"", final=True))
2682
2683 if encoding == "bz2_codec": # Concatenates both streams
2684 continue
2685 reader = codecs.getreader(encoding)(io.BytesIO(encoded * 2))
2686 self.assertEqual(b"data", reader.read())
2687 self.assertEqual(b"", reader.read())
2600 2688
2601 @unittest.skipUnless(zlib, "Requires zlib support") 2689 @unittest.skipUnless(zlib, "Requires zlib support")
2602 def test_custom_zlib_error_is_wrapped(self): 2690 def test_custom_zlib_error_is_wrapped(self):
2603 # Check zlib codec gives a good error for malformed input 2691 # Check zlib codec gives a good error for malformed input
2604 msg = "^decoding with 'zlib_codec' codec failed" 2692 msg = "^decoding with 'zlib_codec' codec failed"
2605 with self.assertRaisesRegex(Exception, msg) as failure: 2693 with self.assertRaisesRegex(Exception, msg) as failure:
2606 codecs.decode(b"hello", "zlib_codec") 2694 codecs.decode(b"hello", "zlib_codec")
2607 self.assertIsInstance(failure.exception.__cause__, 2695 self.assertIsInstance(failure.exception.__cause__,
2608 type(failure.exception)) 2696 type(failure.exception))
2697
2698 @unittest.skipUnless(zlib, "Requires zlib support")
2699 def test_zlib(self):
2700 incomplete = codecs.encode(b"data", "zlib-codec")[:-1]
2701 decoder = codecs.getdecoder("zlib-codec")
2702 self.assertRaises(zlib.error, decoder, incomplete)
2703 if False: # Incomplete data not detected by IncrementalDecoder
2704 decoder = codecs.getincrementaldecoder("zlib-codec")()
2705 self.assertRaises(ValueError,
2706 decoder.decode, incomplete, final=True)
2707 reader = codecs.getreader("zlib-codec")(io.BytesIO(incomplete))
2708 self.assertRaises(ValueError, reader.read)
2609 2709
2610 def test_custom_hex_error_is_wrapped(self): 2710 def test_custom_hex_error_is_wrapped(self):
2611 # Check hex codec gives a good error for malformed input 2711 # Check hex codec gives a good error for malformed input
2612 msg = "^decoding with 'hex_codec' codec failed" 2712 msg = "^decoding with 'hex_codec' codec failed"
2613 with self.assertRaisesRegex(Exception, msg) as failure: 2713 with self.assertRaisesRegex(Exception, msg) as failure:
2614 codecs.decode(b"hello", "hex_codec") 2714 codecs.decode(b"hello", "hex_codec")
2615 self.assertIsInstance(failure.exception.__cause__, 2715 self.assertIsInstance(failure.exception.__cause__,
2616 type(failure.exception)) 2716 type(failure.exception))
2617 2717
2618 # Unfortunately, the bz2 module throws OSError, which the codec 2718 # Unfortunately, the bz2 module throws OSError, which the codec
(...skipping 350 matching lines...) Expand 10 before | Expand all | Expand 10 after
2969 self.assertEqual(decoded, ('\u9a3e\u9a3e', 4)) 3069 self.assertEqual(decoded, ('\u9a3e\u9a3e', 4))
2970 3070
2971 decoded = codecs.code_page_decode(932, 3071 decoded = codecs.code_page_decode(932,
2972 b'abc', 'strict', 3072 b'abc', 'strict',
2973 False) 3073 False)
2974 self.assertEqual(decoded, ('abc', 3)) 3074 self.assertEqual(decoded, ('abc', 3))
2975 3075
2976 3076
2977 if __name__ == "__main__": 3077 if __name__ == "__main__":
2978 unittest.main() 3078 unittest.main()
OLD | NEW
« no previous file with comments | « Lib/encodings/zlib_codec.py ('k') | no next file » | no next file with comments »

RSS Feeds Recent Issues | This issue
This is Rietveld 894c83f36cb7+