diff -r 936621d33c38 Lib/nntplib.py --- a/Lib/nntplib.py Wed Feb 20 18:19:55 2013 -0500 +++ b/Lib/nntplib.py Mon Sep 30 23:42:09 2013 +0200 @@ -37,6 +37,13 @@ "error_reply","error_temp","error_perm","error_proto", "error_data",] +# maximal line length when calling readline(). This is to prevent +# reading arbitrary lenght lines. RFC 3977 limits NNTP line length to +# 512 characters, including CRLF. We have selected 2048 just to be on +# the safe side. +_MAXLINE = 2048 + + # Exceptions raised when an error or invalid response is received class NNTPError(Exception): """Base class for all nntplib exceptions""" @@ -200,7 +207,9 @@ def getline(self): """Internal: return one line from the server, stripping CRLF. Raise EOFError if the connection is closed.""" - line = self.file.readline() + line = self.file.readline(_MAXLINE + 1) + if len(line) > _MAXLINE: + raise NNTPProtocolError('line too long') if self.debugging > 1: print '*get*', repr(line) if not line: raise EOFError diff -r 936621d33c38 Lib/test/test_nntplib.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/Lib/test/test_nntplib.py Mon Sep 30 23:42:09 2013 +0200 @@ -0,0 +1,65 @@ +import socket +import threading +import nntplib +import time + +from unittest import TestCase +from test import test_support + +HOST = test_support.HOST + + +def server(evt, serv, evil=False): + serv.listen(5) + try: + conn, addr = serv.accept() + except socket.timeout: + pass + else: + if evil: + conn.send("1 I'm too long response" * 3000 + "\n") + else: + conn.send("1 I'm OK response\n") + conn.close() + finally: + serv.close() + evt.set() + + +class BaseServerTest(TestCase): + def setUp(self): + self.evt = threading.Event() + self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.sock.settimeout(3) + self.port = test_support.bind_port(self.sock) + threading.Thread( + target=server, + args=(self.evt, self.sock, self.evil)).start() + time.sleep(.1) + + def tearDown(self): + self.evt.wait() + + +class ServerTests(BaseServerTest): + evil = False + + def test_basic_connect(self): + nntp = nntplib.NNTP('localhost', self.port) + nntp.sock.close() + + +class EvilServerTests(BaseServerTest): + evil = True + + def test_too_long_line(self): + self.assertRaises(nntplib.NNTPProtocolError, + nntplib.NNTP, 'localhost', self.port) + + +def test_main(verbose=None): + test_support.run_unittest(EvilServerTests) + test_support.run_unittest(ServerTests) + +if __name__ == '__main__': + test_main()