LEFT | RIGHT |
1 #! /usr/bin/env python3 | |
2 """Test script for the gzip module. | 1 """Test script for the gzip module. |
3 """ | 2 """ |
4 | 3 |
5 import unittest | 4 import unittest |
6 from test import support | 5 from test import support |
7 import os | 6 import os |
8 import io | 7 import io |
9 import struct | 8 import struct |
10 gzip = support.import_module('gzip') | 9 gzip = support.import_module('gzip') |
11 | 10 |
(...skipping 111 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
123 f.write(b'a') | 122 f.write(b'a') |
124 | 123 |
125 # Try reading the file | 124 # Try reading the file |
126 with gzip.GzipFile(self.filename, "rb") as zgfile: | 125 with gzip.GzipFile(self.filename, "rb") as zgfile: |
127 contents = b"" | 126 contents = b"" |
128 while 1: | 127 while 1: |
129 ztxt = zgfile.read(8192) | 128 ztxt = zgfile.read(8192) |
130 contents += ztxt | 129 contents += ztxt |
131 if not ztxt: break | 130 if not ztxt: break |
132 self.assertEqual(contents, b'a'*201) | 131 self.assertEqual(contents, b'a'*201) |
| 132 |
| 133 def test_exclusive_write(self): |
| 134 with gzip.GzipFile(self.filename, 'xb') as f: |
| 135 f.write(data1 * 50) |
| 136 with gzip.GzipFile(self.filename, 'rb') as f: |
| 137 self.assertEqual(f.read(), data1 * 50) |
| 138 with self.assertRaises(FileExistsError): |
| 139 gzip.GzipFile(self.filename, 'xb') |
133 | 140 |
134 def test_buffered_reader(self): | 141 def test_buffered_reader(self): |
135 # Issue #7471: a GzipFile can be wrapped in a BufferedReader for | 142 # Issue #7471: a GzipFile can be wrapped in a BufferedReader for |
136 # performance. | 143 # performance. |
137 self.test_write() | 144 self.test_write() |
138 | 145 |
139 with gzip.GzipFile(self.filename, 'rb') as f: | 146 with gzip.GzipFile(self.filename, 'rb') as f: |
140 with io.BufferedReader(f) as r: | 147 with io.BufferedReader(f) as r: |
141 lines = [line for line in r] | 148 lines = [line for line in r] |
142 | 149 |
(...skipping 56 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
199 # Try seek, write test | 206 # Try seek, write test |
200 with gzip.GzipFile(self.filename, 'w') as f: | 207 with gzip.GzipFile(self.filename, 'w') as f: |
201 for pos in range(0, 256, 16): | 208 for pos in range(0, 256, 16): |
202 f.seek(pos) | 209 f.seek(pos) |
203 f.write(b'GZ\n') | 210 f.write(b'GZ\n') |
204 | 211 |
205 def test_mode(self): | 212 def test_mode(self): |
206 self.test_write() | 213 self.test_write() |
207 with gzip.GzipFile(self.filename, 'r') as f: | 214 with gzip.GzipFile(self.filename, 'r') as f: |
208 self.assertEqual(f.myfileobj.mode, 'rb') | 215 self.assertEqual(f.myfileobj.mode, 'rb') |
| 216 support.unlink(self.filename) |
| 217 with gzip.GzipFile(self.filename, 'x') as f: |
| 218 self.assertEqual(f.myfileobj.mode, 'xb') |
209 | 219 |
210 def test_1647484(self): | 220 def test_1647484(self): |
211 for mode in ('wb', 'rb'): | 221 for mode in ('wb', 'rb'): |
212 with gzip.GzipFile(self.filename, mode) as f: | 222 with gzip.GzipFile(self.filename, mode) as f: |
213 self.assertTrue(hasattr(f, "name")) | 223 self.assertTrue(hasattr(f, "name")) |
214 self.assertEqual(f.name, self.filename) | 224 self.assertEqual(f.name, self.filename) |
215 | 225 |
216 def test_paddedfile_getattr(self): | 226 def test_paddedfile_getattr(self): |
217 self.test_write() | 227 self.test_write() |
218 with gzip.GzipFile(self.filename, 'rb') as f: | 228 with gzip.GzipFile(self.filename, 'rb') as f: |
(...skipping 163 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
382 def test_decompress(self): | 392 def test_decompress(self): |
383 for data in (data1, data2): | 393 for data in (data1, data2): |
384 buf = io.BytesIO() | 394 buf = io.BytesIO() |
385 with gzip.GzipFile(fileobj=buf, mode="wb") as f: | 395 with gzip.GzipFile(fileobj=buf, mode="wb") as f: |
386 f.write(data) | 396 f.write(data) |
387 self.assertEqual(gzip.decompress(buf.getvalue()), data) | 397 self.assertEqual(gzip.decompress(buf.getvalue()), data) |
388 # Roundtrip with compress | 398 # Roundtrip with compress |
389 datac = gzip.compress(data) | 399 datac = gzip.compress(data) |
390 self.assertEqual(gzip.decompress(datac), data) | 400 self.assertEqual(gzip.decompress(datac), data) |
391 | 401 |
| 402 def test_read_truncated(self): |
| 403 data = data1*50 |
| 404 # Drop the CRC (4 bytes) and file size (4 bytes). |
| 405 truncated = gzip.compress(data)[:-8] |
| 406 with gzip.GzipFile(fileobj=io.BytesIO(truncated)) as f: |
| 407 self.assertRaises(EOFError, f.read) |
| 408 with gzip.GzipFile(fileobj=io.BytesIO(truncated)) as f: |
| 409 self.assertEqual(f.read(len(data)), data) |
| 410 self.assertRaises(EOFError, f.read, 1) |
| 411 # Incomplete 10-byte header. |
| 412 for i in range(2, 10): |
| 413 with gzip.GzipFile(fileobj=io.BytesIO(truncated[:i])) as f: |
| 414 self.assertRaises(EOFError, f.read, 1) |
| 415 |
| 416 def test_read_with_extra(self): |
| 417 # Gzip data with an extra field |
| 418 gzdata = (b'\x1f\x8b\x08\x04\xb2\x17cQ\x02\xff' |
| 419 b'\x05\x00Extra' |
| 420 b'\x0bI-.\x01\x002\xd1Mx\x04\x00\x00\x00') |
| 421 with gzip.GzipFile(fileobj=io.BytesIO(gzdata)) as f: |
| 422 self.assertEqual(f.read(), b'Test') |
392 | 423 |
393 class TestOpen(BaseTest): | 424 class TestOpen(BaseTest): |
394 def test_binary_modes(self): | 425 def test_binary_modes(self): |
395 uncompressed = data1 * 50 | 426 uncompressed = data1 * 50 |
| 427 |
396 with gzip.open(self.filename, "wb") as f: | 428 with gzip.open(self.filename, "wb") as f: |
397 f.write(uncompressed) | 429 f.write(uncompressed) |
398 with open(self.filename, "rb") as f: | 430 with open(self.filename, "rb") as f: |
399 file_data = gzip.decompress(f.read()) | 431 file_data = gzip.decompress(f.read()) |
400 self.assertEqual(file_data, uncompressed) | 432 self.assertEqual(file_data, uncompressed) |
| 433 |
401 with gzip.open(self.filename, "rb") as f: | 434 with gzip.open(self.filename, "rb") as f: |
402 self.assertEqual(f.read(), uncompressed) | 435 self.assertEqual(f.read(), uncompressed) |
| 436 |
403 with gzip.open(self.filename, "ab") as f: | 437 with gzip.open(self.filename, "ab") as f: |
404 f.write(uncompressed) | 438 f.write(uncompressed) |
405 with open(self.filename, "rb") as f: | 439 with open(self.filename, "rb") as f: |
406 file_data = gzip.decompress(f.read()) | 440 file_data = gzip.decompress(f.read()) |
407 self.assertEqual(file_data, uncompressed * 2) | 441 self.assertEqual(file_data, uncompressed * 2) |
| 442 |
| 443 with self.assertRaises(FileExistsError): |
| 444 gzip.open(self.filename, "xb") |
| 445 support.unlink(self.filename) |
| 446 with gzip.open(self.filename, "xb") as f: |
| 447 f.write(uncompressed) |
| 448 with open(self.filename, "rb") as f: |
| 449 file_data = gzip.decompress(f.read()) |
| 450 self.assertEqual(file_data, uncompressed) |
408 | 451 |
409 def test_implicit_binary_modes(self): | 452 def test_implicit_binary_modes(self): |
410 # Test implicit binary modes (no "b" or "t" in mode string). | 453 # Test implicit binary modes (no "b" or "t" in mode string). |
411 uncompressed = data1 * 50 | 454 uncompressed = data1 * 50 |
| 455 |
412 with gzip.open(self.filename, "w") as f: | 456 with gzip.open(self.filename, "w") as f: |
413 f.write(uncompressed) | 457 f.write(uncompressed) |
414 with open(self.filename, "rb") as f: | 458 with open(self.filename, "rb") as f: |
415 file_data = gzip.decompress(f.read()) | 459 file_data = gzip.decompress(f.read()) |
416 self.assertEqual(file_data, uncompressed) | 460 self.assertEqual(file_data, uncompressed) |
| 461 |
417 with gzip.open(self.filename, "r") as f: | 462 with gzip.open(self.filename, "r") as f: |
418 self.assertEqual(f.read(), uncompressed) | 463 self.assertEqual(f.read(), uncompressed) |
| 464 |
419 with gzip.open(self.filename, "a") as f: | 465 with gzip.open(self.filename, "a") as f: |
420 f.write(uncompressed) | 466 f.write(uncompressed) |
421 with open(self.filename, "rb") as f: | 467 with open(self.filename, "rb") as f: |
422 file_data = gzip.decompress(f.read()) | 468 file_data = gzip.decompress(f.read()) |
423 self.assertEqual(file_data, uncompressed * 2) | 469 self.assertEqual(file_data, uncompressed * 2) |
| 470 |
| 471 with self.assertRaises(FileExistsError): |
| 472 gzip.open(self.filename, "x") |
| 473 support.unlink(self.filename) |
| 474 with gzip.open(self.filename, "x") as f: |
| 475 f.write(uncompressed) |
| 476 with open(self.filename, "rb") as f: |
| 477 file_data = gzip.decompress(f.read()) |
| 478 self.assertEqual(file_data, uncompressed) |
424 | 479 |
425 def test_text_modes(self): | 480 def test_text_modes(self): |
426 uncompressed = data1.decode("ascii") * 50 | 481 uncompressed = data1.decode("ascii") * 50 |
427 uncompressed_raw = uncompressed.replace("\n", os.linesep) | 482 uncompressed_raw = uncompressed.replace("\n", os.linesep) |
428 with gzip.open(self.filename, "wt") as f: | 483 with gzip.open(self.filename, "wt") as f: |
429 f.write(uncompressed) | 484 f.write(uncompressed) |
430 with open(self.filename, "rb") as f: | 485 with open(self.filename, "rb") as f: |
431 file_data = gzip.decompress(f.read()).decode("ascii") | 486 file_data = gzip.decompress(f.read()).decode("ascii") |
432 self.assertEqual(file_data, uncompressed_raw) | 487 self.assertEqual(file_data, uncompressed_raw) |
433 with gzip.open(self.filename, "rt") as f: | 488 with gzip.open(self.filename, "rt") as f: |
(...skipping 15 matching lines...) Expand all Loading... |
449 with gzip.open(io.BytesIO(compressed), "rt") as f: | 504 with gzip.open(io.BytesIO(compressed), "rt") as f: |
450 self.assertEqual(f.read(), uncompressed_str) | 505 self.assertEqual(f.read(), uncompressed_str) |
451 | 506 |
452 def test_bad_params(self): | 507 def test_bad_params(self): |
453 # Test invalid parameter combinations. | 508 # Test invalid parameter combinations. |
454 with self.assertRaises(TypeError): | 509 with self.assertRaises(TypeError): |
455 gzip.open(123.456) | 510 gzip.open(123.456) |
456 with self.assertRaises(ValueError): | 511 with self.assertRaises(ValueError): |
457 gzip.open(self.filename, "wbt") | 512 gzip.open(self.filename, "wbt") |
458 with self.assertRaises(ValueError): | 513 with self.assertRaises(ValueError): |
| 514 gzip.open(self.filename, "xbt") |
| 515 with self.assertRaises(ValueError): |
459 gzip.open(self.filename, "rb", encoding="utf-8") | 516 gzip.open(self.filename, "rb", encoding="utf-8") |
460 with self.assertRaises(ValueError): | 517 with self.assertRaises(ValueError): |
461 gzip.open(self.filename, "rb", errors="ignore") | 518 gzip.open(self.filename, "rb", errors="ignore") |
462 with self.assertRaises(ValueError): | 519 with self.assertRaises(ValueError): |
463 gzip.open(self.filename, "rb", newline="\n") | 520 gzip.open(self.filename, "rb", newline="\n") |
464 | 521 |
465 def test_encoding(self): | 522 def test_encoding(self): |
466 # Test non-default encoding. | 523 # Test non-default encoding. |
467 uncompressed = data1.decode("ascii") * 50 | 524 uncompressed = data1.decode("ascii") * 50 |
468 uncompressed_raw = uncompressed.replace("\n", os.linesep) | 525 uncompressed_raw = uncompressed.replace("\n", os.linesep) |
(...skipping 19 matching lines...) Expand all Loading... |
488 with gzip.open(self.filename, "wt", newline="\n") as f: | 545 with gzip.open(self.filename, "wt", newline="\n") as f: |
489 f.write(uncompressed) | 546 f.write(uncompressed) |
490 with gzip.open(self.filename, "rt", newline="\r") as f: | 547 with gzip.open(self.filename, "rt", newline="\r") as f: |
491 self.assertEqual(f.readlines(), [uncompressed]) | 548 self.assertEqual(f.readlines(), [uncompressed]) |
492 | 549 |
493 def test_main(verbose=None): | 550 def test_main(verbose=None): |
494 support.run_unittest(TestGzip, TestOpen) | 551 support.run_unittest(TestGzip, TestOpen) |
495 | 552 |
496 if __name__ == "__main__": | 553 if __name__ == "__main__": |
497 test_main(verbose=True) | 554 test_main(verbose=True) |
LEFT | RIGHT |