# HG changeset patch # Parent e2b09c00ee24ecc3ddb55e520dd8f8a74b65a9cc Issue #19756: Change remote NNTP server to localhost server for most tests * Local server is based on the existing NNTPv2Handler class * Reintegrate test_starttls() into NetworkedNNTPTestsMixin * Only test_with_statement() talks to the remote NNTP_HOST now * Make tests in NetworkedNNTPTestsMixin stricter, because the server responses are predetermined now * Add mock comp.lang.python support to NNTPv1Handler.handle_GROUP() * Add support for rejecting passwords in NNTPv1Handler.handle_AUTHINFO() * New message number with non-UTF-8 message body for ARTICLE, HEAD, BODY, and non-UTF-8 subject for OVER, XOVER diff -r e2b09c00ee24 Lib/test/test_nntplib.py --- a/Lib/test/test_nntplib.py Sat Jun 18 15:58:52 2016 -0400 +++ b/Lib/test/test_nntplib.py Sun Jun 19 05:24:35 2016 +0000 @@ -3,7 +3,6 @@ import datetime import textwrap import unittest -import functools import contextlib import os.path from test import support @@ -28,370 +27,6 @@ # - test auth and `usenetrc` -class NetworkedNNTPTestsMixin: - - def test_welcome(self): - welcome = self.server.getwelcome() - self.assertEqual(str, type(welcome)) - - def test_help(self): - resp, lines = self.server.help() - self.assertTrue(resp.startswith("100 "), resp) - for line in lines: - self.assertEqual(str, type(line)) - - def test_list(self): - resp, groups = self.server.list() - if len(groups) > 0: - self.assertEqual(GroupInfo, type(groups[0])) - self.assertEqual(str, type(groups[0].group)) - - def test_list_active(self): - resp, groups = self.server.list(self.GROUP_PAT) - if len(groups) > 0: - self.assertEqual(GroupInfo, type(groups[0])) - self.assertEqual(str, type(groups[0].group)) - - def test_unknown_command(self): - with self.assertRaises(nntplib.NNTPPermanentError) as cm: - self.server._shortcmd("XYZZY") - resp = cm.exception.response - self.assertTrue(resp.startswith("500 "), resp) - - def test_newgroups(self): - # gmane gets a constant influx of new groups. In order not to stress - # the server too much, we choose a recent date in the past. - dt = datetime.date.today() - datetime.timedelta(days=7) - resp, groups = self.server.newgroups(dt) - if len(groups) > 0: - self.assertIsInstance(groups[0], GroupInfo) - self.assertIsInstance(groups[0].group, str) - - def test_description(self): - def _check_desc(desc): - # Sanity checks - self.assertIsInstance(desc, str) - self.assertNotIn(self.GROUP_NAME, desc) - desc = self.server.description(self.GROUP_NAME) - _check_desc(desc) - # Another sanity check - self.assertIn("Python", desc) - # With a pattern - desc = self.server.description(self.GROUP_PAT) - _check_desc(desc) - # Shouldn't exist - desc = self.server.description("zk.brrtt.baz") - self.assertEqual(desc, '') - - def test_descriptions(self): - resp, descs = self.server.descriptions(self.GROUP_PAT) - # 215 for LIST NEWSGROUPS, 282 for XGTITLE - self.assertTrue( - resp.startswith("215 ") or resp.startswith("282 "), resp) - self.assertIsInstance(descs, dict) - desc = descs[self.GROUP_NAME] - self.assertEqual(desc, self.server.description(self.GROUP_NAME)) - - def test_group(self): - result = self.server.group(self.GROUP_NAME) - self.assertEqual(5, len(result)) - resp, count, first, last, group = result - self.assertEqual(group, self.GROUP_NAME) - self.assertIsInstance(count, int) - self.assertIsInstance(first, int) - self.assertIsInstance(last, int) - self.assertLessEqual(first, last) - self.assertTrue(resp.startswith("211 "), resp) - - def test_date(self): - resp, date = self.server.date() - self.assertIsInstance(date, datetime.datetime) - # Sanity check - self.assertGreaterEqual(date.year, 1995) - self.assertLessEqual(date.year, 2030) - - def _check_art_dict(self, art_dict): - # Some sanity checks for a field dictionary returned by OVER / XOVER - self.assertIsInstance(art_dict, dict) - # NNTP has 7 mandatory fields - self.assertGreaterEqual(art_dict.keys(), - {"subject", "from", "date", "message-id", - "references", ":bytes", ":lines"} - ) - for v in art_dict.values(): - self.assertIsInstance(v, (str, type(None))) - - def test_xover(self): - resp, count, first, last, name = self.server.group(self.GROUP_NAME) - resp, lines = self.server.xover(last - 5, last) - if len(lines) == 0: - self.skipTest("no articles retrieved") - # The 'last' article is not necessarily part of the output (cancelled?) - art_num, art_dict = lines[0] - self.assertGreaterEqual(art_num, last - 5) - self.assertLessEqual(art_num, last) - self._check_art_dict(art_dict) - - def test_over(self): - resp, count, first, last, name = self.server.group(self.GROUP_NAME) - start = last - 10 - # The "start-" article range form - resp, lines = self.server.over((start, None)) - art_num, art_dict = lines[0] - self._check_art_dict(art_dict) - # The "start-end" article range form - resp, lines = self.server.over((start, last)) - art_num, art_dict = lines[-1] - # The 'last' article is not necessarily part of the output (cancelled?) - self.assertGreaterEqual(art_num, start) - self.assertLessEqual(art_num, last) - self._check_art_dict(art_dict) - # XXX The "message_id" form is unsupported by gmane - # 503 Overview by message-ID unsupported - - def test_xhdr(self): - resp, count, first, last, name = self.server.group(self.GROUP_NAME) - resp, lines = self.server.xhdr('subject', last) - for line in lines: - self.assertEqual(str, type(line[1])) - - def check_article_resp(self, resp, article, art_num=None): - self.assertIsInstance(article, nntplib.ArticleInfo) - if art_num is not None: - self.assertEqual(article.number, art_num) - for line in article.lines: - self.assertIsInstance(line, bytes) - # XXX this could exceptionally happen... - self.assertNotIn(article.lines[-1], (b".", b".\n", b".\r\n")) - - def test_article_head_body(self): - resp, count, first, last, name = self.server.group(self.GROUP_NAME) - # Try to find an available article - for art_num in (last, first, last - 1): - try: - resp, head = self.server.head(art_num) - except nntplib.NNTPTemporaryError as e: - if not e.response.startswith("423 "): - raise - # "423 No such article" => choose another one - continue - break - else: - self.skipTest("could not find a suitable article number") - self.assertTrue(resp.startswith("221 "), resp) - self.check_article_resp(resp, head, art_num) - resp, body = self.server.body(art_num) - self.assertTrue(resp.startswith("222 "), resp) - self.check_article_resp(resp, body, art_num) - resp, article = self.server.article(art_num) - self.assertTrue(resp.startswith("220 "), resp) - self.check_article_resp(resp, article, art_num) - # Tolerate running the tests from behind a NNTP virus checker - blacklist = lambda line: line.startswith(b'X-Antivirus') - filtered_head_lines = [line for line in head.lines - if not blacklist(line)] - filtered_lines = [line for line in article.lines - if not blacklist(line)] - self.assertEqual(filtered_lines, filtered_head_lines + [b''] + body.lines) - - def test_capabilities(self): - # The server under test implements NNTP version 2 and has a - # couple of well-known capabilities. Just sanity check that we - # got them. - def _check_caps(caps): - caps_list = caps['LIST'] - self.assertIsInstance(caps_list, (list, tuple)) - self.assertIn('OVERVIEW.FMT', caps_list) - self.assertGreaterEqual(self.server.nntp_version, 2) - _check_caps(self.server.getcapabilities()) - # This re-emits the command - resp, caps = self.server.capabilities() - _check_caps(caps) - - def test_zlogin(self): - # This test must be the penultimate because further commands will be - # refused. - baduser = "notarealuser" - badpw = "notarealpassword" - # Check that bogus credentials cause failure - self.assertRaises(nntplib.NNTPError, self.server.login, - user=baduser, password=badpw, usenetrc=False) - # FIXME: We should check that correct credentials succeed, but that - # would require valid details for some server somewhere to be in the - # test suite, I think. Gmane is anonymous, at least as used for the - # other tests. - - def test_zzquit(self): - # This test must be called last, hence the name - cls = type(self) - try: - self.server.quit() - finally: - cls.server = None - - @classmethod - def wrap_methods(cls): - # Wrap all methods in a transient_internet() exception catcher - # XXX put a generic version in test.support? - def wrap_meth(meth): - @functools.wraps(meth) - def wrapped(self): - with support.transient_internet(self.NNTP_HOST): - meth(self) - return wrapped - for name in dir(cls): - if not name.startswith('test_'): - continue - meth = getattr(cls, name) - if not callable(meth): - continue - # Need to use a closure so that meth remains bound to its current - # value - setattr(cls, name, wrap_meth(meth)) - - def test_with_statement(self): - def is_connected(): - if not hasattr(server, 'file'): - return False - try: - server.help() - except (OSError, EOFError): - return False - return True - - with self.NNTP_CLASS(self.NNTP_HOST, timeout=TIMEOUT, usenetrc=False) as server: - self.assertTrue(is_connected()) - self.assertTrue(server.help()) - self.assertFalse(is_connected()) - - with self.NNTP_CLASS(self.NNTP_HOST, timeout=TIMEOUT, usenetrc=False) as server: - server.quit() - self.assertFalse(is_connected()) - - -NetworkedNNTPTestsMixin.wrap_methods() - - -class NetworkedNNTPTests(NetworkedNNTPTestsMixin, unittest.TestCase): - # This server supports STARTTLS (gmane doesn't) - NNTP_HOST = 'news.trigofacile.com' - GROUP_NAME = 'fr.comp.lang.python' - GROUP_PAT = 'fr.comp.lang.*' - - NNTP_CLASS = NNTP - - @classmethod - def setUpClass(cls): - support.requires("network") - with support.transient_internet(cls.NNTP_HOST): - cls.server = cls.NNTP_CLASS(cls.NNTP_HOST, timeout=TIMEOUT, usenetrc=False) - - @classmethod - def tearDownClass(cls): - if cls.server is not None: - cls.server.quit() - -@unittest.skipUnless(ssl, 'requires SSL support') -class NetworkedNNTP_SSLTests(NetworkedNNTPTests): - - # Technical limits for this public NNTP server (see http://www.aioe.org): - # "Only two concurrent connections per IP address are allowed and - # 400 connections per day are accepted from each IP address." - - NNTP_HOST = 'nntp.aioe.org' - GROUP_NAME = 'comp.lang.python' - GROUP_PAT = 'comp.lang.*' - - NNTP_CLASS = getattr(nntplib, 'NNTP_SSL', None) - - # Disabled as it produces too much data - test_list = None - - # Disabled as the connection will already be encrypted. - test_starttls = None - - -# -# Non-networked tests using a local server (or something mocking it). -# - -class _NNTPServerIO(io.RawIOBase): - """A raw IO object allowing NNTP commands to be received and processed - by a handler. The handler can push responses which can then be read - from the IO object.""" - - def __init__(self, handler): - io.RawIOBase.__init__(self) - # The channel from the client - self.c2s = io.BytesIO() - # The channel to the client - self.s2c = io.BytesIO() - self.handler = handler - self.handler.start(self.c2s.readline, self.push_data) - - def readable(self): - return True - - def writable(self): - return True - - def push_data(self, data): - """Push (buffer) some data to send to the client.""" - pos = self.s2c.tell() - self.s2c.seek(0, 2) - self.s2c.write(data) - self.s2c.seek(pos) - - def write(self, b): - """The client sends us some data""" - pos = self.c2s.tell() - self.c2s.write(b) - self.c2s.seek(pos) - self.handler.process_pending() - return len(b) - - def readinto(self, buf): - """The client wants to read a response""" - self.handler.process_pending() - b = self.s2c.read(len(buf)) - n = len(b) - buf[:n] = b - return n - - -def make_mock_file(handler): - sio = _NNTPServerIO(handler) - # Using BufferedRWPair instead of BufferedRandom ensures the file - # isn't seekable. - file = io.BufferedRWPair(sio, sio) - return (sio, file) - - -class MockedNNTPTestsMixin: - # Override in derived classes - handler_class = None - - def setUp(self): - super().setUp() - self.make_server() - - def tearDown(self): - super().tearDown() - del self.server - - def make_server(self, *args, **kwargs): - self.handler = self.handler_class() - self.sio, file = make_mock_file(self.handler) - self.server = nntplib._NNTPBase(file, 'test.server', *args, **kwargs) - return self.server - - -class MockedNNTPWithReaderModeMixin(MockedNNTPTestsMixin): - def setUp(self): - super().setUp() - self.make_server(readermode=True) - - class NNTPv1Handler: """A handler for RFC 977""" @@ -434,7 +69,6 @@ raise ValueError("line doesn't end with \\r\\n: {!r}".format(line)) line = line[:-2] cmd, *tokens = line.split() - #meth = getattr(self.handler, "handle_" + cmd.upper(), None) meth = getattr(self, "handle_" + cmd.upper(), None) if meth is None: self.handle_unknown() @@ -476,8 +110,8 @@ self.push_lit("111 20100914001155") def handle_GROUP(self, group): - if group == "fr.comp.lang.python": - self.push_lit("211 486 761 1265 fr.comp.lang.python") + if group in {"fr.comp.lang.python", "comp.lang.python"}: + self.push_lit("211 486 761 1265 {}".format(group)) else: self.push_lit("411 No such group {}".format(group)) @@ -618,6 +252,14 @@ "\tXref: saria.nerim.net fr.comp.lang.python:1265" "\n" ".\n") + elif message_spec in {"1255-", "1255-1265", "1260-1265"}: + self.push_data( + b"224 Overview information follows\r\n" + b"1265\t\xD1on-UTF-8 s\xFCbject\t" + b'"Demo User" \t' + b"Sun, 19 Jun 2016 00:00:00 GMT\t" + b"<45223423@example.com>\t<45454@example.net>\t64\t3\r\n" + b".\r\n") else: self.push_lit("""\ 224 No articles @@ -662,6 +304,13 @@ sample_article = sample_head + "\n\n" + sample_body + nonutf8_head = b"Message-ID: <45223423@example.com>\r\n" + nonutf8_body = ( + b"Archive-Name: fr/chartes/comp.lang.python\r\n" + b"\r\n" + b"Testing n\xF6n-UTF-8\r\n" + ) + def handle_ARTICLE(self, message_spec=None): if message_spec is None: self.push_lit("220 3000237 <45223423@example.com>") @@ -669,6 +318,13 @@ self.push_lit("220 0 <45223423@example.com>") elif message_spec == "3000234": self.push_lit("220 3000234 <45223423@example.com>") + elif message_spec == "1265": + self.push_data(b"220 1265 <45223423@example.com> article\r\n" + + self.nonutf8_head + + b"\r\n" + + self.nonutf8_body + + b".\r\n") + return else: self.push_lit("430 No Such Article Found") return @@ -682,6 +338,10 @@ self.push_lit("221 0 <45223423@example.com>") elif message_spec == "3000234": self.push_lit("221 3000234 <45223423@example.com>") + elif message_spec == "1265": + self.push_data(b"221 1265 <45223423@example.com> head\r\n" + + self.nonutf8_head + b".\r\n") + return else: self.push_lit("430 No Such Article Found") return @@ -695,6 +355,10 @@ self.push_lit("222 0 <45223423@example.com>") elif message_spec == "3000234": self.push_lit("222 3000234 <45223423@example.com>") + elif message_spec == "1265": + self.push_data(b"222 1265 <45223423@example.com> body\r\n" + + self.nonutf8_body + b".\r\n") + return else: self.push_lit("430 No Such Article Found") return @@ -711,6 +375,9 @@ self.push_lit('381 Password Required') self._user_sent = True elif cred_type == 'pass': + if data != 'testpw': + self.push_lit('481 Authentication failed') + return self.push_lit('281 Login Successful') self._logged_in = True else: @@ -744,6 +411,442 @@ return self.handle_XOVER(message_spec) +class TcpServer(NNTPv2Handler): + def __init__(self): + self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, + socket.IPPROTO_TCP) + self.port = support.bind_port(self.sock) + self.sock.listen() + + def run_server(self): + with self.sock: + [client, _] = self.sock.accept() + with contextlib.ExitStack() as self.cleanup: + client = self.cleanup.enter_context(client) + self.run_connection(client) + + def run_connection(self, client): + self.client = client + self.reader = self.cleanup.enter_context(self.client.makefile('rb')) + self.start(self.reader.readline, self.client.sendall) + self.process_pending() + + def handle_CAPABILITIES(self): + self.push_lit( + '101 Capability list:\r\n' + 'VERSION 2\r\n' + 'AUTHINFO USER\r\n' + 'LIST ACTIVE NEWSGROUPS OVERVIEW.FMT\r\n' + 'OVER\r\n' + 'READER\r\n' + 'STARTTLS\r\n' + '.\r\n' + ) + + def handle_LIST(self, action='ACTIVE', param=None): + # Override to return non-ASCII group names + if action == 'ACTIVE': + result = { + None: 'test.ñon-äscii', + 'fr.comp.lang.*': 'fr.comp.lang.python', + 'comp.lang.*': 'comp.lang.python', + } + self.push_lit( + '215 Newsgroups as "name high low status"\r\n' + '{} 3002322 3000234 y\r\n' + '.\r\n'.format(result[param])) + elif action == 'NEWSGROUPS': + results = { + 'fr.comp.lang.python': + 'fr.comp.lang.python\t' + 'Langage de programmation Python.\r\n', + 'comp.lang.python': + 'comp.lang.python\tThe Python computer language.\r\n', + 'fr.comp.lang.*': + 'fr.comp.lang.c\t\t' + 'Langages de programmation C et assimilés.\r\n' + 'fr.comp.lang.python\t' + 'Langage de programmation Python.\r\n', + 'comp.lang.*': + 'comp.lang.c\t\tDiscussion about C.\r\n' + 'comp.lang.python\tThe Python computer language.\r\n', + } + results = results.get(param, '') + self.push_lit('215 Newsgroups as "name description"\r\n' + + results + '.\r\n') + else: + super().handle_LIST(action, param) + + def handle_NEWGROUPS(self, date, time): + self.push_lit('231 New newsgroups:\r\n' + 'test.ñon-äscii 3002322 3000234 y\r\n' + '.\r\n') + + def handle_STARTTLS(self): + self.reader.close() + self.push_lit('382 Begin TLS negotiation now\r\n') + self.client = ssl.wrap_socket(self.client, + server_side=True, certfile=certfile) + self.cleanup.enter_context(self.client) + self.reader = self.cleanup.enter_context(self.client.makefile('rb')) + self._readline = self.reader.readline + self._push_data = self.client.sendall + + def handle_XHDR(self, header, arg): + assert header == 'subject' + assert arg == '1265' + self.push_lit( + '221 Header follows\r\n' + '1265 Dummy subject\r\n' + '.\r\n' + ) + +class SslServer(TcpServer): + def run_connection(self, client): + client = ssl.wrap_socket(client, server_side=True, certfile=certfile) + super().run_connection(client) + + +class NetworkedNNTPTestsMixin: + + def test_welcome(self): + welcome = self.server.getwelcome() + self.assertEqual(str, type(welcome)) + + def test_help(self): + resp, lines = self.server.help() + self.assertTrue(resp.startswith("100 "), resp) + for line in lines: + self.assertEqual(str, type(line)) + + def test_list(self): + resp, groups = self.server.list() + self.assertEqual(GroupInfo, type(groups[0])) + self.assertEqual(str, type(groups[0].group)) + + def test_list_active(self): + resp, groups = self.server.list(self.GROUP_PAT) + self.assertEqual(GroupInfo, type(groups[0])) + self.assertEqual(str, type(groups[0].group)) + + def test_unknown_command(self): + with self.assertRaises(nntplib.NNTPPermanentError) as cm: + self.server._shortcmd("XYZZY") + resp = cm.exception.response + self.assertTrue(resp.startswith("500 "), resp) + + def test_newgroups(self): + dt = datetime.date.today() - datetime.timedelta(days=7) + resp, groups = self.server.newgroups(dt) + self.assertIsInstance(groups[0], GroupInfo) + self.assertIsInstance(groups[0].group, str) + + def test_description(self): + def _check_desc(desc): + # Sanity checks + self.assertIsInstance(desc, str) + self.assertNotIn(self.GROUP_NAME, desc) + desc = self.server.description(self.GROUP_NAME) + _check_desc(desc) + # Another sanity check + self.assertIn("Python", desc) + # With a pattern + desc = self.server.description(self.GROUP_PAT) + _check_desc(desc) + # Shouldn't exist + desc = self.server.description("zk.brrtt.baz") + self.assertEqual(desc, '') + + def test_descriptions(self): + resp, descs = self.server.descriptions(self.GROUP_PAT) + # 215 for LIST NEWSGROUPS + self.assertTrue(resp.startswith("215 ")) + self.assertIsInstance(descs, dict) + desc = descs[self.GROUP_NAME] + self.assertEqual(desc, self.server.description(self.GROUP_NAME)) + + def test_group(self): + result = self.server.group(self.GROUP_NAME) + self.assertEqual(5, len(result)) + resp, count, first, last, group = result + self.assertEqual(group, self.GROUP_NAME) + self.assertIsInstance(count, int) + self.assertIsInstance(first, int) + self.assertIsInstance(last, int) + self.assertLessEqual(first, last) + self.assertTrue(resp.startswith("211 "), resp) + + def test_date(self): + resp, date = self.server.date() + self.assertIsInstance(date, datetime.datetime) + # Sanity check + self.assertGreaterEqual(date.year, 1995) + self.assertLessEqual(date.year, 2030) + + def _check_art_dict(self, art_dict): + # Some sanity checks for a field dictionary returned by OVER / XOVER + self.assertIsInstance(art_dict, dict) + # NNTP has 7 mandatory fields + self.assertGreaterEqual(art_dict.keys(), + {"subject", "from", "date", "message-id", + "references", ":bytes", ":lines"} + ) + for v in art_dict.values(): + self.assertIsInstance(v, (str, type(None))) + + def test_xover(self): + resp, count, first, last, name = self.server.group(self.GROUP_NAME) + resp, lines = self.server.xover(last - 5, last) + # The 'last' article is not necessarily part of the output (cancelled?) + art_num, art_dict = lines[0] + self.assertGreaterEqual(art_num, last - 5) + self.assertLessEqual(art_num, last) + self._check_art_dict(art_dict) + + def test_over(self): + resp, count, first, last, name = self.server.group(self.GROUP_NAME) + start = last - 10 + # The "start-" article range form + resp, lines = self.server.over((start, None)) + art_num, art_dict = lines[0] + self._check_art_dict(art_dict) + # The "start-end" article range form + resp, lines = self.server.over((start, last)) + art_num, art_dict = lines[-1] + # The 'last' article is not necessarily part of the output (cancelled?) + self.assertGreaterEqual(art_num, start) + self.assertLessEqual(art_num, last) + self._check_art_dict(art_dict) + # XXX The "message_id" form is unsupported by gmane + # 503 Overview by message-ID unsupported + + def test_xhdr(self): + resp, count, first, last, name = self.server.group(self.GROUP_NAME) + resp, lines = self.server.xhdr('subject', last) + for line in lines: + self.assertEqual(str, type(line[1])) + + def check_article_resp(self, resp, article, art_num=None): + self.assertIsInstance(article, nntplib.ArticleInfo) + if art_num is not None: + self.assertEqual(article.number, art_num) + for line in article.lines: + self.assertIsInstance(line, bytes) + # XXX this could exceptionally happen... + self.assertNotIn(article.lines[-1], (b".", b".\n", b".\r\n")) + + def test_article_head_body(self): + resp, count, first, last, name = self.server.group(self.GROUP_NAME) + # Try to find an available article + art_num = last + resp, head = self.server.head(art_num) + self.assertTrue(resp.startswith("221 "), resp) + self.check_article_resp(resp, head, art_num) + resp, body = self.server.body(art_num) + self.assertTrue(resp.startswith("222 "), resp) + self.check_article_resp(resp, body, art_num) + resp, article = self.server.article(art_num) + self.assertTrue(resp.startswith("220 "), resp) + self.check_article_resp(resp, article, art_num) + # Tolerate running the tests from behind a NNTP virus checker + blacklist = lambda line: line.startswith(b'X-Antivirus') + filtered_head_lines = [line for line in head.lines + if not blacklist(line)] + filtered_lines = [line for line in article.lines + if not blacklist(line)] + self.assertEqual(filtered_lines, filtered_head_lines + [b''] + body.lines) + + def test_capabilities(self): + # The server under test implements NNTP version 2 and has a + # couple of well-known capabilities. Just sanity check that we + # got them. + def _check_caps(caps): + caps_list = caps['LIST'] + self.assertIsInstance(caps_list, (list, tuple)) + self.assertIn('OVERVIEW.FMT', caps_list) + self.assertGreaterEqual(self.server.nntp_version, 2) + _check_caps(self.server.getcapabilities()) + # This re-emits the command + resp, caps = self.server.capabilities() + _check_caps(caps) + + @unittest.skipUnless(ssl, 'requires SSL support') + def test_starttls(self): + file = self.server.file + sock = self.server.sock + self.server.starttls() + # Check that the socket and internal pseudo-file really were + # changed. + self.assertNotEqual(file, self.server.file) + self.assertNotEqual(sock, self.server.sock) + # Check that the new socket really is an SSL one + self.assertIsInstance(self.server.sock, ssl.SSLSocket) + # Check that trying starttls when it's already active fails. + self.assertRaises(ValueError, self.server.starttls) + + def test_zlogin(self): + # This test must be the penultimate because further commands will be + # refused. + baduser = "notarealuser" + badpw = "notarealpassword" + # Check that bogus credentials cause failure + self.assertRaises(nntplib.NNTPTemporaryError, self.server.login, + user=baduser, password=badpw, usenetrc=False) + # FIXME: We should check that correct credentials succeed. + + def test_zzquit(self): + # This test must be called last, hence the name + cls = type(self) + try: + self.server.quit() + finally: + cls.server = None + + def test_with_statement(self): + def is_connected(): + if not hasattr(server, 'file'): + return False + try: + server.help() + except (OSError, EOFError): + return False + return True + + with support.transient_internet(self.NNTP_HOST): + with self.NNTP_CLASS(self.NNTP_HOST, + timeout=TIMEOUT, usenetrc=False) as server: + self.assertTrue(is_connected()) + self.assertTrue(server.help()) + self.assertFalse(is_connected()) + + with self.NNTP_CLASS(self.NNTP_HOST, + timeout=TIMEOUT, usenetrc=False) as server: + server.quit() + self.assertFalse(is_connected()) + + +@unittest.skipUnless(threading, 'requires multithreading') +class NetworkedNNTPTests(NetworkedNNTPTestsMixin, unittest.TestCase): + NNTP_HOST = 'news.trigofacile.com' + GROUP_NAME = 'fr.comp.lang.python' + GROUP_PAT = 'fr.comp.lang.*' + + NNTP_CLASS = NNTP + SERVER_CLASS = TcpServer + + @classmethod + def setUpClass(cls): + server = cls.SERVER_CLASS() + cls.background = threading.Thread(target=server.run_server) + cls.background.start() + cls.server = cls.NNTP_CLASS(support.HOST, server.port, + timeout=TIMEOUT, usenetrc=False) + + @classmethod + def tearDownClass(cls): + if cls.server is not None: + cls.server.quit() + cls.background.join() + +@unittest.skipUnless(ssl, 'requires SSL support') +class NetworkedNNTP_SSLTests(NetworkedNNTPTests): + + # Technical limits for this public NNTP server (see http://www.aioe.org): + # "Only two concurrent connections per IP address are allowed and + # 400 connections per day are accepted from each IP address." + + NNTP_HOST = 'nntp.aioe.org' + GROUP_NAME = 'comp.lang.python' + GROUP_PAT = 'comp.lang.*' + + NNTP_CLASS = getattr(nntplib, 'NNTP_SSL', None) + SERVER_CLASS = SslServer + + # Disabled as the connection will already be encrypted. + test_starttls = None + + +# +# Non-networked tests using something mocking a local server. +# + +class _NNTPServerIO(io.RawIOBase): + """A raw IO object allowing NNTP commands to be received and processed + by a handler. The handler can push responses which can then be read + from the IO object.""" + + def __init__(self, handler): + io.RawIOBase.__init__(self) + # The channel from the client + self.c2s = io.BytesIO() + # The channel to the client + self.s2c = io.BytesIO() + self.handler = handler + self.handler.start(self.c2s.readline, self.push_data) + + def readable(self): + return True + + def writable(self): + return True + + def push_data(self, data): + """Push (buffer) some data to send to the client.""" + pos = self.s2c.tell() + self.s2c.seek(0, 2) + self.s2c.write(data) + self.s2c.seek(pos) + + def write(self, b): + """The client sends us some data""" + pos = self.c2s.tell() + self.c2s.write(b) + self.c2s.seek(pos) + self.handler.process_pending() + return len(b) + + def readinto(self, buf): + """The client wants to read a response""" + self.handler.process_pending() + b = self.s2c.read(len(buf)) + n = len(b) + buf[:n] = b + return n + + +def make_mock_file(handler): + sio = _NNTPServerIO(handler) + # Using BufferedRWPair instead of BufferedRandom ensures the file + # isn't seekable. + file = io.BufferedRWPair(sio, sio) + return (sio, file) + + +class MockedNNTPTestsMixin: + # Override in derived classes + handler_class = None + + def setUp(self): + super().setUp() + self.make_server() + + def tearDown(self): + super().tearDown() + del self.server + + def make_server(self, *args, **kwargs): + self.handler = self.handler_class() + self.sio, file = make_mock_file(self.handler) + self.server = nntplib._NNTPBase(file, 'test.server', *args, **kwargs) + return self.server + + +class MockedNNTPWithReaderModeMixin(MockedNNTPTestsMixin): + def setUp(self): + super().setUp() + self.make_server(readermode=True) + + class CapsAfterLoginNNTPv2Handler(NNTPv2Handler): """A handler that allows CAPABILITIES only after login""" @@ -1508,64 +1611,6 @@ def nntp_class(*pos, **kw): return nntplib.NNTP_SSL(*pos, ssl_context=bypass_context, **kw) -@unittest.skipUnless(threading, 'requires multithreading') -class LocalServerTests(unittest.TestCase): - def setUp(self): - sock = socket.socket() - port = support.bind_port(sock) - sock.listen() - self.background = threading.Thread( - target=self.run_server, args=(sock,)) - self.background.start() - self.addCleanup(self.background.join) - - self.nntp = NNTP(support.HOST, port, usenetrc=False).__enter__() - self.addCleanup(self.nntp.__exit__, None, None, None) - - def run_server(self, sock): - # Could be generalized to handle more commands in separate methods - with sock: - [client, _] = sock.accept() - with contextlib.ExitStack() as cleanup: - cleanup.enter_context(client) - reader = cleanup.enter_context(client.makefile('rb')) - client.sendall(b'200 Server ready\r\n') - while True: - cmd = reader.readline() - if cmd == b'CAPABILITIES\r\n': - client.sendall( - b'101 Capability list:\r\n' - b'VERSION 2\r\n' - b'STARTTLS\r\n' - b'.\r\n' - ) - elif cmd == b'STARTTLS\r\n': - reader.close() - client.sendall(b'382 Begin TLS negotiation now\r\n') - client = ssl.wrap_socket( - client, server_side=True, certfile=certfile) - cleanup.enter_context(client) - reader = cleanup.enter_context(client.makefile('rb')) - elif cmd == b'QUIT\r\n': - client.sendall(b'205 Bye!\r\n') - break - else: - raise ValueError('Unexpected command {!r}'.format(cmd)) - - @unittest.skipUnless(ssl, 'requires SSL support') - def test_starttls(self): - file = self.nntp.file - sock = self.nntp.sock - self.nntp.starttls() - # Check that the socket and internal pseudo-file really were - # changed. - self.assertNotEqual(file, self.nntp.file) - self.assertNotEqual(sock, self.nntp.sock) - # Check that the new socket really is an SSL one - self.assertIsInstance(self.nntp.sock, ssl.SSLSocket) - # Check that trying starttls when it's already active fails. - self.assertRaises(ValueError, self.nntp.starttls) - if __name__ == "__main__": unittest.main()