diff -r 736397b225df Lib/nntplib.py --- a/Lib/nntplib.py Tue Sep 14 12:08:08 2010 +0200 +++ b/Lib/nntplib.py Tue Sep 14 16:06:45 2010 +0200 @@ -1,4 +1,7 @@ -"""An NNTP client class based on RFC 977: Network News Transfer Protocol. +"""An NNTP client class based on: +- RFC 977: Network News Transfer Protocol +- RFC 2980: Common NNTP Extensions +- RFC 3977: Network News Transfer Protocol (version 2) Example: @@ -27,15 +30,25 @@ are strings, not numbers, since they are # RFC 977 by Brian Kantor and Phil Lapsley. # xover, xgtitle, xpath, date methods by Kevan Heydon +# TODO: +# - take and return datetime objects everywhere instead of date and time strings +# - return structured data (GroupInfo etc.) everywhere +# - POST/IHAVE should take arbitrary iterables of lines (not only file objects) # Imports import re import socket +import collections +import datetime -__all__ = ["NNTP","NNTPReplyError","NNTPTemporaryError", - "NNTPPermanentError","NNTPProtocolError","NNTPDataError", - "error_reply","error_temp","error_perm","error_proto", - "error_data",] +from email.header import decode_header as _email_decode_header +from socket import _GLOBAL_DEFAULT_TIMEOUT + +__all__ = ["NNTP", + "NNTPReplyError", "NNTPTemporaryError", "NNTPPermanentError", + "NNTPProtocolError", "NNTPDataError", + "decode_header", + ] # Exceptions raised when an error or invalid response is received class NNTPError(Exception): @@ -67,39 +80,163 @@ class NNTPDataError(NNTPError): """Error in response data""" pass -# for backwards compatibility -error_reply = NNTPReplyError -error_temp = NNTPTemporaryError -error_perm = NNTPPermanentError -error_proto = NNTPProtocolError -error_data = NNTPDataError - - # Standard port used by NNTP servers NNTP_PORT = 119 # Response numbers that are followed by additional text (e.g. article) -LONGRESP = [b'100', b'215', b'220', b'221', b'222', b'224', b'230', b'231', b'282'] +_LONGRESP = { + '100', # HELP + '101', # CAPABILITIES + '211', # LISTGROUP (also not multi-line with GROUP) + '215', # LIST + '220', # ARTICLE + '221', # HEAD, XHDR + '222', # BODY + '224', # OVER, XOVER + '225', # HDR + '230', # NEWNEWS + '231', # NEWGROUPS + '282', # XGTITLE +} +# Default decoded value for LIST OVERVIEW.FMT if not supported +_DEFAULT_OVERVIEW_FMT = [ + "subject", "from", "date", "message-id", "references", ":bytes", ":lines"] + +# Alternative names allowed in LIST OVERVIEW.FMT response +_OVERVIEW_FMT_ALTERNATIVES = { + 'bytes': ':bytes', + 'lines': ':lines', +} # Line terminators (we always output CRLF, but accept any of CRLF, CR, LF) -CRLF = b'\r\n' +_CRLF = b'\r\n' +GroupInfo = collections.namedtuple('GroupInfo', + ['group', 'last', 'first', 'flag']) +ArticleInfo = collections.namedtuple('ArticleInfo', + ['resp', 'nr', 'id', 'lines']) -# The class itself -class NNTP: - def __init__(self, host, port=NNTP_PORT, user=None, password=None, - readermode=None, usenetrc=True): + +# Helper function(s) +def decode_header(header_str): + """Takes an unicode string representing a munged header value + and decodes it as a (possibly non-ASCII) readable value.""" + parts = [] + for v, enc in _email_decode_header(header_str): + if isinstance(v, bytes): + parts.append(v.decode(enc or 'ascii')) + else: + parts.append(v) + return ' '.join(parts) + +def _parse_overview_fmt(lines): + """Parse a list of string representing the response to LIST OVERVIEW.FMT + and return a list of header/metadata names. + Raises NNTPDataError if the response is not compliant + (cf. RFC 3977, section 8.4).""" + fmt = [] + for line in lines: + if line[0] == ':': + # Metadata name (e.g. ":bytes") + name, _, suffix = line[1:].partition(':') + name = ':' + name + else: + # Header name (e.g. "Subject:" or "Xref:full") + name, _, suffix = line.partition(':') + name = name.lower() + name = _OVERVIEW_FMT_ALTERNATIVES.get(name, name) + # Should we do something with the suffix? + fmt.append(name) + defaults = _DEFAULT_OVERVIEW_FMT + if len(fmt) < len(defaults): + raise NNTPDataError("LIST OVERVIEW.FMT response too short") + if fmt[:len(defaults)] != defaults: + raise NNTPDataError("LIST OVERVIEW.FMT redefines default fields") + return fmt + +def _parse_overview(lines, fmt, data_process_func=None): + """Parse the response to a OVER or XOVER command according to the + overview format `fmt`.""" + n_defaults = len(_DEFAULT_OVERVIEW_FMT) + overview = [] + for line in lines: + fields = {} + article_number, *tokens = line.split('\t') + for i, token in enumerate(tokens): + field_name = fmt[i] + if i >= len(fmt): + # XXX should we raise an error? Some servers might not + # support LIST OVERVIEW.FMT and still return additional + # headers. + continue + is_metadata = field_name.startswith(':') + if i >= n_defaults and not is_metadata: + # Non-default header names are included in full in the response + h = field_name + ":" + if token[:len(h)].lower() != h: + raise NNTPDataError("OVER/XOVER response doesn't include " + "names of additional headers") + token = token[len(h):].lstrip(" ") + fields[fmt[i]] = token + overview.append((article_number, fields)) + return overview + +def _parse_datetime(date_str, time_str=None): + """Parse a pair of (date, time) strings. If only the date is given, + it is assumed to be both date and time concatenated (e.g. response + to the DATE command). + """ + if time_str is None: + time_str = date_str[-6:] + date_str = date_str[:-6] + hours = int(time_str[:2]) + minutes = int(time_str[2:4]) + seconds = int(time_str[4:]) + year = int(date_str[:-4]) + month = int(date_str[-4:-2]) + day = int(date_str[-2:]) + # RFC 3977 doesn't say how to interpret 2-char years. Assume that + # there are no dates before 1970 on Usenet. + if year < 70: + year += 2000 + elif year < 100: + year += 1900 + return datetime.datetime(year, month, day, hours, minutes, seconds) + + +# The classes themselves +class _NNTPBase: + # UTF-8 is the character set for all NNTP commands and responses: they + # are automatically encoded (when sending) and decoded (and receiving) + # by this class. + # However, some multi-line data blocks can contain arbitrary bytes (for + # example, latin-1 or utf-16 data in the body of a message). Commands + # taking (POST, IHAVE) or returning (HEAD, BODY, ARTICLE) raw message + # data will therefore only accept and produce bytes objects. + # Furthermore, since there could be non-compliant servers out there, + # we use 'surrogateescape' as the error handler for fault tolerance + # and easy round-tripping. This could be useful for some applications + # (e.g. NNTP gateways). + + encoding = 'utf-8' + errors = 'surrogateescape' + + def __init__(self, file, user=None, password=None, + readermode=None, usenetrc=True, + timeout=_GLOBAL_DEFAULT_TIMEOUT): """Initialize an instance. Arguments: - - host: hostname to connect to - - port: port to connect to (default the standard NNTP port) + - file: file-like object (open for read/write in binary mode) - user: username to authenticate with - password: password to use with username - readermode: if true, send 'mode reader' command after connecting. + - usenetrc: allow loading username and password from ~/.netrc file + if not specified explicitly + - timeout: timeout (in seconds) used for socket connections readermode is sometimes necessary if you are connecting to an NNTP server on the local machine and intend to call @@ -107,10 +244,7 @@ class NNTP: unexpected NNTPPermanentErrors, you might need to set readermode. """ - self.host = host - self.port = port - self.sock = socket.create_connection((host, port)) - self.file = self.sock.makefile('rb') + self.file = file self.debugging = 0 self.welcome = self.getresp() @@ -127,7 +261,7 @@ class NNTP: # error 500, probably 'not implemented' pass except NNTPTemporaryError as e: - if user and e.response.startswith(b'480'): + if user and e.response.startswith('480'): # Need authorization before 'mode reader' readermode_afterauth = 1 else: @@ -144,16 +278,16 @@ class NNTP: password = auth[2] except IOError: pass - # Perform NNRP authentication if needed. + # Perform NNTP authentication if needed. if user: resp = self.shortcmd('authinfo user '+user) - if resp.startswith(b'381'): + if resp.startswith('381'): if not password: raise NNTPReplyError(resp) else: resp = self.shortcmd( 'authinfo pass '+password) - if not resp.startswith(b'281'): + if not resp.startswith('281'): raise NNTPPermanentError(resp) if readermode_afterauth: try: @@ -162,11 +296,17 @@ class NNTP: # error 500, probably 'not implemented' pass - - # Get the welcome message from the server - # (this is read and squirreled away by __init__()). - # If the response code is 200, posting is allowed; - # if it 201, posting is not allowed + # Inquire about capabilities (RFC 3977) + self.nntp_version = 1 + try: + resp, caps = self.capabilities() + except NNTPPermanentError: + # Server doesn't support capabilities + self._caps = {} + else: + self._caps = caps + if 'VERSION' in caps: + self.nntp_version = int(caps['VERSION'][0]) def getwelcome(self): """Get the welcome message from the server @@ -177,6 +317,12 @@ class NNTP: if self.debugging: print('*welcome*', repr(self.welcome)) return self.welcome + def getcapabilities(self): + """Get the server capabilities, as read by __init__(). + If the CAPABILITIES command is not supported, an empty dict is + returned.""" + return self._caps + def set_debuglevel(self, level): """Set the debugging level. Argument 'level' means: 0: no debugging output (default) @@ -187,56 +333,67 @@ class NNTP: debug = set_debuglevel def putline(self, line): - """Internal: send one line to the server, appending CRLF.""" - line = line + CRLF + """Internal: send one line to the server, appending CRLF. + The `line` must be a bytes-like object.""" + line = line + _CRLF if self.debugging > 1: print('*put*', repr(line)) - self.sock.sendall(line) + self.file.write(line) + self.file.flush() def putcmd(self, line): - """Internal: send one command to the server (through putline()).""" + """Internal: send one command to the server (through putline()). + The `line` must be an unicode string.""" if self.debugging: print('*cmd*', repr(line)) - line = bytes(line, "ASCII") + line = line.encode(self.encoding, self.errors) self.putline(line) def getline(self): - """Internal: return one line from the server, stripping CRLF. - Raise EOFError if the connection is closed.""" + """Internal: return one line from the server, stripping _CRLF. + Raise EOFError if the connection is closed. + Returns a bytes object.""" line = self.file.readline() if self.debugging > 1: print('*get*', repr(line)) if not line: raise EOFError - if line[-2:] == CRLF: + if line[-2:] == _CRLF: line = line[:-2] - elif line[-1:] in CRLF: + elif line[-1:] in _CRLF: line = line[:-1] return line def getresp(self): """Internal: get a response from the server. - Raise various errors if the response indicates an error.""" + Raise various errors if the response indicates an error. + Returns an unicode string.""" resp = self.getline() if self.debugging: print('*resp*', repr(resp)) + resp = resp.decode(self.encoding, self.errors) c = resp[:1] - if c == b'4': + if c == '4': raise NNTPTemporaryError(resp) - if c == b'5': + if c == '5': raise NNTPPermanentError(resp) - if c not in b'123': + if c not in '123': raise NNTPProtocolError(resp) return resp def getlongresp(self, file=None): """Internal: get a response plus following text from the server. - Raise various errors if the response indicates an error.""" + Raise various errors if the response indicates an error. + + Returns a (response, lines) tuple where `response` is an unicode + string and `lines` is a list of bytes objects. + If `file` is a file-like object, it must be open in binary mode. + """ openedFile = None try: # If a string was passed then open a file with that name - if isinstance(file, str): - openedFile = file = open(file, "w") + if isinstance(file, (str, bytes)): + openedFile = file = open(file, "wb") resp = self.getresp() - if resp[:3] not in LONGRESP: + if resp[:3] not in _LONGRESP: raise NNTPReplyError(resp) list = [] while 1: @@ -257,50 +414,96 @@ class NNTP: return resp, list def shortcmd(self, line): - """Internal: send a command and get the response.""" + """Internal: send a command and get the response. + Same return value as getresp().""" self.putcmd(line) return self.getresp() def longcmd(self, line, file=None): - """Internal: send a command and get the response plus following text.""" + """Internal: send a command and get the response plus following text. + Same return value as getlongresp().""" self.putcmd(line) return self.getlongresp(file) - def newgroups(self, date, time, file=None): + def _longcmdstring(self, line, file=None): + """Internal: send a command and get the response plus following text. + Same as longcmd() and getlongresp(), except that the returned `lines` + are unicode strings rather than bytes objects. + """ + self.putcmd(line) + resp, list = self.getlongresp(file) + return resp, (line.decode(self.encoding, self.errors) + for line in list) + + def _getoverviewfmt(self): + """Internal: get the overview format. Queries the server if not + already done, else returns the cached value.""" + try: + return self._cachedoverviewfmt + except AttributeError: + pass + try: + resp, lines = self._longcmdstring("LIST OVERVIEW.FMT") + except NNTPPermanentError: + # Not supported by server? + fmt = _DEFAULT_OVERVIEW_FMT[:] + else: + fmt = _parse_overview_fmt(lines) + self._cachedoverviewfmt = fmt + return fmt + + def _grouplist(self, lines): + # Parse lines into "group last first flag" + return [GroupInfo(*line.split()) for line in lines] + + def capabilities(self): + """Process a CAPABILITIES command. Not supported by all servers. + Return: + - resp: server response if successful + - caps: a dictionary mapping capability names to lists of tokens + (for example {'VERSION': ['2'], 'OVER': [], LIST: ['ACTIVE', 'HEADERS'] }) + """ + caps = {} + resp, lines = self._longcmdstring("CAPABILITIES") + for line in lines: + name, *tokens = line.split() + caps[name] = tokens + return resp, caps + + def newgroups(self, date, time, *, file=None): """Process a NEWGROUPS command. Arguments: - date: string 'yymmdd' indicating the date - time: string 'hhmmss' indicating the time Return: - resp: server response if successful - - list: list of newsgroup names""" + - list: list of newsgroup names + """ + resp, list = self._longcmdstring('NEWGROUPS ' + date + ' ' + time, file) + return resp, self._grouplist(list) - return self.longcmd('NEWGROUPS ' + date + ' ' + time, file) - - def newnews(self, group, date, time, file=None): + def newnews(self, group, date, time, *, file=None): """Process a NEWNEWS command. Arguments: - group: group name or '*' - date: string 'yymmdd' indicating the date - time: string 'hhmmss' indicating the time Return: - resp: server response if successful - - list: list of message ids""" + - list: list of message ids + """ + cmd = 'NEWNEWS ' + group + ' ' + date + ' ' + time + return self._longcmdstring(cmd, file) - cmd = 'NEWNEWS ' + group + ' ' + date + ' ' + time - return self.longcmd(cmd, file) - - def list(self, file=None): - """Process a LIST command. Return: + def list(self, *, file=None): + """Process a LIST command. Argument: + - file: Filename string or file object to store the result in + Returns: - resp: server response if successful - - list: list of (group, last, first, flag) (strings)""" - - resp, list = self.longcmd('LIST', file) - for i in range(len(list)): - # Parse lines into "group last first flag" - list[i] = tuple(list[i].split()) - return resp, list + - list: list of (group, last, first, flag) (strings) + """ + resp, list = self._longcmdstring('LIST', file) + return resp, self._grouplist(list) def description(self, group): - """Get a description for a single group. If more than one group matches ('group' is a pattern), return the first. If no group matches, return an empty string. @@ -311,29 +514,25 @@ class NNTP: NOTE: This neither checks for a wildcard in 'group' nor does it check whether the group actually exists.""" - resp, lines = self.descriptions(group) - if len(lines) == 0: - return b'' - else: - return lines[0][1] + return lines.get(group, '') def descriptions(self, group_pattern): """Get descriptions for a range of groups.""" - line_pat = re.compile(b'^(?P[^ \t]+)[ \t]+(.*)$') + line_pat = re.compile('^(?P[^ \t]+)[ \t]+(.*)$') # Try the more std (acc. to RFC2980) LIST NEWSGROUPS first - resp, raw_lines = self.longcmd('LIST NEWSGROUPS ' + group_pattern) - if not resp.startswith(b'215'): + resp, lines = self._longcmdstring('LIST NEWSGROUPS ' + group_pattern) + if not resp.startswith('215'): # Now the deprecated XGTITLE. This either raises an error # or succeeds with the same output structure as LIST # NEWSGROUPS. - resp, raw_lines = self.longcmd('XGTITLE ' + group_pattern) - lines = [] - for raw_line in raw_lines: + resp, lines = self._longcmdstring('XGTITLE ' + group_pattern) + groups = {} + for raw_line in lines: match = line_pat.search(raw_line.strip()) if match: - lines.append(match.group(1, 2)) - return resp, lines + groups[match.group(1)] = match.group(2) + return resp, groups def group(self, name): """Process a GROUP command. Argument: @@ -343,10 +542,10 @@ class NNTP: - count: number of articles (string) - first: first article number (string) - last: last article number (string) - - name: the group name""" - + - name: the group name + """ resp = self.shortcmd('GROUP ' + name) - if not resp.startswith(b'211'): + if not resp.startswith('211'): raise NNTPReplyError(resp) words = resp.split() count = first = last = 0 @@ -361,20 +560,23 @@ class NNTP: name = words[4].lower() return resp, count, first, last, name - def help(self, file=None): - """Process a HELP command. Returns: + def help(self, *, file=None): + """Process a HELP command. Argument: + - file: Filename string or file object to store the result in + Returns: - resp: server response if successful - - list: list of strings""" - - return self.longcmd('HELP',file) + - list: list of strings returned by the server in response to the + HELP command + """ + return self._longcmdstring('HELP',file) def statparse(self, resp): """Internal: parse the response of a STAT, NEXT or LAST command.""" - if not resp.startswith(b'22'): + if not resp.startswith('22'): raise NNTPReplyError(resp) words = resp.split() nr = 0 - id = b'' + id = '' n = len(words) if n > 1: nr = words[1] @@ -405,11 +607,11 @@ class NNTP: """Process a LAST command. No arguments. Return as for STAT.""" return self.statcmd('LAST') - def artcmd(self, line, file=None): + def artcmd(self, line, *, file=None): """Internal: process a HEAD, BODY or ARTICLE command.""" resp, list = self.longcmd(line, file) resp, nr, id = self.statparse(resp) - return resp, nr, id, list + return ArticleInfo(resp, nr, id, list) def head(self, id): """Process a HEAD command. Argument: @@ -422,7 +624,7 @@ class NNTP: return self.artcmd('HEAD {0}'.format(id)) - def body(self, id, file=None): + def body(self, id, *, file=None): """Process a BODY command. Argument: - id: article number or message id - file: Filename string or file object to store the article in @@ -431,8 +633,8 @@ class NNTP: - nr: article number - id: message id - list: the lines of the article's body or an empty list - if file was used""" - + if file was used + """ return self.artcmd('BODY {0}'.format(id), file) def article(self, id): @@ -442,68 +644,57 @@ class NNTP: - resp: server response if successful - nr: article number - id: message id - - list: the lines of the article""" - + - list: the lines of the article + """ return self.artcmd('ARTICLE {0}'.format(id)) def slave(self): """Process a SLAVE command. Returns: - - resp: server response if successful""" - + - resp: server response if successful + """ return self.shortcmd('SLAVE') - def xhdr(self, hdr, str, file=None): + def xhdr(self, hdr, str, *, file=None): """Process an XHDR command (optional server extension). Arguments: - hdr: the header type (e.g. 'subject') - str: an article nr, a message id, or a range nr1-nr2 + - file: Filename string or file object to store the result in Returns: - resp: server response if successful - - list: list of (nr, value) strings""" + - list: list of (nr, value) strings + """ + pat = re.compile('^([0-9]+) ?(.*)\n?') + resp, lines = self._longcmdstring('XHDR {0} {1}'.format(hdr, str), file) + def remove_number(line): + m = pat.match(line) + return m.group(1, 2) if m else line + return resp, [remove_number(line) for line in lines] - pat = re.compile(b'^([0-9]+) ?(.*)\n?') - resp, lines = self.longcmd('XHDR {0} {1}'.format(hdr, str), file) - for i in range(len(lines)): - line = lines[i] - m = pat.match(line) - if m: - lines[i] = m.group(1, 2) - return resp, lines - - def xover(self, start, end, file=None): + def xover(self, start, end, *, file=None): """Process an XOVER command (optional server extension) Arguments: - start: start of range - end: end of range + - file: Filename string or file object to store the result in Returns: - resp: server response if successful - - list: list of (art-nr, subject, poster, date, - id, references, size, lines)""" + - list: list of dicts containing the response fields + """ + resp, lines = self._longcmdstring('XOVER {0}-{1}'.format(start, end), + file) + fmt = self._getoverviewfmt() + return resp, _parse_overview(lines, fmt) - resp, lines = self.longcmd('XOVER {0}-{1}'.format(start, end), file) - xover_lines = [] - for line in lines: - elem = line.split(b'\t') - try: - xover_lines.append((elem[0], - elem[1], - elem[2], - elem[3], - elem[4], - elem[5].split(), - elem[6], - elem[7])) - except IndexError: - raise NNTPDataError(line) - return resp,xover_lines - - def xgtitle(self, group, file=None): + def xgtitle(self, group, *, file=None): """Process an XGTITLE command (optional server extension) Arguments: - group: group name wildcard (i.e. news.*) Returns: - resp: server response if successful - list: list of (name,title) strings""" - - line_pat = re.compile(b'^([^ \t]+)[ \t]+(.*)$') - resp, raw_lines = self.longcmd('XGTITLE ' + group, file) + from warnings import warn + warn("The XGTITLE extension is not actively used, use the description()" + "function instead", PendingDeprecationWarning, 2) + line_pat = re.compile('^([^ \t]+)[ \t]+(.*)$') + resp, raw_lines = self._longcmdstring('XGTITLE ' + group, file) lines = [] for raw_line in raw_lines: match = line_pat.search(raw_line.strip()) @@ -511,15 +702,19 @@ class NNTP: lines.append(match.group(1, 2)) return resp, lines - def xpath(self,id): + def xpath(self, id): """Process an XPATH command (optional server extension) Arguments: - id: Message id of article Returns: resp: server response if successful - path: directory path to article""" + path: directory path to article + """ + from warnings import warn + warn("The XPATH extension is not actively used", + PendingDeprecationWarning, 2) resp = self.shortcmd('XPATH {0}'.format(id)) - if not resp.startswith(b'223'): + if not resp.startswith('223'): raise NNTPReplyError(resp) try: [resp_num, path] = resp.split() @@ -528,30 +723,28 @@ class NNTP: else: return resp, path - def date (self): - """Process the DATE command. Arguments: - None + def date(self): + """Process the DATE command. Returns: resp: server response if successful date: Date suitable for newnews/newgroups commands etc. - time: Time suitable for newnews/newgroups commands etc.""" - + time: Time suitable for newnews/newgroups commands etc. + """ resp = self.shortcmd("DATE") - if not resp.startswith(b'111'): + if not resp.startswith('111'): raise NNTPReplyError(resp) elem = resp.split() if len(elem) != 2: raise NNTPDataError(resp) - date = elem[1][2:8] - time = elem[1][-6:] - if len(date) != 6 or len(time) != 6: + date = elem[1] + if len(date) != 14: raise NNTPDataError(resp) - return resp, date, time + return resp, _parse_datetime(date, None) def _post(self, command, f): resp = self.shortcmd(command) - # Raises error_??? if posting is not allowed - if not resp.startswith(b'3'): + # Raises a specific exception if posting is not allowed + if not resp.startswith('3'): raise NNTPReplyError(resp) while 1: line = f.readline() @@ -581,17 +774,55 @@ class NNTP: Note that if the server refuses the article an exception is raised.""" return self._post('IHAVE {0}'.format(id), f) + def _close(self): + self.file.close() + del self.file + def quit(self): """Process a QUIT command and close the socket. Returns: - resp: server response if successful""" - - resp = self.shortcmd('QUIT') - self.file.close() - self.sock.close() - del self.file, self.sock + try: + resp = self.shortcmd('QUIT') + finally: + self._close() return resp +class NNTP(_NNTPBase): + + def __init__(self, host, port=NNTP_PORT, user=None, password=None, + readermode=None, usenetrc=True, + timeout=_GLOBAL_DEFAULT_TIMEOUT): + """Initialize an instance. Arguments: + - host: hostname to connect to + - port: port to connect to (default the standard NNTP port) + - user: username to authenticate with + - password: password to use with username + - readermode: if true, send 'mode reader' command after + connecting. + - usenetrc: allow loading username and password from ~/.netrc file + if not specified explicitly + - timeout: timeout (in seconds) used for socket connections + + readermode is sometimes necessary if you are connecting to an + NNTP server on the local machine and intend to call + reader-specific comamnds, such as `group'. If you get + unexpected NNTPPermanentErrors, you might need to set + readermode. + """ + self.host = host + self.port = port + self.sock = socket.create_connection((host, port), timeout) + file = self.sock.makefile("rwb") + _NNTPBase.__init__(self, file, user, password, + readermode, usenetrc, timeout) + + def _close(self): + try: + _NNTPBase._close(self) + finally: + self.sock.close() + # Test retrieval when run as a script. # Assumption: if there's a local news server, it's called 'news'. # Assumption: if user queries a remote news server, it's named diff -r 736397b225df Lib/test/test_nntplib.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/Lib/test/test_nntplib.py Tue Sep 14 16:06:45 2010 +0200 @@ -0,0 +1,442 @@ +import io +import datetime +import textwrap +import unittest +import contextlib +from test import support +from nntplib import NNTP, GroupInfo +import nntplib + +TIMEOUT = 10 + +# TODO: +# - test the `file` arg to various commands +# - test HELP +# - test error conditions +# - test XOVER/OVER incl. with non-ascii headers +# - test POST, IHAVE + + +class NetworkedNNTPTestsMixin: + + def test_welcome(self): + welcome = self.server.getwelcome() + self.assertEqual(str, type(welcome)) + + def test_help(self): + resp, list = self.server.help() + for line in list: + self.assertEqual(str, type(line)) + + def test_list(self): + resp, list = self.server.list() + if len(list) > 0: + self.assertEqual(GroupInfo, type(list[0])) + self.assertEqual(str, type(list[0].group)) + + def test_newgroups(self): + # gmane gets a constant influx of new groups. In order not to stress + # the server too much, we choose a recent date in the past. + dt = datetime.date.today() - datetime.timedelta(days=7) + dt = "%02d%02d%02d" % (dt.year % 100, dt.month, dt.day) + resp, list = self.server.newgroups(dt, "000000") + if len(list) > 0: + self.assertEqual(GroupInfo, type(list[0])) + self.assertEqual(str, type(list[0].group)) + + def test_description(self): + descr = self.server.description(self.GROUP_NAME) + self.assertEqual(str, type(descr)) + # A description was found + self.assertTrue(descr) + + def test_group(self): + result = self.server.group(self.GROUP_NAME) + self.assertEqual(5, len(result)) + self.assertEqual(str, type(result[4])) + + def test_date(self): + resp, date = self.server.date() + self.assertIsInstance(date, datetime.datetime) + # Sanity check + self.assertGreaterEqual(date.year, 1995) + self.assertLessEqual(date.year, 2030) + + def test_head(self): + resp, count, first, last, name = self.server.group(self.GROUP_NAME) + result = self.server.head(last) + for line in result.lines: + self.assertEqual(bytes, type(line)) + + def test_xover(self): + resp, count, first, last, name = self.server.group(self.GROUP_NAME) + resp, lines = self.server.xover(last, last) + art_num, d = lines[0] + self.assertEqual(art_num, last) + self.assertIsInstance(d, dict) + # NNTP has 7 mandatory fields for OVER/XOVER + self.assertGreaterEqual(len(d), 7) + for v in d.values(): + self.assertIsInstance(v, str) + + def test_xhdr(self): + resp, count, first, last, name = self.server.group(self.GROUP_NAME) + resp, lines = self.server.xhdr('subject', last) + for line in lines: + self.assertEqual(str, type(line[1])) + + def test_quit(self): + self.server.quit() + self.server = None + + +class NetworkedNNTPTests(NetworkedNNTPTestsMixin, unittest.TestCase): + NNTP_HOST = 'news.gmane.org' + GROUP_NAME = 'gmane.comp.python.devel' + + def setUp(self): + support.requires("network") + with support.transient_internet(self.NNTP_HOST): + self.server = NNTP(self.NNTP_HOST, timeout=TIMEOUT) + + def tearDown(self): + if self.server is not None: + self.server.quit() + + # Disabled with gmane as it produces too much data + test_list = None + + def test_capabilities(self): + # As of this writing, gmane implements NNTP version 2 and has a + # couple of well-known capabilities. Just sanity check that we + # got them. + def _check_caps(caps): + caps_list = caps['LIST'] + self.assertIsInstance(caps_list, (list, tuple)) + self.assertIn('OVERVIEW.FMT', caps_list) + self.assertGreaterEqual(self.server.nntp_version, 2) + _check_caps(self.server.getcapabilities()) + # This re-emits the command + resp, caps = self.server.capabilities() + _check_caps(caps) + + +# +# Non-networked tests using a local server (or something mocking it). +# + +class _NNTPServerIO(io.RawIOBase): + """A raw IO object allowing NNTP commands to be received and processed + by a handler. The handler can push responses which can then be read + from the IO object.""" + + def __init__(self, handler): + io.RawIOBase.__init__(self) + self.handler = handler + # The channel from the client + self.c2s = io.BytesIO() + # The channel to the client + self.s2c = io.BytesIO() + # Our welcome + self.handler.handle_welcome(self) + + def readable(self): + return True + + def writable(self): + return True + + def _decode(self, data): + return str(data, "utf-8", "surrogateescape") + + def _process_pending(self): + while True: + line = self._decode(self.c2s.readline()) + if not line: + return + if not line.endswith("\r\n"): + raise ValueError("line doesn't end with \\r\\n: %r" % line) + line = line[:-2] + cmd, *tokens = line.split() + meth = getattr(self.handler, "handle_" + cmd.upper(), None) + if meth is None: + self.handler.handle_unknown(self) + else: + meth(self, *tokens) + + def push_data(self, data): + """Push (buffer) some data to send to the client.""" + pos = self.s2c.tell() + self.s2c.write(data) + self.s2c.seek(pos) + + def push_lit(self, lit): + """Push a string literal""" + lit = textwrap.dedent(lit) + lit = "\r\n".join(lit.splitlines()) + "\r\n" + lit = lit.encode('utf-8') + self.push_data(lit) + + def write(self, b): + """The client sends us some data""" + pos = self.c2s.tell() + self.c2s.write(b) + self.c2s.seek(pos) + self._process_pending() + return len(b) + + def readinto(self, buf): + """The client wants to read a response""" + self._process_pending() + b = self.s2c.read(len(buf)) + n = len(b) + buf[:n] = b + return n + + +class MockedNNTPTestsMixin: + welcome = "200 NNTP mock server" + + def setUp(self): + self.make_server() + + def tearDown(self): + del self.server + + def make_server(self, *args, **kwargs): + self.sio = _NNTPServerIO(handler=self) + # Using BufferedRWPair instead of BufferedRandom ensures the file + # isn't seekable. + file = io.BufferedRWPair(self.sio, self.sio) + self.server = nntplib._NNTPBase(file, *args, **kwargs) + return self.server + + def handle_unknown(self, serv): + serv.push_lit("500 What?") + + def handle_welcome(self, serv): + serv.push_lit(self.welcome) + + def handle_QUIT(self, serv): + serv.push_lit("205 Bye!") + + def handle_DATE(self, serv): + serv.push_lit("111 20100914001155") + + def handle_LIST(self, serv, action=None, param=None): + if action is None: + serv.push_lit("""\ + 215 Newsgroups in form "group high low flags". + comp.lang.python 0000052340 0000002828 y + comp.lang.python.announce 0000001153 0000000993 m + free.it.comp.lang.python 0000000002 0000000002 y + fr.comp.lang.python 0000001254 0000000760 y + free.it.comp.lang.python.learner 0000000000 0000000001 y + tw.bbs.comp.lang.python 0000000304 0000000304 y + .""") + elif action == "NEWSGROUPS": + assert param is not None + serv.push_lit('215 Descriptions in form "group description".') + if param == "comp.lang.python": + serv.push_lit("""\ + 215 Descriptions in form "group description". + comp.lang.python\tThe Python computer language. + .""") + elif param == "comp.lang.python*": + serv.push_lit("""\ + 215 Descriptions in form "group description". + comp.lang.python\tThe Python computer language. + comp.lang.python.announce\tAnnouncements about the Python language. (Moderated) + .""") + else: + serv.push_lit("""\ + 215 Descriptions in form "group description". + .""") + else: + serv.push_list('501 Unknown LIST keyword') + + +class NNTPv1v2TestsMixin: + + def test_welcome(self): + self.assertEqual(self.server.welcome, self.welcome) + + def test_date(self): + resp, date = self.server.date() + self.assertEqual(resp, "111 20100914001155") + self.assertEqual(date, datetime.datetime(2010, 9, 14, 0, 11, 55)) + + def test_quit(self): + self.assertFalse(self.sio.closed) + resp = self.server.quit() + self.assertEqual(resp, "205 Bye!") + self.assertTrue(self.sio.closed) + + def test_list(self): + resp, groups = self.server.list() + self.assertEqual(len(groups), 6) + g = groups[1] + self.assertEqual(g, + GroupInfo("comp.lang.python.announce", "0000001153", + "0000000993", "m")) + + def test_description(self): + desc = self.server.description("comp.lang.python") + self.assertEqual(desc, "The Python computer language.") + desc = self.server.description("comp.lang.pythonx") + self.assertEqual(desc, "") + + def test_descriptions(self): + resp, groups = self.server.descriptions("comp.lang.python") + self.assertEqual(resp, '215 Descriptions in form "group description".') + self.assertEqual(groups, { + "comp.lang.python": "The Python computer language.", + }) + resp, groups = self.server.descriptions("comp.lang.python*") + self.assertEqual(groups, { + "comp.lang.python": "The Python computer language.", + "comp.lang.python.announce": "Announcements about the Python language. (Moderated)", + }) + resp, groups = self.server.descriptions("comp.lang.pythonx") + self.assertEqual(groups, {}) + + +class NNTPv1Tests(NNTPv1v2TestsMixin, MockedNNTPTestsMixin, unittest.TestCase): + """Tests an NNTP v1 server (no capabilities).""" + + def test_caps(self): + caps = self.server.getcapabilities() + self.assertEqual(caps, {}) + self.assertEqual(self.server.nntp_version, 1) + + +class NNTPv2Tests(NNTPv1v2TestsMixin, MockedNNTPTestsMixin, unittest.TestCase): + """Tests an NNTP v2 server (with capabilities).""" + + def handle_CAPABILITIES(self, serv): + serv.push_lit("""\ + 101 Capability list: + VERSION 2 + IMPLEMENTATION INN 2.5.1 + AUTHINFO USER + HDR + LIST ACTIVE ACTIVE.TIMES DISTRIB.PATS HEADERS NEWSGROUPS OVERVIEW.FMT + OVER + POST + READER + .""") + + def test_caps(self): + caps = self.server.getcapabilities() + self.assertEqual(caps, { + 'VERSION': ['2'], + 'IMPLEMENTATION': ['INN', '2.5.1'], + 'AUTHINFO': ['USER'], + 'HDR': [], + 'LIST': ['ACTIVE', 'ACTIVE.TIMES', 'DISTRIB.PATS', + 'HEADERS', 'NEWSGROUPS', 'OVERVIEW.FMT'], + 'OVER': [], + 'POST': [], + 'READER': [], + }) + self.assertEqual(self.server.nntp_version, 2) + + +class MiscTests(unittest.TestCase): + + def test_decode_header(self): + def gives(a, b): + self.assertEqual(nntplib.decode_header(a), b) + gives("" , "") + gives("a plain header", "a plain header") + gives(" with extra spaces ", " with extra spaces ") + gives("=?ISO-8859-15?Q?D=E9buter_en_Python?=", "Débuter en Python") + gives("=?utf-8?q?Re=3A_=5Bsqlite=5D_probl=C3=A8me_avec_ORDER_BY_sur_des_cha?=" + " =?utf-8?q?=C3=AEnes_de_caract=C3=A8res_accentu=C3=A9es?=", + "Re: [sqlite] problème avec ORDER BY sur des chaînes de caractères accentuées") + gives("Re: =?UTF-8?B?cHJvYmzDqG1lIGRlIG1hdHJpY2U=?=", + "Re: problème de matrice") + + def test_parse_overview_fmt(self): + # The minimal (default) response + lines = ["Subject:", "From:", "Date:", "Message-ID:", + "References:", ":bytes", ":lines"] + self.assertEqual(nntplib._parse_overview_fmt(lines), + ["subject", "from", "date", "message-id", "references", + ":bytes", ":lines"]) + # The minimal response using alternative names + lines = ["Subject:", "From:", "Date:", "Message-ID:", + "References:", "Bytes:", "Lines:"] + self.assertEqual(nntplib._parse_overview_fmt(lines), + ["subject", "from", "date", "message-id", "references", + ":bytes", ":lines"]) + # Variations in casing + lines = ["subject:", "FROM:", "DaTe:", "message-ID:", + "References:", "BYTES:", "Lines:"] + self.assertEqual(nntplib._parse_overview_fmt(lines), + ["subject", "from", "date", "message-id", "references", + ":bytes", ":lines"]) + # First example from RFC 3977 + lines = ["Subject:", "From:", "Date:", "Message-ID:", + "References:", ":bytes", ":lines", "Xref:full", + "Distribution:full"] + self.assertEqual(nntplib._parse_overview_fmt(lines), + ["subject", "from", "date", "message-id", "references", + ":bytes", ":lines", "xref", "distribution"]) + # Second example from RFC 3977 + lines = ["Subject:", "From:", "Date:", "Message-ID:", + "References:", "Bytes:", "Lines:", "Xref:FULL", + "Distribution:FULL"] + self.assertEqual(nntplib._parse_overview_fmt(lines), + ["subject", "from", "date", "message-id", "references", + ":bytes", ":lines", "xref", "distribution"]) + # A classic response from INN + lines = ["Subject:", "From:", "Date:", "Message-ID:", + "References:", "Bytes:", "Lines:", "Xref:full"] + self.assertEqual(nntplib._parse_overview_fmt(lines), + ["subject", "from", "date", "message-id", "references", + ":bytes", ":lines", "xref"]) + + def test_parse_overview(self): + fmt = nntplib._DEFAULT_OVERVIEW_FMT + ["xref"] + # First example from RFC 3977 + lines = [ + '3000234\tI am just a test article\t"Demo User" ' + '\t6 Oct 1998 04:38:40 -0500\t' + '<45223423@example.com>\t<45454@example.net>\t1234\t' + '17\tXref: news.example.com misc.test:3000363', + ] + overview = nntplib._parse_overview(lines, fmt) + (art_num, fields), = overview + self.assertEqual(art_num, '3000234') + self.assertEqual(fields, { + 'subject': 'I am just a test article', + 'from': '"Demo User" ', + 'date': '6 Oct 1998 04:38:40 -0500', + 'message-id': '<45223423@example.com>', + 'references': '<45454@example.net>', + ':bytes': '1234', + ':lines': '17', + 'xref': 'news.example.com misc.test:3000363', + }) + + def test_parse_datetime(self): + def gives(a, b, *c): + self.assertEqual(nntplib._parse_datetime(a, b), + datetime.datetime(*c)) + # Output of DATE command + gives("19990623135624", None, 1999, 6, 23, 13, 56, 24) + # Variations + gives("19990623", "135624", 1999, 6, 23, 13, 56, 24) + gives("990623", "135624", 1999, 6, 23, 13, 56, 24) + gives("090623", "135624", 2009, 6, 23, 13, 56, 24) + + +def test_main(): + support.run_unittest(MiscTests, NNTPv1Tests, NNTPv2Tests, + NetworkedNNTPTests + ) + + +if __name__ == "__main__": + test_main()