diff -r 4ea5a13762e5 Lib/nntplib.py --- a/Lib/nntplib.py Wed Sep 15 17:13:17 2010 +0200 +++ b/Lib/nntplib.py Wed Sep 15 23:14:39 2010 +0200 @@ -1,4 +1,7 @@ -"""An NNTP client class based on RFC 977: Network News Transfer Protocol. +"""An NNTP client class based on: +- RFC 977: Network News Transfer Protocol +- RFC 2980: Common NNTP Extensions +- RFC 3977: Network News Transfer Protocol (version 2) Example: @@ -27,15 +30,48 @@ are strings, not numbers, since they are # RFC 977 by Brian Kantor and Phil Lapsley. # xover, xgtitle, xpath, date methods by Kevan Heydon +# Incompatible changes from the 2.x nntplib: +# - all commands are encoded as UTF-8 data (using the "surrogateescape" +# error handler), except for raw message data (POST, IHAVE) +# - all responses are decoded as UTF-8 data (using the "surrogateescape" +# error handler), except for raw message data (ARTICLE, HEAD, BODY) +# - the `file` argument to various methods is keyword-only +# +# - NNTP.date() returns a datetime object +# - NNTP.newgroups() and NNTP.newnews() take a datetime (or date) object, +# rather than a pair of (date, time) strings. +# - NNTP.newgroups() and NNTP.list() return a list of GroupInfo named tuples +# - NNTP.descriptions() returns a dict mapping group names to descriptions +# - NNTP.xover() returns a list of dicts mapping field names (header or metadata) +# to field values; each dict representing a message overview. +# - the "internal" methods have been marked private (they now start with +# an underscore) + +# Other changes from the 2.x nntplib: +# - automatic querying of capabilities at connect +# - New method NNTP.getcapabilities() +# - New helper function decode_header() +# - An extensive test suite :-) + +# TODO: +# - return structured data (GroupInfo etc.) everywhere +# - POST/IHAVE should take arbitrary iterables of lines (not only file objects) +# - remove PendingDeprecationWarnings inherited from the original patch? # Imports import re import socket +import collections +import datetime -__all__ = ["NNTP","NNTPReplyError","NNTPTemporaryError", - "NNTPPermanentError","NNTPProtocolError","NNTPDataError", - "error_reply","error_temp","error_perm","error_proto", - "error_data",] +from email.header import decode_header as _email_decode_header +from socket import _GLOBAL_DEFAULT_TIMEOUT + +__all__ = ["NNTP", + "NNTPReplyError", "NNTPTemporaryError", "NNTPPermanentError", + "NNTPProtocolError", "NNTPDataError", + "decode_header", + ] # Exceptions raised when an error or invalid response is received class NNTPError(Exception): @@ -67,39 +103,188 @@ class NNTPDataError(NNTPError): """Error in response data""" pass -# for backwards compatibility -error_reply = NNTPReplyError -error_temp = NNTPTemporaryError -error_perm = NNTPPermanentError -error_proto = NNTPProtocolError -error_data = NNTPDataError - - # Standard port used by NNTP servers NNTP_PORT = 119 # Response numbers that are followed by additional text (e.g. article) -LONGRESP = [b'100', b'215', b'220', b'221', b'222', b'224', b'230', b'231', b'282'] +_LONGRESP = { + '100', # HELP + '101', # CAPABILITIES + '211', # LISTGROUP (also not multi-line with GROUP) + '215', # LIST + '220', # ARTICLE + '221', # HEAD, XHDR + '222', # BODY + '224', # OVER, XOVER + '225', # HDR + '230', # NEWNEWS + '231', # NEWGROUPS + '282', # XGTITLE +} +# Default decoded value for LIST OVERVIEW.FMT if not supported +_DEFAULT_OVERVIEW_FMT = [ + "subject", "from", "date", "message-id", "references", ":bytes", ":lines"] + +# Alternative names allowed in LIST OVERVIEW.FMT response +_OVERVIEW_FMT_ALTERNATIVES = { + 'bytes': ':bytes', + 'lines': ':lines', +} # Line terminators (we always output CRLF, but accept any of CRLF, CR, LF) -CRLF = b'\r\n' +_CRLF = b'\r\n' +GroupInfo = collections.namedtuple('GroupInfo', + ['group', 'last', 'first', 'flag']) +ArticleInfo = collections.namedtuple('ArticleInfo', + ['resp', 'nr', 'id', 'lines']) -# The class itself -class NNTP: - def __init__(self, host, port=NNTP_PORT, user=None, password=None, - readermode=None, usenetrc=True): + +# Helper function(s) +def decode_header(header_str): + """Takes an unicode string representing a munged header value + and decodes it as a (possibly non-ASCII) readable value.""" + parts = [] + for v, enc in _email_decode_header(header_str): + if isinstance(v, bytes): + parts.append(v.decode(enc or 'ascii')) + else: + parts.append(v) + return ' '.join(parts) + +def _parse_overview_fmt(lines): + """Parse a list of string representing the response to LIST OVERVIEW.FMT + and return a list of header/metadata names. + Raises NNTPDataError if the response is not compliant + (cf. RFC 3977, section 8.4).""" + fmt = [] + for line in lines: + if line[0] == ':': + # Metadata name (e.g. ":bytes") + name, _, suffix = line[1:].partition(':') + name = ':' + name + else: + # Header name (e.g. "Subject:" or "Xref:full") + name, _, suffix = line.partition(':') + name = name.lower() + name = _OVERVIEW_FMT_ALTERNATIVES.get(name, name) + # Should we do something with the suffix? + fmt.append(name) + defaults = _DEFAULT_OVERVIEW_FMT + if len(fmt) < len(defaults): + raise NNTPDataError("LIST OVERVIEW.FMT response too short") + if fmt[:len(defaults)] != defaults: + raise NNTPDataError("LIST OVERVIEW.FMT redefines default fields") + return fmt + +def _parse_overview(lines, fmt, data_process_func=None): + """Parse the response to a OVER or XOVER command according to the + overview format `fmt`.""" + n_defaults = len(_DEFAULT_OVERVIEW_FMT) + overview = [] + for line in lines: + fields = {} + article_number, *tokens = line.split('\t') + for i, token in enumerate(tokens): + field_name = fmt[i] + if i >= len(fmt): + # XXX should we raise an error? Some servers might not + # support LIST OVERVIEW.FMT and still return additional + # headers. + continue + is_metadata = field_name.startswith(':') + if i >= n_defaults and not is_metadata: + # Non-default header names are included in full in the response + h = field_name + ":" + if token[:len(h)].lower() != h: + raise NNTPDataError("OVER/XOVER response doesn't include " + "names of additional headers") + token = token[len(h):].lstrip(" ") + fields[fmt[i]] = token + overview.append((article_number, fields)) + return overview + +def _parse_datetime(date_str, time_str=None): + """Parse a pair of (date, time) strings, and return a datetime object. + If only the date is given, it is assumed to be date and time + concatenated together (e.g. response to the DATE command). + """ + if time_str is None: + time_str = date_str[-6:] + date_str = date_str[:-6] + hours = int(time_str[:2]) + minutes = int(time_str[2:4]) + seconds = int(time_str[4:]) + year = int(date_str[:-4]) + month = int(date_str[-4:-2]) + day = int(date_str[-2:]) + # RFC 3977 doesn't say how to interpret 2-char years. Assume that + # there are no dates before 1970 on Usenet. + if year < 70: + year += 2000 + elif year < 100: + year += 1900 + return datetime.datetime(year, month, day, hours, minutes, seconds) + +def _unparse_datetime(dt, legacy=False): + """Format a date or datetime object as a pair of (date, time) strings + in the format required by the NEWNEWS and NEWGROUPS commands. If a + date object is passed, the time is assumed to be midnight (00h00). + + The returned representation depends on the legacy flag: + * if legacy is False (the default): + date has the YYYYMMDD format and time the HHMMSS format + * if legacy is True: + date has the YYMMDD format and time the HHMMSS format. + RFC 3977 compliant servers should understand both formats; therefore, + legacy is only needed when talking to old servers. + """ + if not isinstance(dt, datetime.datetime): + time_str = "000000" + else: + time_str = "{0.hour:02d}{0.minute:02d}{0.second:02d}".format(dt) + y = dt.year + if legacy: + y = y % 100 + date_str = "{0:02d}{1.month:02d}{1.day:02d}".format(y, dt) + else: + date_str = "{0:04d}{1.month:02d}{1.day:02d}".format(y, dt) + return date_str, time_str + + +# The classes themselves +class _NNTPBase: + # UTF-8 is the character set for all NNTP commands and responses: they + # are automatically encoded (when sending) and decoded (and receiving) + # by this class. + # However, some multi-line data blocks can contain arbitrary bytes (for + # example, latin-1 or utf-16 data in the body of a message). Commands + # taking (POST, IHAVE) or returning (HEAD, BODY, ARTICLE) raw message + # data will therefore only accept and produce bytes objects. + # Furthermore, since there could be non-compliant servers out there, + # we use 'surrogateescape' as the error handler for fault tolerance + # and easy round-tripping. This could be useful for some applications + # (e.g. NNTP gateways). + + encoding = 'utf-8' + errors = 'surrogateescape' + + def __init__(self, file, user=None, password=None, + readermode=None, usenetrc=True, + timeout=_GLOBAL_DEFAULT_TIMEOUT): """Initialize an instance. Arguments: - - host: hostname to connect to - - port: port to connect to (default the standard NNTP port) + - file: file-like object (open for read/write in binary mode) - user: username to authenticate with - password: password to use with username - readermode: if true, send 'mode reader' command after connecting. + - usenetrc: allow loading username and password from ~/.netrc file + if not specified explicitly + - timeout: timeout (in seconds) used for socket connections readermode is sometimes necessary if you are connecting to an NNTP server on the local machine and intend to call @@ -107,12 +292,9 @@ class NNTP: unexpected NNTPPermanentErrors, you might need to set readermode. """ - self.host = host - self.port = port - self.sock = socket.create_connection((host, port)) - self.file = self.sock.makefile('rb') + self.file = file self.debugging = 0 - self.welcome = self.getresp() + self.welcome = self._getresp() # 'mode reader' is sometimes necessary to enable 'reader' mode. # However, the order in which 'mode reader' and 'authinfo' need to @@ -122,12 +304,12 @@ class NNTP: readermode_afterauth = 0 if readermode: try: - self.welcome = self.shortcmd('mode reader') + self.welcome = self._shortcmd('mode reader') except NNTPPermanentError: # error 500, probably 'not implemented' pass except NNTPTemporaryError as e: - if user and e.response.startswith(b'480'): + if user and e.response.startswith('480'): # Need authorization before 'mode reader' readermode_afterauth = 1 else: @@ -144,29 +326,35 @@ class NNTP: password = auth[2] except IOError: pass - # Perform NNRP authentication if needed. + # Perform NNTP authentication if needed. if user: - resp = self.shortcmd('authinfo user '+user) - if resp.startswith(b'381'): + resp = self._shortcmd('authinfo user '+user) + if resp.startswith('381'): if not password: raise NNTPReplyError(resp) else: - resp = self.shortcmd( + resp = self._shortcmd( 'authinfo pass '+password) - if not resp.startswith(b'281'): + if not resp.startswith('281'): raise NNTPPermanentError(resp) if readermode_afterauth: try: - self.welcome = self.shortcmd('mode reader') + self.welcome = self._shortcmd('mode reader') except NNTPPermanentError: # error 500, probably 'not implemented' pass - - # Get the welcome message from the server - # (this is read and squirreled away by __init__()). - # If the response code is 200, posting is allowed; - # if it 201, posting is not allowed + # Inquire about capabilities (RFC 3977) + self.nntp_version = 1 + try: + resp, caps = self.capabilities() + except NNTPPermanentError: + # Server doesn't support capabilities + self._caps = {} + else: + self._caps = caps + if 'VERSION' in caps: + self.nntp_version = int(caps['VERSION'][0]) def getwelcome(self): """Get the welcome message from the server @@ -177,6 +365,12 @@ class NNTP: if self.debugging: print('*welcome*', repr(self.welcome)) return self.welcome + def getcapabilities(self): + """Get the server capabilities, as read by __init__(). + If the CAPABILITIES command is not supported, an empty dict is + returned.""" + return self._caps + def set_debuglevel(self, level): """Set the debugging level. Argument 'level' means: 0: no debugging output (default) @@ -186,121 +380,226 @@ class NNTP: self.debugging = level debug = set_debuglevel - def putline(self, line): - """Internal: send one line to the server, appending CRLF.""" - line = line + CRLF + def _putline(self, line): + """Internal: send one line to the server, appending CRLF. + The `line` must be a bytes-like object.""" + line = line + _CRLF if self.debugging > 1: print('*put*', repr(line)) - self.sock.sendall(line) + self.file.write(line) + self.file.flush() - def putcmd(self, line): - """Internal: send one command to the server (through putline()).""" + def _putcmd(self, line): + """Internal: send one command to the server (through _putline()). + The `line` must be an unicode string.""" if self.debugging: print('*cmd*', repr(line)) - line = bytes(line, "ASCII") - self.putline(line) + line = line.encode(self.encoding, self.errors) + self._putline(line) - def getline(self): - """Internal: return one line from the server, stripping CRLF. - Raise EOFError if the connection is closed.""" + def _getline(self, strip_crlf=True): + """Internal: return one line from the server, stripping _CRLF. + Raise EOFError if the connection is closed. + Returns a bytes object.""" line = self.file.readline() if self.debugging > 1: print('*get*', repr(line)) if not line: raise EOFError - if line[-2:] == CRLF: - line = line[:-2] - elif line[-1:] in CRLF: - line = line[:-1] + if strip_crlf: + if line[-2:] == _CRLF: + line = line[:-2] + elif line[-1:] in _CRLF: + line = line[:-1] return line - def getresp(self): + def _getresp(self): """Internal: get a response from the server. - Raise various errors if the response indicates an error.""" - resp = self.getline() + Raise various errors if the response indicates an error. + Returns an unicode string.""" + resp = self._getline() if self.debugging: print('*resp*', repr(resp)) + resp = resp.decode(self.encoding, self.errors) c = resp[:1] - if c == b'4': + if c == '4': raise NNTPTemporaryError(resp) - if c == b'5': + if c == '5': raise NNTPPermanentError(resp) - if c not in b'123': + if c not in '123': raise NNTPProtocolError(resp) return resp - def getlongresp(self, file=None): + def _getlongresp(self, file=None): """Internal: get a response plus following text from the server. - Raise various errors if the response indicates an error.""" + Raise various errors if the response indicates an error. + + Returns a (response, lines) tuple where `response` is an unicode + string and `lines` is a list of bytes objects. + If `file` is a file-like object, it must be open in binary mode. + """ openedFile = None try: # If a string was passed then open a file with that name - if isinstance(file, str): - openedFile = file = open(file, "w") + if isinstance(file, (str, bytes)): + openedFile = file = open(file, "wb") - resp = self.getresp() - if resp[:3] not in LONGRESP: + resp = self._getresp() + if resp[:3] not in _LONGRESP: raise NNTPReplyError(resp) - list = [] - while 1: - line = self.getline() - if line == b'.': - break - if line.startswith(b'..'): - line = line[1:] - if file: - file.write(line + b'\n') - else: - list.append(line) + + lines = [] + if file is not None: + # XXX lines = None instead? + terminators = (b'.' + _CRLF, b'.\n') + while 1: + line = self._getline(False) + if line in terminators: + break + # XXX if _getline() reached EOF in the previous iteration, + # we are in the middle of an existing line. + if line.startswith(b'..'): + line = line[1:] + file.write(line) + else: + terminator = b'.' + while 1: + # XXX if _getline() reaches EOF and returns an unterminated + # line, but data arrives before the next iteration, the + # line will be cut in two as a result. + line = self._getline() + if line == terminator: + break + if line.startswith(b'..'): + line = line[1:] + lines.append(line) finally: # If this method created the file, then it must close it if openedFile: openedFile.close() - return resp, list + return resp, lines - def shortcmd(self, line): - """Internal: send a command and get the response.""" - self.putcmd(line) - return self.getresp() + def _shortcmd(self, line): + """Internal: send a command and get the response. + Same return value as _getresp().""" + self._putcmd(line) + return self._getresp() - def longcmd(self, line, file=None): - """Internal: send a command and get the response plus following text.""" - self.putcmd(line) - return self.getlongresp(file) + def _longcmd(self, line, file=None): + """Internal: send a command and get the response plus following text. + Same return value as _getlongresp().""" + self._putcmd(line) + return self._getlongresp(file) - def newgroups(self, date, time, file=None): - """Process a NEWGROUPS command. Arguments: - - date: string 'yymmdd' indicating the date - - time: string 'hhmmss' indicating the time + def _longcmdstring(self, line, file=None): + """Internal: send a command and get the response plus following text. + Same as _longcmd() and _getlongresp(), except that the returned `lines` + are unicode strings rather than bytes objects. + """ + self._putcmd(line) + resp, list = self._getlongresp(file) + return resp, [line.decode(self.encoding, self.errors) + for line in list] + + def _getoverviewfmt(self): + """Internal: get the overview format. Queries the server if not + already done, else returns the cached value.""" + try: + return self._cachedoverviewfmt + except AttributeError: + pass + try: + resp, lines = self._longcmdstring("LIST OVERVIEW.FMT") + except NNTPPermanentError: + # Not supported by server? + fmt = _DEFAULT_OVERVIEW_FMT[:] + else: + fmt = _parse_overview_fmt(lines) + self._cachedoverviewfmt = fmt + return fmt + + def _grouplist(self, lines): + # Parse lines into "group last first flag" + return [GroupInfo(*line.split()) for line in lines] + + def capabilities(self): + """Process a CAPABILITIES command. Not supported by all servers. Return: - resp: server response if successful - - list: list of newsgroup names""" + - caps: a dictionary mapping capability names to lists of tokens + (for example {'VERSION': ['2'], 'OVER': [], LIST: ['ACTIVE', 'HEADERS'] }) + """ + caps = {} + resp, lines = self._longcmdstring("CAPABILITIES") + for line in lines: + name, *tokens = line.split() + caps[name] = tokens + return resp, caps - return self.longcmd('NEWGROUPS ' + date + ' ' + time, file) + def newgroups(self, date, *, file=None): + """Process a NEWGROUPS command. Arguments: + - date: a date or datetime object + Return: + - resp: server response if successful + - list: list of newsgroup names + """ + if not isinstance(date, (datetime.date, datetime.date)): + raise TypeError( + "the date parameter must be a date or datetime object, " + "not '{:40}'".format(date.__class__.__name__)) + date_str, time_str = _unparse_datetime(date, self.nntp_version < 2) + cmd = 'NEWGROUPS {0} {1}'.format(date_str, time_str) + resp, lines = self._longcmdstring(cmd, file) + return resp, self._grouplist(lines) - def newnews(self, group, date, time, file=None): + def newnews(self, group, date, *, file=None): """Process a NEWNEWS command. Arguments: - group: group name or '*' - - date: string 'yymmdd' indicating the date - - time: string 'hhmmss' indicating the time + - date: a date or datetime object Return: - resp: server response if successful - - list: list of message ids""" + - list: list of message ids + """ + if not isinstance(date, (datetime.date, datetime.date)): + raise TypeError( + "the date parameter must be a date or datetime object, " + "not '{:40}'".format(date.__class__.__name__)) + date_str, time_str = _unparse_datetime(date, self.nntp_version < 2) + cmd = 'NEWNEWS {0} {1} {2}'.format(group, date_str, time_str) + return self._longcmdstring(cmd, file) - cmd = 'NEWNEWS ' + group + ' ' + date + ' ' + time - return self.longcmd(cmd, file) + def list(self, *, file=None): + """Process a LIST command. Argument: + - file: Filename string or file object to store the result in + Returns: + - resp: server response if successful + - list: list of (group, last, first, flag) (strings) + """ + resp, lines = self._longcmdstring('LIST', file) + return resp, self._grouplist(lines) - def list(self, file=None): - """Process a LIST command. Return: - - resp: server response if successful - - list: list of (group, last, first, flag) (strings)""" - - resp, list = self.longcmd('LIST', file) - for i in range(len(list)): - # Parse lines into "group last first flag" - list[i] = tuple(list[i].split()) - return resp, list + def _getdescriptions(self, group_pattern, return_all): + line_pat = re.compile('^(?P[^ \t]+)[ \t]+(.*)$') + # Try the more std (acc. to RFC2980) LIST NEWSGROUPS first + resp, lines = self._longcmdstring('LIST NEWSGROUPS ' + group_pattern) + if not resp.startswith('215'): + # Now the deprecated XGTITLE. This either raises an error + # or succeeds with the same output structure as LIST + # NEWSGROUPS. + resp, lines = self._longcmdstring('XGTITLE ' + group_pattern) + groups = {} + for raw_line in lines: + match = line_pat.search(raw_line.strip()) + if match: + name, desc = match.group(1, 2) + if not return_all: + return desc + groups[name] = desc + if return_all: + return resp, groups + else: + # Nothing found + return '' def description(self, group): - """Get a description for a single group. If more than one group matches ('group' is a pattern), return the first. If no group matches, return an empty string. @@ -311,29 +610,11 @@ class NNTP: NOTE: This neither checks for a wildcard in 'group' nor does it check whether the group actually exists.""" - - resp, lines = self.descriptions(group) - if len(lines) == 0: - return b'' - else: - return lines[0][1] + return self._getdescriptions(group, False) def descriptions(self, group_pattern): """Get descriptions for a range of groups.""" - line_pat = re.compile(b'^(?P[^ \t]+)[ \t]+(.*)$') - # Try the more std (acc. to RFC2980) LIST NEWSGROUPS first - resp, raw_lines = self.longcmd('LIST NEWSGROUPS ' + group_pattern) - if not resp.startswith(b'215'): - # Now the deprecated XGTITLE. This either raises an error - # or succeeds with the same output structure as LIST - # NEWSGROUPS. - resp, raw_lines = self.longcmd('XGTITLE ' + group_pattern) - lines = [] - for raw_line in raw_lines: - match = line_pat.search(raw_line.strip()) - if match: - lines.append(match.group(1, 2)) - return resp, lines + return self._getdescriptions(group_pattern, True) def group(self, name): """Process a GROUP command. Argument: @@ -343,10 +624,10 @@ class NNTP: - count: number of articles (string) - first: first article number (string) - last: last article number (string) - - name: the group name""" - - resp = self.shortcmd('GROUP ' + name) - if not resp.startswith(b'211'): + - name: the group name + """ + resp = self._shortcmd('GROUP ' + name) + if not resp.startswith('211'): raise NNTPReplyError(resp) words = resp.split() count = first = last = 0 @@ -361,20 +642,23 @@ class NNTP: name = words[4].lower() return resp, count, first, last, name - def help(self, file=None): - """Process a HELP command. Returns: + def help(self, *, file=None): + """Process a HELP command. Argument: + - file: Filename string or file object to store the result in + Returns: - resp: server response if successful - - list: list of strings""" - - return self.longcmd('HELP',file) + - list: list of strings returned by the server in response to the + HELP command + """ + return self._longcmdstring('HELP', file) def statparse(self, resp): """Internal: parse the response of a STAT, NEXT or LAST command.""" - if not resp.startswith(b'22'): + if not resp.startswith('22'): raise NNTPReplyError(resp) words = resp.split() nr = 0 - id = b'' + id = '' n = len(words) if n > 1: nr = words[1] @@ -384,7 +668,7 @@ class NNTP: def statcmd(self, line): """Internal: process a STAT, NEXT or LAST command.""" - resp = self.shortcmd(line) + resp = self._shortcmd(line) return self.statparse(resp) def stat(self, id): @@ -405,11 +689,11 @@ class NNTP: """Process a LAST command. No arguments. Return as for STAT.""" return self.statcmd('LAST') - def artcmd(self, line, file=None): + def artcmd(self, line, *, file=None): """Internal: process a HEAD, BODY or ARTICLE command.""" - resp, list = self.longcmd(line, file) + resp, list = self._longcmd(line, file) resp, nr, id = self.statparse(resp) - return resp, nr, id, list + return ArticleInfo(resp, nr, id, list) def head(self, id): """Process a HEAD command. Argument: @@ -422,7 +706,7 @@ class NNTP: return self.artcmd('HEAD {0}'.format(id)) - def body(self, id, file=None): + def body(self, id, *, file=None): """Process a BODY command. Argument: - id: article number or message id - file: Filename string or file object to store the article in @@ -431,8 +715,8 @@ class NNTP: - nr: article number - id: message id - list: the lines of the article's body or an empty list - if file was used""" - + if file was used + """ return self.artcmd('BODY {0}'.format(id), file) def article(self, id): @@ -442,68 +726,57 @@ class NNTP: - resp: server response if successful - nr: article number - id: message id - - list: the lines of the article""" - + - list: the lines of the article + """ return self.artcmd('ARTICLE {0}'.format(id)) def slave(self): """Process a SLAVE command. Returns: - - resp: server response if successful""" + - resp: server response if successful + """ + return self._shortcmd('SLAVE') - return self.shortcmd('SLAVE') - - def xhdr(self, hdr, str, file=None): + def xhdr(self, hdr, str, *, file=None): """Process an XHDR command (optional server extension). Arguments: - hdr: the header type (e.g. 'subject') - str: an article nr, a message id, or a range nr1-nr2 + - file: Filename string or file object to store the result in Returns: - resp: server response if successful - - list: list of (nr, value) strings""" + - list: list of (nr, value) strings + """ + pat = re.compile('^([0-9]+) ?(.*)\n?') + resp, lines = self._longcmdstring('XHDR {0} {1}'.format(hdr, str), file) + def remove_number(line): + m = pat.match(line) + return m.group(1, 2) if m else line + return resp, [remove_number(line) for line in lines] - pat = re.compile(b'^([0-9]+) ?(.*)\n?') - resp, lines = self.longcmd('XHDR {0} {1}'.format(hdr, str), file) - for i in range(len(lines)): - line = lines[i] - m = pat.match(line) - if m: - lines[i] = m.group(1, 2) - return resp, lines - - def xover(self, start, end, file=None): + def xover(self, start, end, *, file=None): """Process an XOVER command (optional server extension) Arguments: - start: start of range - end: end of range + - file: Filename string or file object to store the result in Returns: - resp: server response if successful - - list: list of (art-nr, subject, poster, date, - id, references, size, lines)""" + - list: list of dicts containing the response fields + """ + resp, lines = self._longcmdstring('XOVER {0}-{1}'.format(start, end), + file) + fmt = self._getoverviewfmt() + return resp, _parse_overview(lines, fmt) - resp, lines = self.longcmd('XOVER {0}-{1}'.format(start, end), file) - xover_lines = [] - for line in lines: - elem = line.split(b'\t') - try: - xover_lines.append((elem[0], - elem[1], - elem[2], - elem[3], - elem[4], - elem[5].split(), - elem[6], - elem[7])) - except IndexError: - raise NNTPDataError(line) - return resp,xover_lines - - def xgtitle(self, group, file=None): + def xgtitle(self, group, *, file=None): """Process an XGTITLE command (optional server extension) Arguments: - group: group name wildcard (i.e. news.*) Returns: - resp: server response if successful - list: list of (name,title) strings""" - - line_pat = re.compile(b'^([^ \t]+)[ \t]+(.*)$') - resp, raw_lines = self.longcmd('XGTITLE ' + group, file) + from warnings import warn + warn("The XGTITLE extension is not actively used, use the description()" + "function instead", PendingDeprecationWarning, 2) + line_pat = re.compile('^([^ \t]+)[ \t]+(.*)$') + resp, raw_lines = self._longcmdstring('XGTITLE ' + group, file) lines = [] for raw_line in raw_lines: match = line_pat.search(raw_line.strip()) @@ -511,15 +784,19 @@ class NNTP: lines.append(match.group(1, 2)) return resp, lines - def xpath(self,id): + def xpath(self, id): """Process an XPATH command (optional server extension) Arguments: - id: Message id of article Returns: resp: server response if successful - path: directory path to article""" + path: directory path to article + """ + from warnings import warn + warn("The XPATH extension is not actively used", + PendingDeprecationWarning, 2) - resp = self.shortcmd('XPATH {0}'.format(id)) - if not resp.startswith(b'223'): + resp = self._shortcmd('XPATH {0}'.format(id)) + if not resp.startswith('223'): raise NNTPReplyError(resp) try: [resp_num, path] = resp.split() @@ -528,30 +805,27 @@ class NNTP: else: return resp, path - def date (self): - """Process the DATE command. Arguments: - None + def date(self): + """Process the DATE command. Returns: - resp: server response if successful - date: Date suitable for newnews/newgroups commands etc. - time: Time suitable for newnews/newgroups commands etc.""" - - resp = self.shortcmd("DATE") - if not resp.startswith(b'111'): + - resp: server response if successful + - date: datetime object + """ + resp = self._shortcmd("DATE") + if not resp.startswith('111'): raise NNTPReplyError(resp) elem = resp.split() if len(elem) != 2: raise NNTPDataError(resp) - date = elem[1][2:8] - time = elem[1][-6:] - if len(date) != 6 or len(time) != 6: + date = elem[1] + if len(date) != 14: raise NNTPDataError(resp) - return resp, date, time + return resp, _parse_datetime(date, None) def _post(self, command, f): - resp = self.shortcmd(command) - # Raises error_??? if posting is not allowed - if not resp.startswith(b'3'): + resp = self._shortcmd(command) + # Raises a specific exception if posting is not allowed + if not resp.startswith('3'): raise NNTPReplyError(resp) while 1: line = f.readline() @@ -561,9 +835,9 @@ class NNTP: line = line[:-1] if line.startswith(b'.'): line = b'.' + line - self.putline(line) - self.putline(b'.') - return self.getresp() + self._putline(line) + self._putline(b'.') + return self._getresp() def post(self, f): """Process a POST command. Arguments: @@ -581,36 +855,90 @@ class NNTP: Note that if the server refuses the article an exception is raised.""" return self._post('IHAVE {0}'.format(id), f) + def _close(self): + self.file.close() + del self.file + def quit(self): """Process a QUIT command and close the socket. Returns: - resp: server response if successful""" - - resp = self.shortcmd('QUIT') - self.file.close() - self.sock.close() - del self.file, self.sock + try: + resp = self._shortcmd('QUIT') + finally: + self._close() return resp +class NNTP(_NNTPBase): + + def __init__(self, host, port=NNTP_PORT, user=None, password=None, + readermode=None, usenetrc=True, + timeout=_GLOBAL_DEFAULT_TIMEOUT): + """Initialize an instance. Arguments: + - host: hostname to connect to + - port: port to connect to (default the standard NNTP port) + - user: username to authenticate with + - password: password to use with username + - readermode: if true, send 'mode reader' command after + connecting. + - usenetrc: allow loading username and password from ~/.netrc file + if not specified explicitly + - timeout: timeout (in seconds) used for socket connections + + readermode is sometimes necessary if you are connecting to an + NNTP server on the local machine and intend to call + reader-specific comamnds, such as `group'. If you get + unexpected NNTPPermanentErrors, you might need to set + readermode. + """ + self.host = host + self.port = port + self.sock = socket.create_connection((host, port), timeout) + file = self.sock.makefile("rwb") + _NNTPBase.__init__(self, file, user, password, + readermode, usenetrc, timeout) + + def _close(self): + try: + _NNTPBase._close(self) + finally: + self.sock.close() + + # Test retrieval when run as a script. -# Assumption: if there's a local news server, it's called 'news'. -# Assumption: if user queries a remote news server, it's named -# in the environment variable NNTPSERVER (used by slrn and kin) -# and we want readermode off. if __name__ == '__main__': - import os - newshost = 'news' and os.environ["NNTPSERVER"] - if newshost.find('.') == -1: - mode = 'readermode' - else: - mode = None - s = NNTP(newshost, readermode=mode) - resp, count, first, last, name = s.group('comp.lang.python') - print(resp) + import argparse + from email.utils import parsedate + + parser = argparse.ArgumentParser(description="""\ + nntplib built-in demo - display the latest articles in a newsgroup""") + parser.add_argument('-g', '--group', default='gmane.comp.python.general', + help='group to fetch messages from (default: %(default)s)') + parser.add_argument('-s', '--server', default='news.gmane.org', + help='NNTP server hostname (default: %(default)s)') + parser.add_argument('-p', '--port', default=NNTP_PORT, type=int, + help='NNTP port number (default: %(default)s)') + parser.add_argument('-n', '--nb-articles', default=10, type=int, + help='number of articles to fetch (default: %(default)s)') + args = parser.parse_args() + + s = NNTP(host=args.server, port=args.port) + resp, count, first, last, name = s.group(args.group) print('Group', name, 'has', count, 'articles, range', first, 'to', last) - resp, subs = s.xhdr('subject', '{0}-{1}'.format(first, last)) - print(resp) - for item in subs: - print("%7s %s" % item) - resp = s.quit() - print(resp) + + def cut(s, lim): + if len(s) > lim: + s = s[:lim - 4] + "..." + return s + + first = str(int(last) - args.nb_articles + 1) + resp, overviews = s.xover(first, last) + for artnum, over in overviews: + author = decode_header(over['from']).split('<', 1)[0] + subject = decode_header(over['subject']) + lines = int(over[':lines']) + print("{:7} {:20} {:42} ({})".format( + artnum, cut(author, 20), cut(subject, 42), lines) + ) + + s.quit() diff -r 4ea5a13762e5 Lib/test/test_nntplib.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/Lib/test/test_nntplib.py Wed Sep 15 23:14:39 2010 +0200 @@ -0,0 +1,584 @@ +import io +import datetime +import textwrap +import unittest +import contextlib +from test import support +from nntplib import NNTP, GroupInfo +import nntplib + +TIMEOUT = 10 + +# TODO: +# - test the `file` arg to various commands +# - test error conditions +# - test XOVER/OVER incl. with non-ascii headers +# - test POST, IHAVE +# - test ARTICLE, HEAD, BODY + + +class NetworkedNNTPTestsMixin: + + def test_welcome(self): + welcome = self.server.getwelcome() + self.assertEqual(str, type(welcome)) + + def test_help(self): + resp, list = self.server.help() + self.assertTrue(resp.startswith("100 "), resp) + for line in list: + self.assertEqual(str, type(line)) + + def test_list(self): + resp, list = self.server.list() + if len(list) > 0: + self.assertEqual(GroupInfo, type(list[0])) + self.assertEqual(str, type(list[0].group)) + + def test_unknown_command(self): + with self.assertRaises(nntplib.NNTPPermanentError) as cm: + self.server._shortcmd("XYZZY") + resp = cm.exception.response + self.assertTrue(resp.startswith("500 "), resp) + + def test_newgroups(self): + # gmane gets a constant influx of new groups. In order not to stress + # the server too much, we choose a recent date in the past. + dt = datetime.date.today() - datetime.timedelta(days=7) + resp, groups = self.server.newgroups(dt) + if len(groups) > 0: + self.assertIsInstance(groups[0], GroupInfo) + self.assertIsInstance(groups[0].group, str) + + def test_description(self): + def _check_desc(desc): + # Sanity checks + self.assertIsInstance(desc, str) + self.assertNotIn(self.GROUP_NAME, desc) + desc = self.server.description(self.GROUP_NAME) + _check_desc(desc) + # Another sanity check + self.assertIn("Python", desc) + # With a pattern + desc = self.server.description(self.GROUP_PAT) + _check_desc(desc) + # Shouldn't exist + desc = self.server.description("zk.brrtt.baz") + self.assertEqual(desc, '') + + def test_descriptions(self): + resp, descs = self.server.descriptions(self.GROUP_PAT) + # 215 for LIST NEWSGROUPS, 282 for XGTITLE + self.assertTrue( + resp.startswith("215 ") or resp.startswith("282 "), resp) + self.assertIsInstance(descs, dict) + desc = descs[self.GROUP_NAME] + self.assertEqual(desc, self.server.description(self.GROUP_NAME)) + + def test_group(self): + result = self.server.group(self.GROUP_NAME) + self.assertEqual(5, len(result)) + self.assertEqual(str, type(result[4])) + + def test_date(self): + resp, date = self.server.date() + self.assertIsInstance(date, datetime.datetime) + # Sanity check + self.assertGreaterEqual(date.year, 1995) + self.assertLessEqual(date.year, 2030) + + def test_head(self): + resp, count, first, last, name = self.server.group(self.GROUP_NAME) + result = self.server.head(last) + for line in result.lines: + self.assertEqual(bytes, type(line)) + + def test_xover(self): + resp, count, first, last, name = self.server.group(self.GROUP_NAME) + resp, lines = self.server.xover(last, last) + art_num, d = lines[0] + self.assertEqual(art_num, last) + self.assertIsInstance(d, dict) + # NNTP has 7 mandatory fields for OVER/XOVER + self.assertGreaterEqual(len(d), 7) + for v in d.values(): + self.assertIsInstance(v, str) + + def test_xhdr(self): + resp, count, first, last, name = self.server.group(self.GROUP_NAME) + resp, lines = self.server.xhdr('subject', last) + for line in lines: + self.assertEqual(str, type(line[1])) + + def test_quit(self): + self.server.quit() + self.server = None + + +class NetworkedNNTPTests(NetworkedNNTPTestsMixin, unittest.TestCase): + NNTP_HOST = 'news.gmane.org' + GROUP_NAME = 'gmane.comp.python.devel' + GROUP_PAT = 'gmane.comp.python.d*' + + def setUp(self): + support.requires("network") + with support.transient_internet(self.NNTP_HOST): + self.server = NNTP(self.NNTP_HOST, timeout=TIMEOUT) + + def tearDown(self): + if self.server is not None: + self.server.quit() + + # Disabled with gmane as it produces too much data + test_list = None + + def test_capabilities(self): + # As of this writing, gmane implements NNTP version 2 and has a + # couple of well-known capabilities. Just sanity check that we + # got them. + def _check_caps(caps): + caps_list = caps['LIST'] + self.assertIsInstance(caps_list, (list, tuple)) + self.assertIn('OVERVIEW.FMT', caps_list) + self.assertGreaterEqual(self.server.nntp_version, 2) + _check_caps(self.server.getcapabilities()) + # This re-emits the command + resp, caps = self.server.capabilities() + _check_caps(caps) + + +# +# Non-networked tests using a local server (or something mocking it). +# + +class _NNTPServerIO(io.RawIOBase): + """A raw IO object allowing NNTP commands to be received and processed + by a handler. The handler can push responses which can then be read + from the IO object.""" + + def __init__(self, handler): + io.RawIOBase.__init__(self) + self.handler = handler + # The channel from the client + self.c2s = io.BytesIO() + # The channel to the client + self.s2c = io.BytesIO() + # Our welcome + self.handler.handle_welcome(self) + + def readable(self): + return True + + def writable(self): + return True + + def _decode(self, data): + return str(data, "utf-8", "surrogateescape") + + def _process_pending(self): + while True: + line = self._decode(self.c2s.readline()) + if not line: + return + if not line.endswith("\r\n"): + raise ValueError("line doesn't end with \\r\\n: %r" % line) + line = line[:-2] + cmd, *tokens = line.split() + meth = getattr(self.handler, "handle_" + cmd.upper(), None) + if meth is None: + self.handler.handle_unknown(self) + else: + try: + meth(self, *tokens) + except Exception as e: + raise ValueError("command failed: %r" % line) from e + + def push_data(self, data): + """Push (buffer) some data to send to the client.""" + pos = self.s2c.tell() + self.s2c.write(data) + self.s2c.seek(pos) + + def push_lit(self, lit): + """Push a string literal""" + lit = textwrap.dedent(lit) + lit = "\r\n".join(lit.splitlines()) + "\r\n" + lit = lit.encode('utf-8') + self.push_data(lit) + + def write(self, b): + """The client sends us some data""" + pos = self.c2s.tell() + self.c2s.write(b) + self.c2s.seek(pos) + self._process_pending() + return len(b) + + def readinto(self, buf): + """The client wants to read a response""" + self._process_pending() + b = self.s2c.read(len(buf)) + n = len(b) + buf[:n] = b + return n + + +class MockedNNTPTestsMixin: + welcome = "200 NNTP mock server" + + def setUp(self): + self.make_server() + + def tearDown(self): + del self.server + + def make_server(self, *args, **kwargs): + self.sio = _NNTPServerIO(handler=self) + # Using BufferedRWPair instead of BufferedRandom ensures the file + # isn't seekable. + file = io.BufferedRWPair(self.sio, self.sio) + self.server = nntplib._NNTPBase(file, *args, **kwargs) + return self.server + + def handle_unknown(self, serv): + serv.push_lit("500 What?") + + def handle_welcome(self, serv): + serv.push_lit(self.welcome) + + def handle_QUIT(self, serv): + serv.push_lit("205 Bye!") + + def handle_DATE(self, serv): + serv.push_lit("111 20100914001155") + + def handle_HELP(self, serv): + serv.push_lit("""\ + 100 Legal commands + authinfo user Name|pass Password|generic + date + help + Report problems to + .""") + + def handle_LIST(self, serv, action=None, param=None): + if action is None: + serv.push_lit("""\ + 215 Newsgroups in form "group high low flags". + comp.lang.python 0000052340 0000002828 y + comp.lang.python.announce 0000001153 0000000993 m + free.it.comp.lang.python 0000000002 0000000002 y + fr.comp.lang.python 0000001254 0000000760 y + free.it.comp.lang.python.learner 0000000000 0000000001 y + tw.bbs.comp.lang.python 0000000304 0000000304 y + .""") + elif action == "NEWSGROUPS": + assert param is not None + serv.push_lit('215 Descriptions in form "group description".') + if param == "comp.lang.python": + serv.push_lit("""\ + 215 Descriptions in form "group description". + comp.lang.python\tThe Python computer language. + .""") + elif param == "comp.lang.python*": + serv.push_lit("""\ + 215 Descriptions in form "group description". + comp.lang.python\tThe Python computer language. + comp.lang.python.announce\tAnnouncements about the Python language. (Moderated) + .""") + else: + serv.push_lit("""\ + 215 Descriptions in form "group description". + .""") + else: + serv.push_list('501 Unknown LIST keyword') + + def handle_NEWNEWS(self, serv, group, date_str, time_str): + # We hard code different return messages depending on passed + # argument and date syntax. + if (group == "comp.lang.python" and date_str == "20100913" + and time_str == "082004"): + # Date was passed in RFC 3977 format (NNTP "v2") + serv.push_lit("""\ + 230 list of newsarticles (NNTP v2) created after Mon Sep 13 08:20:04 2010 follows + + + .""") + elif (group == "comp.lang.python" and date_str == "100913" + and time_str == "082004"): + # Date was passed in RFC 977 format (NNTP "v1") + serv.push_lit("""\ + 230 list of newsarticles (NNTP v1) created after Mon Sep 13 08:20:04 2010 follows + + + .""") + else: + serv.push_lit("""\ + 230 An empty list of newsarticles follows + .""") + # (Note for experiments: many servers disable NEWNEWS. + # As of this writing, sicinfo3.epfl.ch doesn't.) + + +class NNTPv1v2TestsMixin: + + def test_welcome(self): + self.assertEqual(self.server.welcome, self.welcome) + + def test_date(self): + resp, date = self.server.date() + self.assertEqual(resp, "111 20100914001155") + self.assertEqual(date, datetime.datetime(2010, 9, 14, 0, 11, 55)) + + def test_quit(self): + self.assertFalse(self.sio.closed) + resp = self.server.quit() + self.assertEqual(resp, "205 Bye!") + self.assertTrue(self.sio.closed) + + def test_help(self): + resp, help = self.server.help() + self.assertEqual(resp, "100 Legal commands") + self.assertEqual(help, [ + ' authinfo user Name|pass Password|generic ', + ' date', + ' help', + 'Report problems to ', + ]) + + def test_list(self): + resp, groups = self.server.list() + self.assertEqual(len(groups), 6) + g = groups[1] + self.assertEqual(g, + GroupInfo("comp.lang.python.announce", "0000001153", + "0000000993", "m")) + + def test_description(self): + desc = self.server.description("comp.lang.python") + self.assertEqual(desc, "The Python computer language.") + desc = self.server.description("comp.lang.pythonx") + self.assertEqual(desc, "") + + def test_descriptions(self): + resp, groups = self.server.descriptions("comp.lang.python") + self.assertEqual(resp, '215 Descriptions in form "group description".') + self.assertEqual(groups, { + "comp.lang.python": "The Python computer language.", + }) + resp, groups = self.server.descriptions("comp.lang.python*") + self.assertEqual(groups, { + "comp.lang.python": "The Python computer language.", + "comp.lang.python.announce": "Announcements about the Python language. (Moderated)", + }) + resp, groups = self.server.descriptions("comp.lang.pythonx") + self.assertEqual(groups, {}) + + def test_newnews(self): + # NEWNEWS comp.lang.python [20]100913 082004 + dt = datetime.datetime(2010, 9, 13, 8, 20, 4) + resp, ids = self.server.newnews("comp.lang.python", dt) + expected = ( + "230 list of newsarticles (NNTP v{0}) " + "created after Mon Sep 13 08:20:04 2010 follows" + ).format(self.nntp_version) + self.assertEqual(resp, expected) + self.assertEqual(ids, [ + "", + "", + ]) + # NEWNEWS fr.comp.lang.python [20]100913 082004 + dt = datetime.datetime(2010, 9, 13, 8, 20, 4) + resp, ids = self.server.newnews("fr.comp.lang.python", dt) + self.assertEqual(resp, "230 An empty list of newsarticles follows") + self.assertEqual(ids, []) + + +class NNTPv1Tests(NNTPv1v2TestsMixin, MockedNNTPTestsMixin, unittest.TestCase): + """Tests an NNTP v1 server (no capabilities).""" + + nntp_version = 1 + + def test_caps(self): + caps = self.server.getcapabilities() + self.assertEqual(caps, {}) + self.assertEqual(self.server.nntp_version, 1) + + +class NNTPv2Tests(NNTPv1v2TestsMixin, MockedNNTPTestsMixin, unittest.TestCase): + """Tests an NNTP v2 server (with capabilities).""" + + nntp_version = 2 + + def handle_CAPABILITIES(self, serv): + serv.push_lit("""\ + 101 Capability list: + VERSION 2 + IMPLEMENTATION INN 2.5.1 + AUTHINFO USER + HDR + LIST ACTIVE ACTIVE.TIMES DISTRIB.PATS HEADERS NEWSGROUPS OVERVIEW.FMT + OVER + POST + READER + .""") + + def test_caps(self): + caps = self.server.getcapabilities() + self.assertEqual(caps, { + 'VERSION': ['2'], + 'IMPLEMENTATION': ['INN', '2.5.1'], + 'AUTHINFO': ['USER'], + 'HDR': [], + 'LIST': ['ACTIVE', 'ACTIVE.TIMES', 'DISTRIB.PATS', + 'HEADERS', 'NEWSGROUPS', 'OVERVIEW.FMT'], + 'OVER': [], + 'POST': [], + 'READER': [], + }) + self.assertEqual(self.server.nntp_version, 2) + + +class MiscTests(unittest.TestCase): + + def test_decode_header(self): + def gives(a, b): + self.assertEqual(nntplib.decode_header(a), b) + gives("" , "") + gives("a plain header", "a plain header") + gives(" with extra spaces ", " with extra spaces ") + gives("=?ISO-8859-15?Q?D=E9buter_en_Python?=", "Débuter en Python") + gives("=?utf-8?q?Re=3A_=5Bsqlite=5D_probl=C3=A8me_avec_ORDER_BY_sur_des_cha?=" + " =?utf-8?q?=C3=AEnes_de_caract=C3=A8res_accentu=C3=A9es?=", + "Re: [sqlite] problème avec ORDER BY sur des chaînes de caractères accentuées") + gives("Re: =?UTF-8?B?cHJvYmzDqG1lIGRlIG1hdHJpY2U=?=", + "Re: problème de matrice") + # A natively utf-8 header (found in the real world!) + gives("Re: Message d'erreur incompréhensible (par moi)", + "Re: Message d'erreur incompréhensible (par moi)") + + def test_parse_overview_fmt(self): + # The minimal (default) response + lines = ["Subject:", "From:", "Date:", "Message-ID:", + "References:", ":bytes", ":lines"] + self.assertEqual(nntplib._parse_overview_fmt(lines), + ["subject", "from", "date", "message-id", "references", + ":bytes", ":lines"]) + # The minimal response using alternative names + lines = ["Subject:", "From:", "Date:", "Message-ID:", + "References:", "Bytes:", "Lines:"] + self.assertEqual(nntplib._parse_overview_fmt(lines), + ["subject", "from", "date", "message-id", "references", + ":bytes", ":lines"]) + # Variations in casing + lines = ["subject:", "FROM:", "DaTe:", "message-ID:", + "References:", "BYTES:", "Lines:"] + self.assertEqual(nntplib._parse_overview_fmt(lines), + ["subject", "from", "date", "message-id", "references", + ":bytes", ":lines"]) + # First example from RFC 3977 + lines = ["Subject:", "From:", "Date:", "Message-ID:", + "References:", ":bytes", ":lines", "Xref:full", + "Distribution:full"] + self.assertEqual(nntplib._parse_overview_fmt(lines), + ["subject", "from", "date", "message-id", "references", + ":bytes", ":lines", "xref", "distribution"]) + # Second example from RFC 3977 + lines = ["Subject:", "From:", "Date:", "Message-ID:", + "References:", "Bytes:", "Lines:", "Xref:FULL", + "Distribution:FULL"] + self.assertEqual(nntplib._parse_overview_fmt(lines), + ["subject", "from", "date", "message-id", "references", + ":bytes", ":lines", "xref", "distribution"]) + # A classic response from INN + lines = ["Subject:", "From:", "Date:", "Message-ID:", + "References:", "Bytes:", "Lines:", "Xref:full"] + self.assertEqual(nntplib._parse_overview_fmt(lines), + ["subject", "from", "date", "message-id", "references", + ":bytes", ":lines", "xref"]) + + def test_parse_overview(self): + fmt = nntplib._DEFAULT_OVERVIEW_FMT + ["xref"] + # First example from RFC 3977 + lines = [ + '3000234\tI am just a test article\t"Demo User" ' + '\t6 Oct 1998 04:38:40 -0500\t' + '<45223423@example.com>\t<45454@example.net>\t1234\t' + '17\tXref: news.example.com misc.test:3000363', + ] + overview = nntplib._parse_overview(lines, fmt) + (art_num, fields), = overview + self.assertEqual(art_num, '3000234') + self.assertEqual(fields, { + 'subject': 'I am just a test article', + 'from': '"Demo User" ', + 'date': '6 Oct 1998 04:38:40 -0500', + 'message-id': '<45223423@example.com>', + 'references': '<45454@example.net>', + ':bytes': '1234', + ':lines': '17', + 'xref': 'news.example.com misc.test:3000363', + }) + + def test_parse_datetime(self): + def gives(a, b, *c): + self.assertEqual(nntplib._parse_datetime(a, b), + datetime.datetime(*c)) + # Output of DATE command + gives("19990623135624", None, 1999, 6, 23, 13, 56, 24) + # Variations + gives("19990623", "135624", 1999, 6, 23, 13, 56, 24) + gives("990623", "135624", 1999, 6, 23, 13, 56, 24) + gives("090623", "135624", 2009, 6, 23, 13, 56, 24) + + def test_unparse_datetime(self): + # Test non-legacy mode + # 1) with a datetime + def gives(y, M, d, h, m, s, date_str, time_str): + dt = datetime.datetime(y, M, d, h, m, s) + self.assertEqual(nntplib._unparse_datetime(dt), + (date_str, time_str)) + self.assertEqual(nntplib._unparse_datetime(dt, False), + (date_str, time_str)) + gives(1999, 6, 23, 13, 56, 24, "19990623", "135624") + gives(2000, 6, 23, 13, 56, 24, "20000623", "135624") + gives(2010, 6, 5, 1, 2, 3, "20100605", "010203") + # 2) with a date + def gives(y, M, d, date_str, time_str): + dt = datetime.date(y, M, d) + self.assertEqual(nntplib._unparse_datetime(dt), + (date_str, time_str)) + self.assertEqual(nntplib._unparse_datetime(dt, False), + (date_str, time_str)) + gives(1999, 6, 23, "19990623", "000000") + gives(2000, 6, 23, "20000623", "000000") + gives(2010, 6, 5, "20100605", "000000") + + def test_unparse_datetime_legacy(self): + # Test legacy mode (RFC 977) + # 1) with a datetime + def gives(y, M, d, h, m, s, date_str, time_str): + dt = datetime.datetime(y, M, d, h, m, s) + self.assertEqual(nntplib._unparse_datetime(dt, True), + (date_str, time_str)) + gives(1999, 6, 23, 13, 56, 24, "990623", "135624") + gives(2000, 6, 23, 13, 56, 24, "000623", "135624") + gives(2010, 6, 5, 1, 2, 3, "100605", "010203") + # 2) with a date + def gives(y, M, d, date_str, time_str): + dt = datetime.date(y, M, d) + self.assertEqual(nntplib._unparse_datetime(dt, True), + (date_str, time_str)) + gives(1999, 6, 23, "990623", "000000") + gives(2000, 6, 23, "000623", "000000") + gives(2010, 6, 5, "100605", "000000") + + +def test_main(): + support.run_unittest(MiscTests, NNTPv1Tests, NNTPv2Tests, + NetworkedNNTPTests + ) + + +if __name__ == "__main__": + test_main()