| LEFT | RIGHT |
| 1 """Unittests for the various HTTPServer modules. | 1 """Unittests for the various HTTPServer modules. |
| 2 | 2 |
| 3 Written by Cody A.W. Somerville <cody-somerville@ubuntu.com>, | 3 Written by Cody A.W. Somerville <cody-somerville@ubuntu.com>, |
| 4 Josip Dzolonga, and Michael Otteneder for the 2007/08 GHOP contest. | 4 Josip Dzolonga, and Michael Otteneder for the 2007/08 GHOP contest. |
| 5 """ | 5 """ |
| 6 | 6 |
| 7 from http.server import BaseHTTPRequestHandler, HTTPServer, \ | 7 from http.server import BaseHTTPRequestHandler, HTTPServer, \ |
| 8 SimpleHTTPRequestHandler, CGIHTTPRequestHandler | 8 SimpleHTTPRequestHandler, CGIHTTPRequestHandler |
| 9 from http import server | 9 from http import server |
| 10 | 10 |
| (...skipping 241 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 252 self.check_status_and_reason(response, 200) | 252 self.check_status_and_reason(response, 200) |
| 253 response = self.request(self.tempdir_name) | 253 response = self.request(self.tempdir_name) |
| 254 self.check_status_and_reason(response, 301) | 254 self.check_status_and_reason(response, 301) |
| 255 response = self.request('/ThisDoesNotExist') | 255 response = self.request('/ThisDoesNotExist') |
| 256 self.check_status_and_reason(response, 404) | 256 self.check_status_and_reason(response, 404) |
| 257 response = self.request('/' + 'ThisDoesNotExist' + '/') | 257 response = self.request('/' + 'ThisDoesNotExist' + '/') |
| 258 self.check_status_and_reason(response, 404) | 258 self.check_status_and_reason(response, 404) |
| 259 with open(os.path.join(self.tempdir_name, 'index.html'), 'w') as f: | 259 with open(os.path.join(self.tempdir_name, 'index.html'), 'w') as f: |
| 260 response = self.request('/' + self.tempdir_name + '/') | 260 response = self.request('/' + self.tempdir_name + '/') |
| 261 self.check_status_and_reason(response, 200) | 261 self.check_status_and_reason(response, 200) |
| 262 if os.name == 'posix': | 262 # chmod() doesn't work as expected on Windows, and filesystem |
| 263 # chmod won't work as expected on Windows platforms | 263 # permissions are ignored by root on Unix. |
| 264 if os.name == 'posix' and os.geteuid() != 0: |
| 264 os.chmod(self.tempdir, 0) | 265 os.chmod(self.tempdir, 0) |
| 265 response = self.request(self.tempdir_name + '/') | 266 response = self.request(self.tempdir_name + '/') |
| 266 self.check_status_and_reason(response, 404) | 267 self.check_status_and_reason(response, 404) |
| 267 os.chmod(self.tempdir, 0o755) | 268 os.chmod(self.tempdir, 0o755) |
| 268 | 269 |
| 269 def test_head(self): | 270 def test_head(self): |
| 270 response = self.request( | 271 response = self.request( |
| 271 self.tempdir_name + '/test', method='HEAD') | 272 self.tempdir_name + '/test', method='HEAD') |
| 272 self.check_status_and_reason(response, 200) | 273 self.check_status_and_reason(response, 200) |
| 273 self.assertEqual(response.getheader('content-length'), | 274 self.assertEqual(response.getheader('content-length'), |
| (...skipping 24 matching lines...) Expand all Loading... |
| 298 import cgi | 299 import cgi |
| 299 | 300 |
| 300 print("Content-type: text/html") | 301 print("Content-type: text/html") |
| 301 print() | 302 print() |
| 302 | 303 |
| 303 form = cgi.FieldStorage() | 304 form = cgi.FieldStorage() |
| 304 print("%%s, %%s, %%s" %% (form.getfirst("spam"), form.getfirst("eggs"), | 305 print("%%s, %%s, %%s" %% (form.getfirst("spam"), form.getfirst("eggs"), |
| 305 form.getfirst("bacon"))) | 306 form.getfirst("bacon"))) |
| 306 """ | 307 """ |
| 307 | 308 |
| 309 |
| 310 @unittest.skipIf(hasattr(os, 'geteuid') and os.geteuid() == 0, |
| 311 "This test can't be run reliably as root (issue #13308).") |
| 308 class CGIHTTPServerTestCase(BaseTestCase): | 312 class CGIHTTPServerTestCase(BaseTestCase): |
| 309 class request_handler(NoLogRequestHandler, CGIHTTPRequestHandler): | 313 class request_handler(NoLogRequestHandler, CGIHTTPRequestHandler): |
| 310 pass | 314 pass |
| 311 | 315 |
| 312 def setUp(self): | 316 def setUp(self): |
| 313 BaseTestCase.setUp(self) | 317 BaseTestCase.setUp(self) |
| 314 self.cwd = os.getcwd() | 318 self.cwd = os.getcwd() |
| 315 self.parent_dir = tempfile.mkdtemp() | 319 self.parent_dir = tempfile.mkdtemp() |
| 316 self.cgi_dir = os.path.join(self.parent_dir, 'cgi-bin') | 320 self.cgi_dir = os.path.join(self.parent_dir, 'cgi-bin') |
| 317 os.mkdir(self.cgi_dir) | 321 os.mkdir(self.cgi_dir) |
| (...skipping 45 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 363 # os.remove(self.pythonexe) | 367 # os.remove(self.pythonexe) |
| 364 if self.file1_path: | 368 if self.file1_path: |
| 365 os.remove(self.file1_path) | 369 os.remove(self.file1_path) |
| 366 if self.file2_path: | 370 if self.file2_path: |
| 367 os.remove(self.file2_path) | 371 os.remove(self.file2_path) |
| 368 os.rmdir(self.cgi_dir) | 372 os.rmdir(self.cgi_dir) |
| 369 os.rmdir(self.parent_dir) | 373 os.rmdir(self.parent_dir) |
| 370 finally: | 374 finally: |
| 371 BaseTestCase.tearDown(self) | 375 BaseTestCase.tearDown(self) |
| 372 | 376 |
| 373 def test_url_collapse_path_split(self): | 377 def test_url_collapse_path(self): |
| 378 # verify tail is the last portion and head is the rest on proper urls |
| 374 test_vectors = { | 379 test_vectors = { |
| 375 '': ('/', ''), | 380 '': '//', |
| 376 '..': IndexError, | 381 '..': IndexError, |
| 377 '/.//..': IndexError, | 382 '/.//..': IndexError, |
| 378 '/': ('/', ''), | 383 '/': '//', |
| 379 '//': ('/', ''), | 384 '//': '//', |
| 380 '/\\': ('/', '\\'), | 385 '/\\': '//\\', |
| 381 '/.//': ('/', ''), | 386 '/.//': '//', |
| 382 'cgi-bin/file1.py': ('/cgi-bin', 'file1.py'), | 387 'cgi-bin/file1.py': '/cgi-bin/file1.py', |
| 383 '/cgi-bin/file1.py': ('/cgi-bin', 'file1.py'), | 388 '/cgi-bin/file1.py': '/cgi-bin/file1.py', |
| 384 'a': ('/', 'a'), | 389 'a': '//a', |
| 385 '/a': ('/', 'a'), | 390 '/a': '//a', |
| 386 '//a': ('/', 'a'), | 391 '//a': '//a', |
| 387 './a': ('/', 'a'), | 392 './a': '//a', |
| 388 './C:/': ('/C:', ''), | 393 './C:/': '/C:/', |
| 389 '/a/b': ('/a', 'b'), | 394 '/a/b': '/a/b', |
| 390 '/a/b/': ('/a/b', ''), | 395 '/a/b/': '/a/b/', |
| 391 '/a/b/c/..': ('/a/b', ''), | 396 '/a/b/.': '/a/b/', |
| 392 '/a/b/c/../d': ('/a/b', 'd'), | 397 '/a/b/c/..': '/a/b/', |
| 393 '/a/b/c/../d/e/../f': ('/a/b/d', 'f'), | 398 '/a/b/c/../d': '/a/b/d', |
| 394 '/a/b/c/../d/e/../../f': ('/a/b', 'f'), | 399 '/a/b/c/../d/e/../f': '/a/b/d/f', |
| 395 '/a/b/c/../d/e/.././././..//f': ('/a/b', 'f'), | 400 '/a/b/c/../d/e/../../f': '/a/b/f', |
| 401 '/a/b/c/../d/e/.././././..//f': '/a/b/f', |
| 396 '../a/b/c/../d/e/.././././..//f': IndexError, | 402 '../a/b/c/../d/e/.././././..//f': IndexError, |
| 397 '/a/b/c/../d/e/../../../f': ('/a', 'f'), | 403 '/a/b/c/../d/e/../../../f': '/a/f', |
| 398 '/a/b/c/../d/e/../../../../f': ('/', 'f'), | 404 '/a/b/c/../d/e/../../../../f': '//f', |
| 399 '/a/b/c/../d/e/../../../../../f': IndexError, | 405 '/a/b/c/../d/e/../../../../../f': IndexError, |
| 400 '/a/b/c/../d/e/../../../../f/..': ('/', ''), | 406 '/a/b/c/../d/e/../../../../f/..': '//', |
| 407 '/a/b/c/../d/e/../../../../f/../.': '//', |
| 401 } | 408 } |
| 402 for path, expected in test_vectors.items(): | 409 for path, expected in test_vectors.items(): |
| 403 if isinstance(expected, type) and issubclass(expected, Exception): | 410 if isinstance(expected, type) and issubclass(expected, Exception): |
| 404 self.assertRaises(expected, | 411 self.assertRaises(expected, |
| 405 server._url_collapse_path_split, path) | 412 server._url_collapse_path, path) |
| 406 else: | 413 else: |
| 407 actual = server._url_collapse_path_split(path) | 414 actual = server._url_collapse_path(path) |
| 408 self.assertEqual(expected, actual, | 415 self.assertEqual(expected, actual, |
| 409 msg='path = %r\nGot: %r\nWanted: %r' % | 416 msg='path = %r\nGot: %r\nWanted: %r' % |
| 410 (path, actual, expected)) | 417 (path, actual, expected)) |
| 411 | 418 |
| 412 def test_headers_and_content(self): | 419 def test_headers_and_content(self): |
| 413 res = self.request('/cgi-bin/file1.py') | 420 res = self.request('/cgi-bin/file1.py') |
| 414 self.assertEqual((b'Hello World\n', 'text/html', 200), | 421 self.assertEqual((b'Hello World\n', 'text/html', 200), |
| 415 (res.read(), res.getheader('Content-type'), res.status)) | 422 (res.read(), res.getheader('Content-type'), res.status)) |
| 416 | 423 |
| 417 def test_post(self): | 424 def test_post(self): |
| (...skipping 43 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 461 self.end_headers() | 468 self.end_headers() |
| 462 self.wfile.write(b'<html><body>Data</body></html>\r\n') | 469 self.wfile.write(b'<html><body>Data</body></html>\r\n') |
| 463 | 470 |
| 464 def log_message(self, format, *args): | 471 def log_message(self, format, *args): |
| 465 pass | 472 pass |
| 466 | 473 |
| 467 class RejectingSocketlessRequestHandler(SocketlessRequestHandler): | 474 class RejectingSocketlessRequestHandler(SocketlessRequestHandler): |
| 468 def handle_expect_100(self): | 475 def handle_expect_100(self): |
| 469 self.send_error(417) | 476 self.send_error(417) |
| 470 return False | 477 return False |
| 478 |
| 479 |
| 480 class AuditableBytesIO: |
| 481 |
| 482 def __init__(self): |
| 483 self.datas = [] |
| 484 |
| 485 def write(self, data): |
| 486 self.datas.append(data) |
| 487 |
| 488 def getData(self): |
| 489 return b''.join(self.datas) |
| 490 |
| 491 @property |
| 492 def numWrites(self): |
| 493 return len(self.datas) |
| 494 |
| 471 | 495 |
| 472 class BaseHTTPRequestHandlerTestCase(unittest.TestCase): | 496 class BaseHTTPRequestHandlerTestCase(unittest.TestCase): |
| 473 """Test the functionality of the BaseHTTPServer. | 497 """Test the functionality of the BaseHTTPServer. |
| 474 | 498 |
| 475 Test the support for the Expect 100-continue header. | 499 Test the support for the Expect 100-continue header. |
| 476 """ | 500 """ |
| 477 | 501 |
| 478 HTTPResponseMatch = re.compile(b'HTTP/1.[0-9]+ 200 OK') | 502 HTTPResponseMatch = re.compile(b'HTTP/1.[0-9]+ 200 OK') |
| 479 | 503 |
| 480 def setUp (self): | 504 def setUp (self): |
| (...skipping 47 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 528 self.assertEqual(result[-1], b'<html><body>Data</body></html>\r\n') | 552 self.assertEqual(result[-1], b'<html><body>Data</body></html>\r\n') |
| 529 | 553 |
| 530 def test_with_continue_1_1(self): | 554 def test_with_continue_1_1(self): |
| 531 result = self.send_typical_request(b'GET / HTTP/1.1\r\nExpect: 100-conti
nue\r\n\r\n') | 555 result = self.send_typical_request(b'GET / HTTP/1.1\r\nExpect: 100-conti
nue\r\n\r\n') |
| 532 self.assertEqual(result[0], b'HTTP/1.1 100 Continue\r\n') | 556 self.assertEqual(result[0], b'HTTP/1.1 100 Continue\r\n') |
| 533 self.assertEqual(result[1], b'HTTP/1.1 200 OK\r\n') | 557 self.assertEqual(result[1], b'HTTP/1.1 200 OK\r\n') |
| 534 self.verify_expected_headers(result[2:-1]) | 558 self.verify_expected_headers(result[2:-1]) |
| 535 self.verify_get_called() | 559 self.verify_get_called() |
| 536 self.assertEqual(result[-1], b'<html><body>Data</body></html>\r\n') | 560 self.assertEqual(result[-1], b'<html><body>Data</body></html>\r\n') |
| 537 | 561 |
| 538 def test_header_buffering(self): | 562 def test_header_buffering_of_send_error(self): |
| 539 | |
| 540 def _readAndReseek(f): | |
| 541 pos = f.tell() | |
| 542 f.seek(0) | |
| 543 data = f.read() | |
| 544 f.seek(pos) | |
| 545 return data | |
| 546 | 563 |
| 547 input = BytesIO(b'GET / HTTP/1.1\r\n\r\n') | 564 input = BytesIO(b'GET / HTTP/1.1\r\n\r\n') |
| 548 output = BytesIO() | 565 output = AuditableBytesIO() |
| 549 self.handler.rfile = input | 566 handler = SocketlessRequestHandler() |
| 550 self.handler.wfile = output | 567 handler.rfile = input |
| 551 self.handler.request_version = 'HTTP/1.1' | 568 handler.wfile = output |
| 552 | 569 handler.request_version = 'HTTP/1.1' |
| 553 self.handler.send_header('Foo', 'foo') | 570 handler.requestline = '' |
| 554 self.handler.send_header('bar', 'bar') | 571 handler.command = None |
| 555 self.assertEqual(_readAndReseek(output), b'') | 572 |
| 556 self.handler.end_headers() | 573 handler.send_error(418) |
| 557 self.assertEqual(_readAndReseek(output), | 574 self.assertEqual(output.numWrites, 2) |
| 558 b'Foo: foo\r\nbar: bar\r\n\r\n') | 575 |
| 576 def test_header_buffering_of_send_response_only(self): |
| 577 |
| 578 input = BytesIO(b'GET / HTTP/1.1\r\n\r\n') |
| 579 output = AuditableBytesIO() |
| 580 handler = SocketlessRequestHandler() |
| 581 handler.rfile = input |
| 582 handler.wfile = output |
| 583 handler.request_version = 'HTTP/1.1' |
| 584 |
| 585 handler.send_response_only(418) |
| 586 self.assertEqual(output.numWrites, 0) |
| 587 handler.end_headers() |
| 588 self.assertEqual(output.numWrites, 1) |
| 589 |
| 590 def test_header_buffering_of_send_header(self): |
| 591 |
| 592 input = BytesIO(b'GET / HTTP/1.1\r\n\r\n') |
| 593 output = AuditableBytesIO() |
| 594 handler = SocketlessRequestHandler() |
| 595 handler.rfile = input |
| 596 handler.wfile = output |
| 597 handler.request_version = 'HTTP/1.1' |
| 598 |
| 599 handler.send_header('Foo', 'foo') |
| 600 handler.send_header('bar', 'bar') |
| 601 self.assertEqual(output.numWrites, 0) |
| 602 handler.end_headers() |
| 603 self.assertEqual(output.getData(), b'Foo: foo\r\nbar: bar\r\n\r\n') |
| 604 self.assertEqual(output.numWrites, 1) |
| 559 | 605 |
| 560 def test_header_unbuffered_when_continue(self): | 606 def test_header_unbuffered_when_continue(self): |
| 561 | 607 |
| 562 def _readAndReseek(f): | 608 def _readAndReseek(f): |
| 563 pos = f.tell() | 609 pos = f.tell() |
| 564 f.seek(0) | 610 f.seek(0) |
| 565 data = f.read() | 611 data = f.read() |
| 566 f.seek(pos) | 612 f.seek(pos) |
| 567 return data | 613 return data |
| 568 | 614 |
| (...skipping 65 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 634 BaseHTTPServerTestCase, | 680 BaseHTTPServerTestCase, |
| 635 SimpleHTTPServerTestCase, | 681 SimpleHTTPServerTestCase, |
| 636 CGIHTTPServerTestCase, | 682 CGIHTTPServerTestCase, |
| 637 SimpleHTTPRequestHandlerTestCase, | 683 SimpleHTTPRequestHandlerTestCase, |
| 638 ) | 684 ) |
| 639 finally: | 685 finally: |
| 640 os.chdir(cwd) | 686 os.chdir(cwd) |
| 641 | 687 |
| 642 if __name__ == '__main__': | 688 if __name__ == '__main__': |
| 643 test_main() | 689 test_main() |
| LEFT | RIGHT |