diff --git a/Lib/test/test_nntplib.py b/Lib/test/test_nntplib.py --- a/Lib/test/test_nntplib.py +++ b/Lib/test/test_nntplib.py @@ -3,7 +3,6 @@ import socket import datetime import textwrap import unittest -import functools import contextlib import os.path from test import support @@ -28,6 +27,102 @@ certfile = os.path.join(os.path.dirname( # - test auth and `usenetrc` +class TcpServer(NNTPv2Handler): + def __init__(self): + self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, + socket.IPPROTO_TCP) + self.port = support.bind_port(self.sock) + self.sock.listen() + + def run_server(self): + with self.sock: + [client, _] = self.sock.accept() + with contextlib.ExitStack() as self.cleanup: + client = self.cleanup.enter_context(client) + self.run_connection(client) + + def run_connection(self, client): + self.client = client + self.reader = self.cleanup.enter_context(self.client.makefile('rb')) + self.start(self.reader.readline, self.client.sendall) + self.process_pending() + + def handle_CAPABILITIES(self): + self.push_lit( + '101 Capability list:\r\n' + 'VERSION 2\r\n' + 'AUTHINFO USER\r\n' + 'LIST ACTIVE NEWSGROUPS OVERVIEW.FMT\r\n' + 'OVER\r\n' + 'READER\r\n' + 'STARTTLS\r\n' + '.\r\n' + ) + + def handle_LIST(self, action='ACTIVE', param=None): + # Override to return non-ASCII group names + if action == 'ACTIVE': + result = { + None: 'test.ñon-äscii', + 'fr.comp.lang.*': 'fr.comp.lang.python', + 'comp.lang.*': 'comp.lang.python', + } + self.push_lit( + '215 Newsgroups as "name high low status"\r\n' + '{} 3002322 3000234 y\r\n' + '.\r\n'.format(result[param])) + elif action == 'NEWSGROUPS': + results = { + 'fr.comp.lang.python': + 'fr.comp.lang.python\t' + 'Langage de programmation Python.\r\n', + 'comp.lang.python': + 'comp.lang.python\tThe Python computer language.\r\n', + 'fr.comp.lang.*': + 'fr.comp.lang.c\t\t' + 'Langages de programmation C et assimilés.\r\n' + 'fr.comp.lang.python\t' + 'Langage de programmation Python.\r\n', + 'comp.lang.*': + 'comp.lang.c\t\tDiscussion about C.\r\n' + 'comp.lang.python\tThe Python computer language.\r\n', + } + results = results.get(param, '') + self.push_lit('215 Newsgroups as "name description"\r\n' + + results + '.\r\n') + else: + super().handle_LIST(action, param) + + def handle_NEWGROUPS(self, date, time): + self.push_lit('231 New newsgroups:\r\n' + 'test.ñon-äscii 3002322 3000234 y\r\n' + '.\r\n') + + def handle_STARTTLS(self): + self.reader.close() + self.push_lit('382 Begin TLS negotiation now\r\n') + self.client = ssl.wrap_socket(self.client, + server_side=True, certfile=certfile) + self.cleanup.enter_context(self.client) + self.reader = self.cleanup.enter_context(self.client.makefile('rb')) + self._readline = self.reader.readline + self._push_data = self.client.sendall + + def handle_XHDR(self, header, arg): + assert header == 'subject' + assert arg == '1265' + self.push_lit( + '221 Header follows\r\n' + '1265 Dummy subject\r\n' + '.\r\n' + ) + +class SslServer(TcpServer): + def run_connection(self, client): + client = ssl.wrap_socket(client, server_side=True, certfile=certfile) + super().run_connection(client) + + class NetworkedNNTPTestsMixin: def test_welcome(self): @@ -42,15 +137,13 @@ class NetworkedNNTPTestsMixin: def test_list(self): resp, groups = self.server.list() - if len(groups) > 0: - self.assertEqual(GroupInfo, type(groups[0])) - self.assertEqual(str, type(groups[0].group)) + self.assertEqual(GroupInfo, type(groups[0])) + self.assertEqual(str, type(groups[0].group)) def test_list_active(self): resp, groups = self.server.list(self.GROUP_PAT) - if len(groups) > 0: - self.assertEqual(GroupInfo, type(groups[0])) - self.assertEqual(str, type(groups[0].group)) + self.assertEqual(GroupInfo, type(groups[0])) + self.assertEqual(str, type(groups[0].group)) def test_unknown_command(self): with self.assertRaises(nntplib.NNTPPermanentError) as cm: @@ -59,13 +152,10 @@ class NetworkedNNTPTestsMixin: self.assertTrue(resp.startswith("500 "), resp) def test_newgroups(self): - # gmane gets a constant influx of new groups. In order not to stress - # the server too much, we choose a recent date in the past. dt = datetime.date.today() - datetime.timedelta(days=7) resp, groups = self.server.newgroups(dt) - if len(groups) > 0: - self.assertIsInstance(groups[0], GroupInfo) - self.assertIsInstance(groups[0].group, str) + self.assertIsInstance(groups[0], GroupInfo) + self.assertIsInstance(groups[0].group, str) def test_description(self): def _check_desc(desc): @@ -85,9 +175,8 @@ class NetworkedNNTPTestsMixin: def test_descriptions(self): resp, descs = self.server.descriptions(self.GROUP_PAT) - # 215 for LIST NEWSGROUPS, 282 for XGTITLE - self.assertTrue( - resp.startswith("215 ") or resp.startswith("282 "), resp) + # 215 for LIST NEWSGROUPS + self.assertTrue(resp.startswith("215 ")) self.assertIsInstance(descs, dict) desc = descs[self.GROUP_NAME] self.assertEqual(desc, self.server.description(self.GROUP_NAME)) @@ -124,8 +213,6 @@ class NetworkedNNTPTestsMixin: def test_xover(self): resp, count, first, last, name = self.server.group(self.GROUP_NAME) resp, lines = self.server.xover(last - 5, last) - if len(lines) == 0: - self.skipTest("no articles retrieved") # The 'last' article is not necessarily part of the output (cancelled?) art_num, art_dict = lines[0] self.assertGreaterEqual(art_num, last - 5) @@ -167,17 +254,8 @@ class NetworkedNNTPTestsMixin: def test_article_head_body(self): resp, count, first, last, name = self.server.group(self.GROUP_NAME) # Try to find an available article - for art_num in (last, first, last - 1): - try: - resp, head = self.server.head(art_num) - except nntplib.NNTPTemporaryError as e: - if not e.response.startswith("423 "): - raise - # "423 No such article" => choose another one - continue - break - else: - self.skipTest("could not find a suitable article number") + art_num = last + resp, head = self.server.head(art_num) self.assertTrue(resp.startswith("221 "), resp) self.check_article_resp(resp, head, art_num) resp, body = self.server.body(art_num) @@ -208,18 +286,29 @@ class NetworkedNNTPTestsMixin: resp, caps = self.server.capabilities() _check_caps(caps) + @unittest.skipUnless(ssl, 'requires SSL support') + def test_starttls(self): + file = self.server.file + sock = self.server.sock + self.server.starttls() + # Check that the socket and internal pseudo-file really were + # changed. + self.assertNotEqual(file, self.server.file) + self.assertNotEqual(sock, self.server.sock) + # Check that the new socket really is an SSL one + self.assertIsInstance(self.server.sock, ssl.SSLSocket) + # Check that trying starttls when it's already active fails. + self.assertRaises(ValueError, self.server.starttls) + def test_zlogin(self): # This test must be the penultimate because further commands will be # refused. baduser = "notarealuser" badpw = "notarealpassword" # Check that bogus credentials cause failure - self.assertRaises(nntplib.NNTPError, self.server.login, + self.assertRaises(nntplib.NNTPTemporaryError, self.server.login, user=baduser, password=badpw, usenetrc=False) - # FIXME: We should check that correct credentials succeed, but that - # would require valid details for some server somewhere to be in the - # test suite, I think. Gmane is anonymous, at least as used for the - # other tests. + # FIXME: We should check that correct credentials succeed. def test_zzquit(self): # This test must be called last, hence the name @@ -229,26 +318,6 @@ class NetworkedNNTPTestsMixin: finally: cls.server = None - @classmethod - def wrap_methods(cls): - # Wrap all methods in a transient_internet() exception catcher - # XXX put a generic version in test.support? - def wrap_meth(meth): - @functools.wraps(meth) - def wrapped(self): - with support.transient_internet(self.NNTP_HOST): - meth(self) - return wrapped - for name in dir(cls): - if not name.startswith('test_'): - continue - meth = getattr(cls, name) - if not callable(meth): - continue - # Need to use a closure so that meth remains bound to its current - # value - setattr(cls, name, wrap_meth(meth)) - def test_with_statement(self): def is_connected(): if not hasattr(server, 'file'): @@ -259,37 +328,41 @@ class NetworkedNNTPTestsMixin: return False return True - with self.NNTP_CLASS(self.NNTP_HOST, timeout=TIMEOUT, usenetrc=False) as server: - self.assertTrue(is_connected()) - self.assertTrue(server.help()) - self.assertFalse(is_connected()) + with support.transient_internet(self.NNTP_HOST): + with self.NNTP_CLASS(self.NNTP_HOST, + timeout=TIMEOUT, usenetrc=False) as server: + self.assertTrue(is_connected()) + self.assertTrue(server.help()) + self.assertFalse(is_connected()) - with self.NNTP_CLASS(self.NNTP_HOST, timeout=TIMEOUT, usenetrc=False) as server: - server.quit() - self.assertFalse(is_connected()) + with self.NNTP_CLASS(self.NNTP_HOST, + timeout=TIMEOUT, usenetrc=False) as server: + server.quit() + self.assertFalse(is_connected()) -NetworkedNNTPTestsMixin.wrap_methods() - - +@unittest.skipUnless(threading, 'requires multithreading') class NetworkedNNTPTests(NetworkedNNTPTestsMixin, unittest.TestCase): - # This server supports STARTTLS (gmane doesn't) NNTP_HOST = 'news.trigofacile.com' GROUP_NAME = 'fr.comp.lang.python' GROUP_PAT = 'fr.comp.lang.*' NNTP_CLASS = NNTP + SERVER_CLASS = TcpServer @classmethod def setUpClass(cls): - support.requires("network") - with support.transient_internet(cls.NNTP_HOST): - cls.server = cls.NNTP_CLASS(cls.NNTP_HOST, timeout=TIMEOUT, usenetrc=False) + server = cls.SERVER_CLASS() + cls.background = threading.Thread(target=server.run_server) + cls.background.start() + cls.server = cls.NNTP_CLASS(support.HOST, server.port, + timeout=TIMEOUT, usenetrc=False) @classmethod def tearDownClass(cls): if cls.server is not None: cls.server.quit() + cls.background.join() @unittest.skipUnless(ssl, 'requires SSL support') class NetworkedNNTP_SSLTests(NetworkedNNTPTests): @@ -303,16 +376,14 @@ class NetworkedNNTP_SSLTests(NetworkedNN GROUP_PAT = 'comp.lang.*' NNTP_CLASS = getattr(nntplib, 'NNTP_SSL', None) - - # Disabled as it produces too much data - test_list = None + SERVER_CLASS = SslServer # Disabled as the connection will already be encrypted. test_starttls = None # -# Non-networked tests using a local server (or something mocking it). +# Non-networked tests using something mocking a local server. # class _NNTPServerIO(io.RawIOBase): @@ -392,7 +463,7 @@ class MockedNNTPWithReaderModeMixin(Mock self.make_server(readermode=True) -class NNTPv1Handler: +class NNTPv1Handler: # Moved in real patch """A handler for RFC 977""" welcome = "200 NNTP mock server" @@ -434,7 +505,6 @@ class NNTPv1Handler: raise ValueError("line doesn't end with \\r\\n: {!r}".format(line)) line = line[:-2] cmd, *tokens = line.split() - #meth = getattr(self.handler, "handle_" + cmd.upper(), None) meth = getattr(self, "handle_" + cmd.upper(), None) if meth is None: self.handle_unknown() @@ -476,8 +546,8 @@ class NNTPv1Handler: self.push_lit("111 20100914001155") def handle_GROUP(self, group): - if group == "fr.comp.lang.python": - self.push_lit("211 486 761 1265 fr.comp.lang.python") + if group in {"fr.comp.lang.python", "comp.lang.python"}: + self.push_lit("211 486 761 1265 {}".format(group)) else: self.push_lit("411 No such group {}".format(group)) @@ -618,6 +688,14 @@ class NNTPv1Handler: "\tXref: saria.nerim.net fr.comp.lang.python:1265" "\n" ".\n") + elif message_spec in {"1255-", "1255-1265", "1260-1265"}: + self.push_data( + b"224 Overview information follows\r\n" + b"1265\t\xD1on-UTF-8 s\xFCbject\t" + b'"Demo User" \t' + b"Sun, 19 Jun 2016 00:00:00 GMT\t" + b"<45223423@example.com>\t<45454@example.net>\t64\t3\r\n" + b".\r\n") else: self.push_lit("""\ 224 No articles @@ -662,6 +740,13 @@ class NNTPv1Handler: sample_article = sample_head + "\n\n" + sample_body + nonutf8_head = b"Message-ID: <45223423@example.com>\r\n" + nonutf8_body = ( + b"Archive-Name: fr/chartes/comp.lang.python\r\n" + b"\r\n" + b"Testing n\xF6n-UTF-8\r\n" + ) + def handle_ARTICLE(self, message_spec=None): if message_spec is None: self.push_lit("220 3000237 <45223423@example.com>") @@ -669,6 +754,13 @@ class NNTPv1Handler: self.push_lit("220 0 <45223423@example.com>") elif message_spec == "3000234": self.push_lit("220 3000234 <45223423@example.com>") + elif message_spec == "1265": + self.push_data(b"220 1265 <45223423@example.com> article\r\n" + + self.nonutf8_head + + b"\r\n" + + self.nonutf8_body + + b".\r\n") + return else: self.push_lit("430 No Such Article Found") return @@ -682,6 +774,10 @@ class NNTPv1Handler: self.push_lit("221 0 <45223423@example.com>") elif message_spec == "3000234": self.push_lit("221 3000234 <45223423@example.com>") + elif message_spec == "1265": + self.push_data(b"221 1265 <45223423@example.com> head\r\n" + + self.nonutf8_head + b".\r\n") + return else: self.push_lit("430 No Such Article Found") return @@ -695,6 +791,10 @@ class NNTPv1Handler: self.push_lit("222 0 <45223423@example.com>") elif message_spec == "3000234": self.push_lit("222 3000234 <45223423@example.com>") + elif message_spec == "1265": + self.push_data(b"222 1265 <45223423@example.com> body\r\n" + + self.nonutf8_body + b".\r\n") + return else: self.push_lit("430 No Such Article Found") return @@ -711,13 +811,16 @@ class NNTPv1Handler: self.push_lit('381 Password Required') self._user_sent = True elif cred_type == 'pass': + if data != 'testpw': + self.push_lit('481 Authentication failed') + return self.push_lit('281 Login Successful') self._logged_in = True else: raise Exception('Unknown cred type {}'.format(cred_type)) -class NNTPv2Handler(NNTPv1Handler): +class NNTPv2Handler(NNTPv1Handler): # Moved in real patch """A handler for RFC 3977 (NNTP "v2")""" def handle_CAPABILITIES(self): @@ -1508,64 +1611,6 @@ class MockSslTests(MockSocketTests): def nntp_class(*pos, **kw): return nntplib.NNTP_SSL(*pos, ssl_context=bypass_context, **kw) -@unittest.skipUnless(threading, 'requires multithreading') -class LocalServerTests(unittest.TestCase): - def setUp(self): - sock = socket.socket() - port = support.bind_port(sock) - sock.listen() - self.background = threading.Thread( - target=self.run_server, args=(sock,)) - self.background.start() - self.addCleanup(self.background.join) - - self.nntp = NNTP(support.HOST, port, usenetrc=False).__enter__() - self.addCleanup(self.nntp.__exit__, None, None, None) - - def run_server(self, sock): - # Could be generalized to handle more commands in separate methods - with sock: - [client, _] = sock.accept() - with contextlib.ExitStack() as cleanup: - cleanup.enter_context(client) - reader = cleanup.enter_context(client.makefile('rb')) - client.sendall(b'200 Server ready\r\n') - while True: - cmd = reader.readline() - if cmd == b'CAPABILITIES\r\n': - client.sendall( - b'101 Capability list:\r\n' - b'VERSION 2\r\n' - b'STARTTLS\r\n' - b'.\r\n' - ) - elif cmd == b'STARTTLS\r\n': - reader.close() - client.sendall(b'382 Begin TLS negotiation now\r\n') - client = ssl.wrap_socket( - client, server_side=True, certfile=certfile) - cleanup.enter_context(client) - reader = cleanup.enter_context(client.makefile('rb')) - elif cmd == b'QUIT\r\n': - client.sendall(b'205 Bye!\r\n') - break - else: - raise ValueError('Unexpected command {!r}'.format(cmd)) - - @unittest.skipUnless(ssl, 'requires SSL support') - def test_starttls(self): - file = self.nntp.file - sock = self.nntp.sock - self.nntp.starttls() - # Check that the socket and internal pseudo-file really were - # changed. - self.assertNotEqual(file, self.nntp.file) - self.assertNotEqual(sock, self.nntp.sock) - # Check that the new socket really is an SSL one - self.assertIsInstance(self.nntp.sock, ssl.SSLSocket) - # Check that trying starttls when it's already active fails. - self.assertRaises(ValueError, self.nntp.starttls) - if __name__ == "__main__": unittest.main()