Rietveld Code Review Tool
Help | Bug tracker | Discussion group | Source code | Sign in
(170013)

Delta Between Two Patch Sets: Lib/test/test_bz2.py

Issue 16510: Using appropriate checks in tests
Left Patch Set: Created 7 years ago
Right Patch Set: Created 5 years, 10 months ago
Left:
Right:
Use n/p to move between diff chunks; N/P to move between comments. Please Sign in to add in-line comments.
Jump to:
Left: Side by side diff | Download
Right: Side by side diff | Download
« no previous file with change/comment | « Lib/test/test_builtin.py ('k') | Lib/test/test_capi.py » ('j') | no next file with change/comment »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
LEFTRIGHT
1 #!/usr/bin/env python3
2 from test import support 1 from test import support
3 from test.support import bigmemtest, _4G 2 from test.support import bigmemtest, _4G
4 3
5 import unittest 4 import unittest
6 from io import BytesIO 5 from io import BytesIO
7 import os 6 import os
7 import pickle
8 import random 8 import random
9 import subprocess 9 import subprocess
10 import sys 10 import sys
11 from test.support import unlink
11 12
12 try: 13 try:
13 import threading 14 import threading
14 except ImportError: 15 except ImportError:
15 threading = None 16 threading = None
16 17
17 # Skip tests if the bz2 module doesn't exist. 18 # Skip tests if the bz2 module doesn't exist.
18 bz2 = support.import_module('bz2') 19 bz2 = support.import_module('bz2')
19 from bz2 import BZ2File, BZ2Compressor, BZ2Decompressor 20 from bz2 import BZ2File, BZ2Compressor, BZ2Decompressor
20 21
(...skipping 19 matching lines...) Expand all
40 b'ftp:x:14:50:FTP User:/var/ftp:/bin/bash\n', 41 b'ftp:x:14:50:FTP User:/var/ftp:/bin/bash\n',
41 b'nobody:x:65534:65534:Nobody:/home:\n', 42 b'nobody:x:65534:65534:Nobody:/home:\n',
42 b'postfix:x:100:101:postfix:/var/spool/postfix:\n', 43 b'postfix:x:100:101:postfix:/var/spool/postfix:\n',
43 b'niemeyer:x:500:500::/home/niemeyer:/bin/bash\n', 44 b'niemeyer:x:500:500::/home/niemeyer:/bin/bash\n',
44 b'postgres:x:101:102:PostgreSQL Server:/var/lib/pgsql:/bin/bash\n', 45 b'postgres:x:101:102:PostgreSQL Server:/var/lib/pgsql:/bin/bash\n',
45 b'mysql:x:102:103:MySQL server:/var/lib/mysql:/bin/bash\n', 46 b'mysql:x:102:103:MySQL server:/var/lib/mysql:/bin/bash\n',
46 b'www:x:103:104::/var/www:/bin/false\n', 47 b'www:x:103:104::/var/www:/bin/false\n',
47 ] 48 ]
48 TEXT = b''.join(TEXT_LINES) 49 TEXT = b''.join(TEXT_LINES)
49 DATA = b'BZh91AY&SY.\xc8N\x18\x00\x01>_\x80\x00\x10@\x02\xff\xf0\x01\x07n\x0 0?\xe7\xff\xe00\x01\x99\xaa\x00\xc0\x03F\x86\x8c#&\x83F\x9a\x03\x06\xa6\xd0\xa6\ x93M\x0fQ\xa7\xa8\x06\x804hh\x12$\x11\xa4i4\xf14S\xd2<Q\xb5\x0fH\xd3\xd4\xdd\xd5 \x87\xbb\xf8\x94\r\x8f\xafI\x12\xe1\xc9\xf8/E\x00pu\x89\x12]\xc9\xbbDL\nQ\x0e\t1 \x12\xdf\xa0\xc0\x97\xac2O9\x89\x13\x94\x0e\x1c7\x0ed\x95I\x0c\xaaJ\xa4\x18L\x10 \x05#\x9c\xaf\xba\xbc/\x97\x8a#C\xc8\xe1\x8cW\xf9\xe2\xd0\xd6M\xa7\x8bXa<e\x84t\ xcbL\xb3\xa7\xd9\xcd\xd1\xcb\x84.\xaf\xb3\xab\xab\xad`n}\xa0lh\tE,\x8eZ\x15\x17V H>\x88\xe5\xcd9gd6\x0b\n\xe9\x9b\xd5\x8a\x99\xf7\x08.K\x8ev\xfb\xf7xw\xbb\xdf\xa 1\x92\xf1\xdd|/";\xa2\xba\x9f\xd5\xb1#A\xb6\xf6\xb3o\xc9\xc5y\\\xebO\xe7\x85\x9a \xbc\xb6f8\x952\xd5\xd7"%\x89>V,\xf7\xa6z\xe2\x9f\xa3\xdf\x11\x11"\xd6E)I\xa9\x1 3^\xca\xf3r\xd0\x03U\x922\xf26\xec\xb6\xed\x8b\xc3U\x13\x9d\xc5\x170\xa4\xfa^\x9 2\xacDF\x8a\x97\xd6\x19\xfe\xdd\xb8\xbd\x1a\x9a\x19\xa3\x80ankR\x8b\xe5\xd83]\xa 9\xc6\x08\x82f\xf6\xb9"6l$\xb8j@\xc0\x8a\xb0l1..\xbak\x83ls\x15\xbc\xf4\xc1\x13\ xbe\xf8E\xb8\x9d\r\xa8\x9dk\x84\xd3n\xfa\xacQ\x07\xb1%y\xaav\xb4\x08\xe0z\x1b\x1 6\xf5\x04\xe9\xcc\xb9\x08z\x1en7.G\xfc]\xc9\x14\xe1B@\xbb!8`' 50 DATA = b'BZh91AY&SY.\xc8N\x18\x00\x01>_\x80\x00\x10@\x02\xff\xf0\x01\x07n\x0 0?\xe7\xff\xe00\x01\x99\xaa\x00\xc0\x03F\x86\x8c#&\x83F\x9a\x03\x06\xa6\xd0\xa6\ x93M\x0fQ\xa7\xa8\x06\x804hh\x12$\x11\xa4i4\xf14S\xd2<Q\xb5\x0fH\xd3\xd4\xdd\xd5 \x87\xbb\xf8\x94\r\x8f\xafI\x12\xe1\xc9\xf8/E\x00pu\x89\x12]\xc9\xbbDL\nQ\x0e\t1 \x12\xdf\xa0\xc0\x97\xac2O9\x89\x13\x94\x0e\x1c7\x0ed\x95I\x0c\xaaJ\xa4\x18L\x10 \x05#\x9c\xaf\xba\xbc/\x97\x8a#C\xc8\xe1\x8cW\xf9\xe2\xd0\xd6M\xa7\x8bXa<e\x84t\ xcbL\xb3\xa7\xd9\xcd\xd1\xcb\x84.\xaf\xb3\xab\xab\xad`n}\xa0lh\tE,\x8eZ\x15\x17V H>\x88\xe5\xcd9gd6\x0b\n\xe9\x9b\xd5\x8a\x99\xf7\x08.K\x8ev\xfb\xf7xw\xbb\xdf\xa 1\x92\xf1\xdd|/";\xa2\xba\x9f\xd5\xb1#A\xb6\xf6\xb3o\xc9\xc5y\\\xebO\xe7\x85\x9a \xbc\xb6f8\x952\xd5\xd7"%\x89>V,\xf7\xa6z\xe2\x9f\xa3\xdf\x11\x11"\xd6E)I\xa9\x1 3^\xca\xf3r\xd0\x03U\x922\xf26\xec\xb6\xed\x8b\xc3U\x13\x9d\xc5\x170\xa4\xfa^\x9 2\xacDF\x8a\x97\xd6\x19\xfe\xdd\xb8\xbd\x1a\x9a\x19\xa3\x80ankR\x8b\xe5\xd83]\xa 9\xc6\x08\x82f\xf6\xb9"6l$\xb8j@\xc0\x8a\xb0l1..\xbak\x83ls\x15\xbc\xf4\xc1\x13\ xbe\xf8E\xb8\x9d\r\xa8\x9dk\x84\xd3n\xfa\xacQ\x07\xb1%y\xaav\xb4\x08\xe0z\x1b\x1 6\xf5\x04\xe9\xcc\xb9\x08z\x1en7.G\xfc]\xc9\x14\xe1B@\xbb!8`'
51 EMPTY_DATA = b'BZh9\x17rE8P\x90\x00\x00\x00\x00'
52 BAD_DATA = b'this is not a valid bzip2 file'
50 53
51 def setUp(self): 54 def setUp(self):
52 self.filename = support.TESTFN 55 self.filename = support.TESTFN
53 56
54 def tearDown(self): 57 def tearDown(self):
55 if os.path.isfile(self.filename): 58 if os.path.isfile(self.filename):
56 os.unlink(self.filename) 59 os.unlink(self.filename)
57 60
58 if sys.platform == "win32": 61 if sys.platform == "win32":
59 # bunzip2 isn't available to run on Windows. 62 # bunzip2 isn't available to run on Windows.
(...skipping 10 matching lines...) Expand all
70 ret = pop.stdout.read() 73 ret = pop.stdout.read()
71 pop.stdout.close() 74 pop.stdout.close()
72 if pop.wait() != 0: 75 if pop.wait() != 0:
73 ret = bz2.decompress(data) 76 ret = bz2.decompress(data)
74 return ret 77 return ret
75 78
76 79
77 class BZ2FileTest(BaseTest): 80 class BZ2FileTest(BaseTest):
78 "Test the BZ2File class." 81 "Test the BZ2File class."
79 82
80 def createTempFile(self, streams=1): 83 def createTempFile(self, streams=1, suffix=b""):
81 with open(self.filename, "wb") as f: 84 with open(self.filename, "wb") as f:
82 f.write(self.DATA * streams) 85 f.write(self.DATA * streams)
86 f.write(suffix)
83 87
84 def testBadArgs(self): 88 def testBadArgs(self):
85 self.assertRaises(TypeError, BZ2File, 123.456) 89 self.assertRaises(TypeError, BZ2File, 123.456)
86 self.assertRaises(ValueError, BZ2File, "/dev/null", "z") 90 self.assertRaises(ValueError, BZ2File, "/dev/null", "z")
87 self.assertRaises(ValueError, BZ2File, "/dev/null", "rx") 91 self.assertRaises(ValueError, BZ2File, "/dev/null", "rx")
88 self.assertRaises(ValueError, BZ2File, "/dev/null", "rbt") 92 self.assertRaises(ValueError, BZ2File, "/dev/null", "rbt")
89 self.assertRaises(ValueError, BZ2File, "/dev/null", compresslevel=0) 93 self.assertRaises(ValueError, BZ2File, "/dev/null", compresslevel=0)
90 self.assertRaises(ValueError, BZ2File, "/dev/null", compresslevel=10) 94 self.assertRaises(ValueError, BZ2File, "/dev/null", compresslevel=10)
91 95
92 def testRead(self): 96 def testRead(self):
93 self.createTempFile() 97 self.createTempFile()
94 with BZ2File(self.filename) as bz2f: 98 with BZ2File(self.filename) as bz2f:
95 self.assertRaises(TypeError, bz2f.read, None) 99 self.assertRaises(TypeError, bz2f.read, None)
96 self.assertEqual(bz2f.read(), self.TEXT) 100 self.assertEqual(bz2f.read(), self.TEXT)
97 101
102 def testReadBadFile(self):
103 self.createTempFile(streams=0, suffix=self.BAD_DATA)
104 with BZ2File(self.filename) as bz2f:
105 self.assertRaises(OSError, bz2f.read)
106
98 def testReadMultiStream(self): 107 def testReadMultiStream(self):
99 self.createTempFile(streams=5) 108 self.createTempFile(streams=5)
100 with BZ2File(self.filename) as bz2f: 109 with BZ2File(self.filename) as bz2f:
101 self.assertRaises(TypeError, bz2f.read, None) 110 self.assertRaises(TypeError, bz2f.read, None)
102 self.assertEqual(bz2f.read(), self.TEXT * 5) 111 self.assertEqual(bz2f.read(), self.TEXT * 5)
103 112
104 def testReadMonkeyMultiStream(self): 113 def testReadMonkeyMultiStream(self):
105 # Test BZ2File.read() on a multi-stream archive where a stream 114 # Test BZ2File.read() on a multi-stream archive where a stream
106 # boundary coincides with the end of the raw read buffer. 115 # boundary coincides with the end of the raw read buffer.
107 buffer_size = bz2._BUFFER_SIZE 116 buffer_size = bz2._BUFFER_SIZE
108 bz2._BUFFER_SIZE = len(self.DATA) 117 bz2._BUFFER_SIZE = len(self.DATA)
109 try: 118 try:
110 self.createTempFile(streams=5) 119 self.createTempFile(streams=5)
111 with BZ2File(self.filename) as bz2f: 120 with BZ2File(self.filename) as bz2f:
112 self.assertRaises(TypeError, bz2f.read, None) 121 self.assertRaises(TypeError, bz2f.read, None)
113 self.assertEqual(bz2f.read(), self.TEXT * 5) 122 self.assertEqual(bz2f.read(), self.TEXT * 5)
114 finally: 123 finally:
115 bz2._BUFFER_SIZE = buffer_size 124 bz2._BUFFER_SIZE = buffer_size
125
126 def testReadTrailingJunk(self):
127 self.createTempFile(suffix=self.BAD_DATA)
128 with BZ2File(self.filename) as bz2f:
129 self.assertEqual(bz2f.read(), self.TEXT)
130
131 def testReadMultiStreamTrailingJunk(self):
132 self.createTempFile(streams=5, suffix=self.BAD_DATA)
133 with BZ2File(self.filename) as bz2f:
134 self.assertEqual(bz2f.read(), self.TEXT * 5)
116 135
117 def testRead0(self): 136 def testRead0(self):
118 self.createTempFile() 137 self.createTempFile()
119 with BZ2File(self.filename) as bz2f: 138 with BZ2File(self.filename) as bz2f:
120 self.assertRaises(TypeError, bz2f.read, None) 139 self.assertRaises(TypeError, bz2f.read, None)
121 self.assertEqual(bz2f.read(0), b"") 140 self.assertEqual(bz2f.read(0), b"")
122 141
123 def testReadChunk10(self): 142 def testReadChunk10(self):
124 self.createTempFile() 143 self.createTempFile()
125 with BZ2File(self.filename) as bz2f: 144 with BZ2File(self.filename) as bz2f:
(...skipping 120 matching lines...) Expand 10 before | Expand all | Expand 10 after
246 # should raise an exception. 265 # should raise an exception.
247 self.assertRaises(ValueError, bz2f.writelines, ["a"]) 266 self.assertRaises(ValueError, bz2f.writelines, ["a"])
248 with open(self.filename, 'rb') as f: 267 with open(self.filename, 'rb') as f:
249 self.assertEqual(self.decompress(f.read()), self.TEXT) 268 self.assertEqual(self.decompress(f.read()), self.TEXT)
250 269
251 def testWriteMethodsOnReadOnlyFile(self): 270 def testWriteMethodsOnReadOnlyFile(self):
252 with BZ2File(self.filename, "w") as bz2f: 271 with BZ2File(self.filename, "w") as bz2f:
253 bz2f.write(b"abc") 272 bz2f.write(b"abc")
254 273
255 with BZ2File(self.filename, "r") as bz2f: 274 with BZ2File(self.filename, "r") as bz2f:
256 self.assertRaises(IOError, bz2f.write, b"a") 275 self.assertRaises(OSError, bz2f.write, b"a")
257 self.assertRaises(IOError, bz2f.writelines, [b"a"]) 276 self.assertRaises(OSError, bz2f.writelines, [b"a"])
258 277
259 def testAppend(self): 278 def testAppend(self):
260 with BZ2File(self.filename, "w") as bz2f: 279 with BZ2File(self.filename, "w") as bz2f:
261 self.assertRaises(TypeError, bz2f.write) 280 self.assertRaises(TypeError, bz2f.write)
262 bz2f.write(self.TEXT) 281 bz2f.write(self.TEXT)
263 with BZ2File(self.filename, "a") as bz2f: 282 with BZ2File(self.filename, "a") as bz2f:
264 self.assertRaises(TypeError, bz2f.write) 283 self.assertRaises(TypeError, bz2f.write)
265 bz2f.write(self.TEXT) 284 bz2f.write(self.TEXT)
266 with open(self.filename, 'rb') as f: 285 with open(self.filename, 'rb') as f:
267 self.assertEqual(self.decompress(f.read()), self.TEXT * 2) 286 self.assertEqual(self.decompress(f.read()), self.TEXT * 2)
(...skipping 154 matching lines...) Expand 10 before | Expand all | Expand 10 after
422 bz2f.close() 441 bz2f.close()
423 self.assertRaises(ValueError, bz2f.writable) 442 self.assertRaises(ValueError, bz2f.writable)
424 443
425 def testOpenDel(self): 444 def testOpenDel(self):
426 self.createTempFile() 445 self.createTempFile()
427 for i in range(10000): 446 for i in range(10000):
428 o = BZ2File(self.filename) 447 o = BZ2File(self.filename)
429 del o 448 del o
430 449
431 def testOpenNonexistent(self): 450 def testOpenNonexistent(self):
432 self.assertRaises(IOError, BZ2File, "/non/existent") 451 self.assertRaises(OSError, BZ2File, "/non/existent")
433 452
434 def testReadlinesNoNewline(self): 453 def testReadlinesNoNewline(self):
435 # Issue #1191043: readlines() fails on a file containing no newline. 454 # Issue #1191043: readlines() fails on a file containing no newline.
436 data = b'BZh91AY&SY\xd9b\x89]\x00\x00\x00\x03\x80\x04\x00\x02\x00\x0c\x0 0 \x00!\x9ah3M\x13<]\xc9\x14\xe1BCe\x8a%t' 455 data = b'BZh91AY&SY\xd9b\x89]\x00\x00\x00\x03\x80\x04\x00\x02\x00\x0c\x0 0 \x00!\x9ah3M\x13<]\xc9\x14\xe1BCe\x8a%t'
437 with open(self.filename, "wb") as f: 456 with open(self.filename, "wb") as f:
438 f.write(data) 457 f.write(data)
439 with BZ2File(self.filename) as bz2f: 458 with BZ2File(self.filename) as bz2f:
440 lines = bz2f.readlines() 459 lines = bz2f.readlines()
441 self.assertEqual(lines, [b'Test']) 460 self.assertEqual(lines, [b'Test'])
442 with BZ2File(self.filename) as bz2f: 461 with BZ2File(self.filename) as bz2f:
(...skipping 118 matching lines...) Expand 10 before | Expand all | Expand 10 after
561 bz2f.seek(150) 580 bz2f.seek(150)
562 self.assertEqual(bz2f.read(), self.TEXT[150:]) 581 self.assertEqual(bz2f.read(), self.TEXT[150:])
563 582
564 def testSeekBackwardsBytesIO(self): 583 def testSeekBackwardsBytesIO(self):
565 with BytesIO(self.DATA) as bio: 584 with BytesIO(self.DATA) as bio:
566 with BZ2File(bio) as bz2f: 585 with BZ2File(bio) as bz2f:
567 bz2f.read(500) 586 bz2f.read(500)
568 bz2f.seek(-150, 1) 587 bz2f.seek(-150, 1)
569 self.assertEqual(bz2f.read(), self.TEXT[500-150:]) 588 self.assertEqual(bz2f.read(), self.TEXT[500-150:])
570 589
590 def test_read_truncated(self):
591 # Drop the eos_magic field (6 bytes) and CRC (4 bytes).
592 truncated = self.DATA[:-10]
593 with BZ2File(BytesIO(truncated)) as f:
594 self.assertRaises(EOFError, f.read)
595 with BZ2File(BytesIO(truncated)) as f:
596 self.assertEqual(f.read(len(self.TEXT)), self.TEXT)
597 self.assertRaises(EOFError, f.read, 1)
598 # Incomplete 4-byte file header, and block header of at least 146 bits.
599 for i in range(22):
600 with BZ2File(BytesIO(truncated[:i])) as f:
601 self.assertRaises(EOFError, f.read, 1)
602
571 603
572 class BZ2CompressorTest(BaseTest): 604 class BZ2CompressorTest(BaseTest):
573 def testCompress(self): 605 def testCompress(self):
574 bz2c = BZ2Compressor() 606 bz2c = BZ2Compressor()
575 self.assertRaises(TypeError, bz2c.compress) 607 self.assertRaises(TypeError, bz2c.compress)
576 data = bz2c.compress(self.TEXT) 608 data = bz2c.compress(self.TEXT)
577 data += bz2c.flush() 609 data += bz2c.flush()
578 self.assertEqual(self.decompress(data), self.TEXT) 610 self.assertEqual(self.decompress(data), self.TEXT)
611
612 def testCompressEmptyString(self):
613 bz2c = BZ2Compressor()
614 data = bz2c.compress(b'')
615 data += bz2c.flush()
616 self.assertEqual(data, self.EMPTY_DATA)
579 617
580 def testCompressChunks10(self): 618 def testCompressChunks10(self):
581 bz2c = BZ2Compressor() 619 bz2c = BZ2Compressor()
582 n = 0 620 n = 0
583 data = b'' 621 data = b''
584 while True: 622 while True:
585 str = self.TEXT[n*10:(n+1)*10] 623 str = self.TEXT[n*10:(n+1)*10]
586 if not str: 624 if not str:
587 break 625 break
588 data += bz2c.compress(str) 626 data += bz2c.compress(str)
(...skipping 11 matching lines...) Expand all
600 compressed += bz2c.flush() 638 compressed += bz2c.flush()
601 finally: 639 finally:
602 data = None # Release memory 640 data = None # Release memory
603 data = bz2.decompress(compressed) 641 data = bz2.decompress(compressed)
604 try: 642 try:
605 self.assertEqual(len(data), size) 643 self.assertEqual(len(data), size)
606 self.assertEqual(len(data.strip(b"x")), 0) 644 self.assertEqual(len(data.strip(b"x")), 0)
607 finally: 645 finally:
608 data = None 646 data = None
609 647
648 def testPickle(self):
649 with self.assertRaises(TypeError):
650 pickle.dumps(BZ2Compressor())
651
652
610 class BZ2DecompressorTest(BaseTest): 653 class BZ2DecompressorTest(BaseTest):
611 def test_Constructor(self): 654 def test_Constructor(self):
612 self.assertRaises(TypeError, BZ2Decompressor, 42) 655 self.assertRaises(TypeError, BZ2Decompressor, 42)
613 656
614 def testDecompress(self): 657 def testDecompress(self):
615 bz2d = BZ2Decompressor() 658 bz2d = BZ2Decompressor()
616 self.assertRaises(TypeError, bz2d.decompress) 659 self.assertRaises(TypeError, bz2d.decompress)
617 text = bz2d.decompress(self.DATA) 660 text = bz2d.decompress(self.DATA)
618 self.assertEqual(text, self.TEXT) 661 self.assertEqual(text, self.TEXT)
619 662
(...skipping 15 matching lines...) Expand all
635 text = bz2d.decompress(self.DATA+unused_data) 678 text = bz2d.decompress(self.DATA+unused_data)
636 self.assertEqual(text, self.TEXT) 679 self.assertEqual(text, self.TEXT)
637 self.assertEqual(bz2d.unused_data, unused_data) 680 self.assertEqual(bz2d.unused_data, unused_data)
638 681
639 def testEOFError(self): 682 def testEOFError(self):
640 bz2d = BZ2Decompressor() 683 bz2d = BZ2Decompressor()
641 text = bz2d.decompress(self.DATA) 684 text = bz2d.decompress(self.DATA)
642 self.assertRaises(EOFError, bz2d.decompress, b"anything") 685 self.assertRaises(EOFError, bz2d.decompress, b"anything")
643 self.assertRaises(EOFError, bz2d.decompress, b"") 686 self.assertRaises(EOFError, bz2d.decompress, b"")
644 687
645 @bigmemtest(size=_4G + 100, memuse=3) 688 @bigmemtest(size=_4G + 100, memuse=3.3)
646 def testDecompress4G(self, size): 689 def testDecompress4G(self, size):
647 # "Test BZ2Decompressor.decompress() with >4GiB input" 690 # "Test BZ2Decompressor.decompress() with >4GiB input"
648 blocksize = 10 * 1024 * 1024 691 blocksize = 10 * 1024 * 1024
649 block = random.getrandbits(blocksize * 8).to_bytes(blocksize, 'little') 692 block = random.getrandbits(blocksize * 8).to_bytes(blocksize, 'little')
650 try: 693 try:
651 data = block * (size // blocksize + 1) 694 data = block * (size // blocksize + 1)
652 compressed = bz2.compress(data) 695 compressed = bz2.compress(data)
653 bz2d = BZ2Decompressor() 696 bz2d = BZ2Decompressor()
654 decompressed = bz2d.decompress(compressed) 697 decompressed = bz2d.decompress(compressed)
655 self.assertEqual(decompressed, data) 698 self.assertEqual(decompressed, data)
656 finally: 699 finally:
657 data = None 700 data = None
658 compressed = None 701 compressed = None
659 decompressed = None 702 decompressed = None
660 703
704 def testPickle(self):
705 with self.assertRaises(TypeError):
706 pickle.dumps(BZ2Decompressor())
707
661 708
662 class CompressDecompressTest(BaseTest): 709 class CompressDecompressTest(BaseTest):
663 def testCompress(self): 710 def testCompress(self):
664 data = bz2.compress(self.TEXT) 711 data = bz2.compress(self.TEXT)
665 self.assertEqual(self.decompress(data), self.TEXT) 712 self.assertEqual(self.decompress(data), self.TEXT)
666 713
714 def testCompressEmptyString(self):
715 text = bz2.compress(b'')
716 self.assertEqual(text, self.EMPTY_DATA)
717
667 def testDecompress(self): 718 def testDecompress(self):
668 text = bz2.decompress(self.DATA) 719 text = bz2.decompress(self.DATA)
669 self.assertEqual(text, self.TEXT) 720 self.assertEqual(text, self.TEXT)
670 721
671 def testDecompressEmpty(self): 722 def testDecompressEmpty(self):
672 text = bz2.decompress(b"") 723 text = bz2.decompress(b"")
673 self.assertEqual(text, b"") 724 self.assertEqual(text, b"")
674 725
726 def testDecompressToEmptyString(self):
727 text = bz2.decompress(self.EMPTY_DATA)
728 self.assertEqual(text, b'')
729
675 def testDecompressIncomplete(self): 730 def testDecompressIncomplete(self):
676 self.assertRaises(ValueError, bz2.decompress, self.DATA[:-10]) 731 self.assertRaises(ValueError, bz2.decompress, self.DATA[:-10])
732
733 def testDecompressBadData(self):
734 self.assertRaises(OSError, bz2.decompress, self.BAD_DATA)
677 735
678 def testDecompressMultiStream(self): 736 def testDecompressMultiStream(self):
679 text = bz2.decompress(self.DATA * 5) 737 text = bz2.decompress(self.DATA * 5)
680 self.assertEqual(text, self.TEXT * 5) 738 self.assertEqual(text, self.TEXT * 5)
681 739
740 def testDecompressTrailingJunk(self):
741 text = bz2.decompress(self.DATA + self.BAD_DATA)
742 self.assertEqual(text, self.TEXT)
743
744 def testDecompressMultiStreamTrailingJunk(self):
745 text = bz2.decompress(self.DATA * 5 + self.BAD_DATA)
746 self.assertEqual(text, self.TEXT * 5)
747
682 748
683 class OpenTest(BaseTest): 749 class OpenTest(BaseTest):
684 "Test the open function." 750 "Test the open function."
685 751
686 def open(self, *args, **kwargs): 752 def open(self, *args, **kwargs):
687 return bz2.open(*args, **kwargs) 753 return bz2.open(*args, **kwargs)
688 754
689 def test_binary_modes(self): 755 def test_binary_modes(self):
690 with self.open(self.filename, "wb") as f: 756 for mode in ("wb", "xb"):
691 f.write(self.TEXT) 757 if mode == "xb":
692 with open(self.filename, "rb") as f: 758 unlink(self.filename)
693 file_data = self.decompress(f.read()) 759 with self.open(self.filename, mode) as f:
694 self.assertEqual(file_data, self.TEXT) 760 f.write(self.TEXT)
695 with self.open(self.filename, "rb") as f: 761 with open(self.filename, "rb") as f:
696 self.assertEqual(f.read(), self.TEXT) 762 file_data = self.decompress(f.read())
697 with self.open(self.filename, "ab") as f: 763 self.assertEqual(file_data, self.TEXT)
698 f.write(self.TEXT) 764 with self.open(self.filename, "rb") as f:
699 with open(self.filename, "rb") as f: 765 self.assertEqual(f.read(), self.TEXT)
700 file_data = self.decompress(f.read()) 766 with self.open(self.filename, "ab") as f:
701 self.assertEqual(file_data, self.TEXT * 2) 767 f.write(self.TEXT)
768 with open(self.filename, "rb") as f:
769 file_data = self.decompress(f.read())
770 self.assertEqual(file_data, self.TEXT * 2)
702 771
703 def test_implicit_binary_modes(self): 772 def test_implicit_binary_modes(self):
704 # Test implicit binary modes (no "b" or "t" in mode string). 773 # Test implicit binary modes (no "b" or "t" in mode string).
705 with self.open(self.filename, "w") as f: 774 for mode in ("w", "x"):
706 f.write(self.TEXT) 775 if mode == "x":
707 with open(self.filename, "rb") as f: 776 unlink(self.filename)
708 file_data = self.decompress(f.read()) 777 with self.open(self.filename, mode) as f:
709 self.assertEqual(file_data, self.TEXT) 778 f.write(self.TEXT)
710 with self.open(self.filename, "r") as f: 779 with open(self.filename, "rb") as f:
711 self.assertEqual(f.read(), self.TEXT) 780 file_data = self.decompress(f.read())
712 with self.open(self.filename, "a") as f: 781 self.assertEqual(file_data, self.TEXT)
713 f.write(self.TEXT) 782 with self.open(self.filename, "r") as f:
714 with open(self.filename, "rb") as f: 783 self.assertEqual(f.read(), self.TEXT)
715 file_data = self.decompress(f.read()) 784 with self.open(self.filename, "a") as f:
716 self.assertEqual(file_data, self.TEXT * 2) 785 f.write(self.TEXT)
786 with open(self.filename, "rb") as f:
787 file_data = self.decompress(f.read())
788 self.assertEqual(file_data, self.TEXT * 2)
717 789
718 def test_text_modes(self): 790 def test_text_modes(self):
719 text = self.TEXT.decode("ascii") 791 text = self.TEXT.decode("ascii")
720 text_native_eol = text.replace("\n", os.linesep) 792 text_native_eol = text.replace("\n", os.linesep)
721 with self.open(self.filename, "wt") as f: 793 for mode in ("wt", "xt"):
722 f.write(text) 794 if mode == "xt":
723 with open(self.filename, "rb") as f: 795 unlink(self.filename)
724 file_data = self.decompress(f.read()).decode("ascii") 796 with self.open(self.filename, mode) as f:
725 self.assertEqual(file_data, text_native_eol) 797 f.write(text)
726 with self.open(self.filename, "rt") as f: 798 with open(self.filename, "rb") as f:
727 self.assertEqual(f.read(), text) 799 file_data = self.decompress(f.read()).decode("ascii")
728 with self.open(self.filename, "at") as f: 800 self.assertEqual(file_data, text_native_eol)
729 f.write(text) 801 with self.open(self.filename, "rt") as f:
730 with open(self.filename, "rb") as f: 802 self.assertEqual(f.read(), text)
731 file_data = self.decompress(f.read()).decode("ascii") 803 with self.open(self.filename, "at") as f:
732 self.assertEqual(file_data, text_native_eol * 2) 804 f.write(text)
805 with open(self.filename, "rb") as f:
806 file_data = self.decompress(f.read()).decode("ascii")
807 self.assertEqual(file_data, text_native_eol * 2)
808
809 def test_x_mode(self):
810 for mode in ("x", "xb", "xt"):
811 unlink(self.filename)
812 with self.open(self.filename, mode) as f:
813 pass
814 with self.assertRaises(FileExistsError):
815 with self.open(self.filename, mode) as f:
816 pass
733 817
734 def test_fileobj(self): 818 def test_fileobj(self):
735 with self.open(BytesIO(self.DATA), "r") as f: 819 with self.open(BytesIO(self.DATA), "r") as f:
736 self.assertEqual(f.read(), self.TEXT) 820 self.assertEqual(f.read(), self.TEXT)
737 with self.open(BytesIO(self.DATA), "rb") as f: 821 with self.open(BytesIO(self.DATA), "rb") as f:
738 self.assertEqual(f.read(), self.TEXT) 822 self.assertEqual(f.read(), self.TEXT)
739 text = self.TEXT.decode("ascii") 823 text = self.TEXT.decode("ascii")
740 with self.open(BytesIO(self.DATA), "rt") as f: 824 with self.open(BytesIO(self.DATA), "rt") as f:
741 self.assertEqual(f.read(), text) 825 self.assertEqual(f.read(), text)
742 826
743 def test_bad_params(self): 827 def test_bad_params(self):
744 # Test invalid parameter combinations. 828 # Test invalid parameter combinations.
745 self.assertRaises(ValueError, 829 self.assertRaises(ValueError,
746 self.open, self.filename, "wbt") 830 self.open, self.filename, "wbt")
831 self.assertRaises(ValueError,
832 self.open, self.filename, "xbt")
747 self.assertRaises(ValueError, 833 self.assertRaises(ValueError,
748 self.open, self.filename, "rb", encoding="utf-8") 834 self.open, self.filename, "rb", encoding="utf-8")
749 self.assertRaises(ValueError, 835 self.assertRaises(ValueError,
750 self.open, self.filename, "rb", errors="ignore") 836 self.open, self.filename, "rb", errors="ignore")
751 self.assertRaises(ValueError, 837 self.assertRaises(ValueError,
752 self.open, self.filename, "rb", newline="\n") 838 self.open, self.filename, "rb", newline="\n")
753 839
754 def test_encoding(self): 840 def test_encoding(self):
755 # Test non-default encoding. 841 # Test non-default encoding.
756 text = self.TEXT.decode("ascii") 842 text = self.TEXT.decode("ascii")
(...skipping 28 matching lines...) Expand all
785 BZ2FileTest, 871 BZ2FileTest,
786 BZ2CompressorTest, 872 BZ2CompressorTest,
787 BZ2DecompressorTest, 873 BZ2DecompressorTest,
788 CompressDecompressTest, 874 CompressDecompressTest,
789 OpenTest, 875 OpenTest,
790 ) 876 )
791 support.reap_children() 877 support.reap_children()
792 878
793 if __name__ == '__main__': 879 if __name__ == '__main__':
794 test_main() 880 test_main()
LEFTRIGHT

RSS Feeds Recent Issues | This issue
This is Rietveld 894c83f36cb7+