diff -r ec3077e23b7d Lib/nntplib.py --- a/Lib/nntplib.py Sun Oct 05 21:20:51 2014 -0400 +++ b/Lib/nntplib.py Tue Mar 17 02:41:11 2015 +0000 @@ -1042,11 +1042,18 @@ self.host = host self.port = port self.sock = socket.create_connection((host, port), timeout) - file = self.sock.makefile("rwb") - _NNTPBase.__init__(self, file, host, - readermode, timeout) - if user or usenetrc: - self.login(user, password, usenetrc) + file = None + try: + file = self.sock.makefile("rwb") + _NNTPBase.__init__(self, file, host, + readermode, timeout) + if user or usenetrc: + self.login(user, password, usenetrc) + except: + if file: + file.close() + self.sock.close() + raise def _close(self): try: @@ -1066,12 +1073,19 @@ in default port and the `ssl_context` argument for SSL connections. """ self.sock = socket.create_connection((host, port), timeout) - self.sock = _encrypt_on(self.sock, ssl_context, host) - file = self.sock.makefile("rwb") - _NNTPBase.__init__(self, file, host, - readermode=readermode, timeout=timeout) - if user or usenetrc: - self.login(user, password, usenetrc) + file = None + try: + self.sock = _encrypt_on(self.sock, ssl_context, host) + file = self.sock.makefile("rwb") + _NNTPBase.__init__(self, file, host, + readermode=readermode, timeout=timeout) + if user or usenetrc: + self.login(user, password, usenetrc) + except: + if file: + file.close() + self.sock.close() + raise def _close(self): try: diff -r ec3077e23b7d Lib/test/test_nntplib.py --- a/Lib/test/test_nntplib.py Sun Oct 05 21:20:51 2014 -0400 +++ b/Lib/test/test_nntplib.py Tue Mar 17 02:41:11 2015 +0000 @@ -8,6 +8,7 @@ from test import support from nntplib import NNTP, GroupInfo import nntplib +from unittest.mock import patch try: import ssl except ImportError: @@ -370,6 +371,14 @@ return n +def make_mock_file(handler): + sio = _NNTPServerIO(handler) + # Using BufferedRWPair instead of BufferedRandom ensures the file + # isn't seekable. + file = io.BufferedRWPair(sio, sio) + return (sio, file) + + class MockedNNTPTestsMixin: # Override in derived classes handler_class = None @@ -384,10 +393,7 @@ def make_server(self, *args, **kwargs): self.handler = self.handler_class() - self.sio = _NNTPServerIO(self.handler) - # Using BufferedRWPair instead of BufferedRandom ensures the file - # isn't seekable. - file = io.BufferedRWPair(self.sio, self.sio) + self.sio, file = make_mock_file(self.handler) self.server = nntplib._NNTPBase(file, 'test.server', *args, **kwargs) return self.server @@ -1425,5 +1431,94 @@ target_api.append('NNTP_SSL') self.assertEqual(set(nntplib.__all__), set(target_api)) +class MockSocketTests(unittest.TestCase): + """Tests involving a mock socket object + + Used where the _NNTPServerIO file object is not enough.""" + + nntp_class = nntplib.NNTP + + def check_constructor_error_conditions( + self, handler_class, + expected_nntp_error, expected_nntp_error_repr, + login=None, password=None): + + class mock_socket_module: + def create_connection(address, timeout): + return MockSocket() + + class MockSocket: + def close(self): + nonlocal socket_closed + socket_closed = True + + def makefile(socket, mode): + handler = handler_class() + _, file = make_mock_file(handler) + files.append(file) + return file + + socket_closed = False + files = [] + with patch('nntplib.socket', mock_socket_module), \ + self.assertRaisesRegex(expected_nntp_error, + expected_nntp_error_repr): + self.nntp_class('dummy', user=login, password=password) + self.assertTrue(socket_closed) + for f in files: + self.assertTrue(f.closed) + + def test_bad_welcome(self): + #Test a bad welcome message + class Handler(NNTPv1Handler): + welcome = 'Bad Welcome' + self.check_constructor_error_conditions( + Handler, nntplib.NNTPProtocolError, Handler.welcome) + + def test_service_temporarily_unavailable(self): + #Test service temporarily unavailable + class Handler(NNTPv1Handler): + welcome = '400 Service temporarily unavilable' + self.check_constructor_error_conditions( + Handler, nntplib.NNTPTemporaryError, Handler.welcome) + + def test_service_permanently_unavailable(self): + #Test service permanently unavailable + class Handler(NNTPv1Handler): + welcome = '502 Service permanently unavilable' + self.check_constructor_error_conditions( + Handler, nntplib.NNTPPermanentError, Handler.welcome) + + def test_bad_capabilities(self): + #Test a bad capabilities response + class Handler(NNTPv1Handler): + def handle_CAPABILITIES(self): + self.push_lit(capabilities_response) + capabilities_response = '201 bad capability' + self.check_constructor_error_conditions( + Handler, nntplib.NNTPReplyError, capabilities_response) + + def test_login_aborted(self): + #Test a bad authinfo response + login = 't@e.com' + password = 'python' + class Handler(NNTPv1Handler): + def handle_AUTHINFO(self, *args): + self.push_lit(authinfo_response) + authinfo_response = '503 Mechanism not recognized' + self.check_constructor_error_conditions( + Handler, nntplib.NNTPPermanentError, authinfo_response, + login, password) + +@unittest.skipUnless(ssl, 'requires SSL support') +class MockSslTests(MockSocketTests): + class nntp_class(nntplib.NNTP_SSL): + def __init__(self, *pos, **kw): + class bypass_context: + """Bypass encryption and actual SSL module""" + def wrap_socket(sock, **args): + return sock + return super().__init__(*pos, ssl_context=bypass_context, **kw) + if __name__ == "__main__": unittest.main()