diff -r 5cbe28291492 Lib/nntplib.py --- a/Lib/nntplib.py Fri Sep 24 15:56:34 2010 +0200 +++ b/Lib/nntplib.py Fri Sep 24 22:41:37 2010 +0200 @@ -1,4 +1,7 @@ -"""An NNTP client class based on RFC 977: Network News Transfer Protocol. +"""An NNTP client class based on: +- RFC 977: Network News Transfer Protocol +- RFC 2980: Common NNTP Extensions +- RFC 3977: Network News Transfer Protocol (version 2) Example: @@ -27,15 +30,51 @@ are strings, not numbers, since they are # RFC 977 by Brian Kantor and Phil Lapsley. # xover, xgtitle, xpath, date methods by Kevan Heydon +# Incompatible changes from the 2.x nntplib: +# - all commands are encoded as UTF-8 data (using the "surrogateescape" +# error handler), except for raw message data (POST, IHAVE) +# - all responses are decoded as UTF-8 data (using the "surrogateescape" +# error handler), except for raw message data (ARTICLE, HEAD, BODY) +# - the `file` argument to various methods is keyword-only +# +# - NNTP.date() returns a datetime object +# - NNTP.newgroups() and NNTP.newnews() take a datetime (or date) object, +# rather than a pair of (date, time) strings. +# - NNTP.newgroups() and NNTP.list() return a list of GroupInfo named tuples +# - NNTP.descriptions() returns a dict mapping group names to descriptions +# - NNTP.xover() returns a list of dicts mapping field names (header or metadata) +# to field values; each dict representing a message overview. +# - NNTP.article(), NNTP.head() and NNTP.body() return a (response, ArticleInfo) +# tuple. +# - the "internal" methods have been marked private (they now start with +# an underscore) + +# Other changes from the 2.x/3.1 nntplib: +# - automatic querying of capabilities at connect +# - New method NNTP.getcapabilities() +# - New helper function decode_header() +# - NNTP.post() and NNTP.ihave() accept file objects, bytes-like objects and +# arbitrary iterables yielding lines. +# - An extensive test suite :-) + +# TODO: +# - return structured data (GroupInfo etc.) everywhere # Imports import re import socket +import collections +import datetime +import warnings -__all__ = ["NNTP","NNTPReplyError","NNTPTemporaryError", - "NNTPPermanentError","NNTPProtocolError","NNTPDataError", - "error_reply","error_temp","error_perm","error_proto", - "error_data",] +from email.header import decode_header as _email_decode_header +from socket import _GLOBAL_DEFAULT_TIMEOUT + +__all__ = ["NNTP", + "NNTPReplyError", "NNTPTemporaryError", "NNTPPermanentError", + "NNTPProtocolError", "NNTPDataError", + "decode_header", + ] # Exceptions raised when an error or invalid response is received class NNTPError(Exception): @@ -67,39 +106,189 @@ class NNTPDataError(NNTPError): """Error in response data""" pass -# for backwards compatibility -error_reply = NNTPReplyError -error_temp = NNTPTemporaryError -error_perm = NNTPPermanentError -error_proto = NNTPProtocolError -error_data = NNTPDataError - - # Standard port used by NNTP servers NNTP_PORT = 119 # Response numbers that are followed by additional text (e.g. article) -LONGRESP = [b'100', b'215', b'220', b'221', b'222', b'224', b'230', b'231', b'282'] +_LONGRESP = { + '100', # HELP + '101', # CAPABILITIES + '211', # LISTGROUP (also not multi-line with GROUP) + '215', # LIST + '220', # ARTICLE + '221', # HEAD, XHDR + '222', # BODY + '224', # OVER, XOVER + '225', # HDR + '230', # NEWNEWS + '231', # NEWGROUPS + '282', # XGTITLE +} +# Default decoded value for LIST OVERVIEW.FMT if not supported +_DEFAULT_OVERVIEW_FMT = [ + "subject", "from", "date", "message-id", "references", ":bytes", ":lines"] + +# Alternative names allowed in LIST OVERVIEW.FMT response +_OVERVIEW_FMT_ALTERNATIVES = { + 'bytes': ':bytes', + 'lines': ':lines', +} # Line terminators (we always output CRLF, but accept any of CRLF, CR, LF) -CRLF = b'\r\n' +_CRLF = b'\r\n' +GroupInfo = collections.namedtuple('GroupInfo', + ['group', 'last', 'first', 'flag']) +ArticleInfo = collections.namedtuple('ArticleInfo', + ['number', 'message_id', 'lines']) -# The class itself -class NNTP: - def __init__(self, host, port=NNTP_PORT, user=None, password=None, - readermode=None, usenetrc=True): + +# Helper function(s) +def decode_header(header_str): + """Takes an unicode string representing a munged header value + and decodes it as a (possibly non-ASCII) readable value.""" + parts = [] + for v, enc in _email_decode_header(header_str): + if isinstance(v, bytes): + parts.append(v.decode(enc or 'ascii')) + else: + parts.append(v) + return ' '.join(parts) + +def _parse_overview_fmt(lines): + """Parse a list of string representing the response to LIST OVERVIEW.FMT + and return a list of header/metadata names. + Raises NNTPDataError if the response is not compliant + (cf. RFC 3977, section 8.4).""" + fmt = [] + for line in lines: + if line[0] == ':': + # Metadata name (e.g. ":bytes") + name, _, suffix = line[1:].partition(':') + name = ':' + name + else: + # Header name (e.g. "Subject:" or "Xref:full") + name, _, suffix = line.partition(':') + name = name.lower() + name = _OVERVIEW_FMT_ALTERNATIVES.get(name, name) + # Should we do something with the suffix? + fmt.append(name) + defaults = _DEFAULT_OVERVIEW_FMT + if len(fmt) < len(defaults): + raise NNTPDataError("LIST OVERVIEW.FMT response too short") + if fmt[:len(defaults)] != defaults: + raise NNTPDataError("LIST OVERVIEW.FMT redefines default fields") + return fmt + +def _parse_overview(lines, fmt, data_process_func=None): + """Parse the response to a OVER or XOVER command according to the + overview format `fmt`.""" + n_defaults = len(_DEFAULT_OVERVIEW_FMT) + overview = [] + for line in lines: + fields = {} + article_number, *tokens = line.split('\t') + article_number = int(article_number) + for i, token in enumerate(tokens): + if i >= len(fmt): + # XXX should we raise an error? Some servers might not + # support LIST OVERVIEW.FMT and still return additional + # headers. + continue + field_name = fmt[i] + is_metadata = field_name.startswith(':') + if i >= n_defaults and not is_metadata: + # Non-default header names are included in full in the response + h = field_name + ":" + if token[:len(h)].lower() != h: + raise NNTPDataError("OVER/XOVER response doesn't include " + "names of additional headers") + token = token[len(h):].lstrip(" ") + fields[fmt[i]] = token + overview.append((article_number, fields)) + return overview + +def _parse_datetime(date_str, time_str=None): + """Parse a pair of (date, time) strings, and return a datetime object. + If only the date is given, it is assumed to be date and time + concatenated together (e.g. response to the DATE command). + """ + if time_str is None: + time_str = date_str[-6:] + date_str = date_str[:-6] + hours = int(time_str[:2]) + minutes = int(time_str[2:4]) + seconds = int(time_str[4:]) + year = int(date_str[:-4]) + month = int(date_str[-4:-2]) + day = int(date_str[-2:]) + # RFC 3977 doesn't say how to interpret 2-char years. Assume that + # there are no dates before 1970 on Usenet. + if year < 70: + year += 2000 + elif year < 100: + year += 1900 + return datetime.datetime(year, month, day, hours, minutes, seconds) + +def _unparse_datetime(dt, legacy=False): + """Format a date or datetime object as a pair of (date, time) strings + in the format required by the NEWNEWS and NEWGROUPS commands. If a + date object is passed, the time is assumed to be midnight (00h00). + + The returned representation depends on the legacy flag: + * if legacy is False (the default): + date has the YYYYMMDD format and time the HHMMSS format + * if legacy is True: + date has the YYMMDD format and time the HHMMSS format. + RFC 3977 compliant servers should understand both formats; therefore, + legacy is only needed when talking to old servers. + """ + if not isinstance(dt, datetime.datetime): + time_str = "000000" + else: + time_str = "{0.hour:02d}{0.minute:02d}{0.second:02d}".format(dt) + y = dt.year + if legacy: + y = y % 100 + date_str = "{0:02d}{1.month:02d}{1.day:02d}".format(y, dt) + else: + date_str = "{0:04d}{1.month:02d}{1.day:02d}".format(y, dt) + return date_str, time_str + + +# The classes themselves +class _NNTPBase: + # UTF-8 is the character set for all NNTP commands and responses: they + # are automatically encoded (when sending) and decoded (and receiving) + # by this class. + # However, some multi-line data blocks can contain arbitrary bytes (for + # example, latin-1 or utf-16 data in the body of a message). Commands + # taking (POST, IHAVE) or returning (HEAD, BODY, ARTICLE) raw message + # data will therefore only accept and produce bytes objects. + # Furthermore, since there could be non-compliant servers out there, + # we use 'surrogateescape' as the error handler for fault tolerance + # and easy round-tripping. This could be useful for some applications + # (e.g. NNTP gateways). + + encoding = 'utf-8' + errors = 'surrogateescape' + + def __init__(self, file, user=None, password=None, + readermode=None, usenetrc=True, + timeout=_GLOBAL_DEFAULT_TIMEOUT): """Initialize an instance. Arguments: - - host: hostname to connect to - - port: port to connect to (default the standard NNTP port) + - file: file-like object (open for read/write in binary mode) - user: username to authenticate with - password: password to use with username - readermode: if true, send 'mode reader' command after connecting. + - usenetrc: allow loading username and password from ~/.netrc file + if not specified explicitly + - timeout: timeout (in seconds) used for socket connections readermode is sometimes necessary if you are connecting to an NNTP server on the local machine and intend to call @@ -107,12 +296,9 @@ class NNTP: unexpected NNTPPermanentErrors, you might need to set readermode. """ - self.host = host - self.port = port - self.sock = socket.create_connection((host, port)) - self.file = self.sock.makefile('rb') + self.file = file self.debugging = 0 - self.welcome = self.getresp() + self.welcome = self._getresp() # 'mode reader' is sometimes necessary to enable 'reader' mode. # However, the order in which 'mode reader' and 'authinfo' need to @@ -122,12 +308,12 @@ class NNTP: readermode_afterauth = 0 if readermode: try: - self.welcome = self.shortcmd('mode reader') + self.welcome = self._shortcmd('mode reader') except NNTPPermanentError: # error 500, probably 'not implemented' pass except NNTPTemporaryError as e: - if user and e.response.startswith(b'480'): + if user and e.response.startswith('480'): # Need authorization before 'mode reader' readermode_afterauth = 1 else: @@ -144,29 +330,35 @@ class NNTP: password = auth[2] except IOError: pass - # Perform NNRP authentication if needed. + # Perform NNTP authentication if needed. if user: - resp = self.shortcmd('authinfo user '+user) - if resp.startswith(b'381'): + resp = self._shortcmd('authinfo user '+user) + if resp.startswith('381'): if not password: raise NNTPReplyError(resp) else: - resp = self.shortcmd( + resp = self._shortcmd( 'authinfo pass '+password) - if not resp.startswith(b'281'): + if not resp.startswith('281'): raise NNTPPermanentError(resp) if readermode_afterauth: try: - self.welcome = self.shortcmd('mode reader') + self.welcome = self._shortcmd('mode reader') except NNTPPermanentError: # error 500, probably 'not implemented' pass - - # Get the welcome message from the server - # (this is read and squirreled away by __init__()). - # If the response code is 200, posting is allowed; - # if it 201, posting is not allowed + # Inquire about capabilities (RFC 3977) + self.nntp_version = 1 + try: + resp, caps = self.capabilities() + except NNTPPermanentError: + # Server doesn't support capabilities + self._caps = {} + else: + self._caps = caps + if 'VERSION' in caps: + self.nntp_version = int(caps['VERSION'][0]) def getwelcome(self): """Get the welcome message from the server @@ -177,6 +369,12 @@ class NNTP: if self.debugging: print('*welcome*', repr(self.welcome)) return self.welcome + def getcapabilities(self): + """Get the server capabilities, as read by __init__(). + If the CAPABILITIES command is not supported, an empty dict is + returned.""" + return self._caps + def set_debuglevel(self, level): """Set the debugging level. Argument 'level' means: 0: no debugging output (default) @@ -186,121 +384,221 @@ class NNTP: self.debugging = level debug = set_debuglevel - def putline(self, line): - """Internal: send one line to the server, appending CRLF.""" - line = line + CRLF + def _putline(self, line): + """Internal: send one line to the server, appending CRLF. + The `line` must be a bytes-like object.""" + line = line + _CRLF if self.debugging > 1: print('*put*', repr(line)) - self.sock.sendall(line) + self.file.write(line) + self.file.flush() - def putcmd(self, line): - """Internal: send one command to the server (through putline()).""" + def _putcmd(self, line): + """Internal: send one command to the server (through _putline()). + The `line` must be an unicode string.""" if self.debugging: print('*cmd*', repr(line)) - line = bytes(line, "ASCII") - self.putline(line) + line = line.encode(self.encoding, self.errors) + self._putline(line) - def getline(self): - """Internal: return one line from the server, stripping CRLF. - Raise EOFError if the connection is closed.""" + def _getline(self, strip_crlf=True): + """Internal: return one line from the server, stripping _CRLF. + Raise EOFError if the connection is closed. + Returns a bytes object.""" line = self.file.readline() if self.debugging > 1: print('*get*', repr(line)) if not line: raise EOFError - if line[-2:] == CRLF: - line = line[:-2] - elif line[-1:] in CRLF: - line = line[:-1] + if strip_crlf: + if line[-2:] == _CRLF: + line = line[:-2] + elif line[-1:] in _CRLF: + line = line[:-1] return line - def getresp(self): + def _getresp(self): """Internal: get a response from the server. - Raise various errors if the response indicates an error.""" - resp = self.getline() + Raise various errors if the response indicates an error. + Returns an unicode string.""" + resp = self._getline() if self.debugging: print('*resp*', repr(resp)) + resp = resp.decode(self.encoding, self.errors) c = resp[:1] - if c == b'4': + if c == '4': raise NNTPTemporaryError(resp) - if c == b'5': + if c == '5': raise NNTPPermanentError(resp) - if c not in b'123': + if c not in '123': raise NNTPProtocolError(resp) return resp - def getlongresp(self, file=None): + def _getlongresp(self, file=None): """Internal: get a response plus following text from the server. - Raise various errors if the response indicates an error.""" + Raise various errors if the response indicates an error. + + Returns a (response, lines) tuple where `response` is an unicode + string and `lines` is a list of bytes objects. + If `file` is a file-like object, it must be open in binary mode. + """ openedFile = None try: # If a string was passed then open a file with that name - if isinstance(file, str): - openedFile = file = open(file, "w") + if isinstance(file, (str, bytes)): + openedFile = file = open(file, "wb") - resp = self.getresp() - if resp[:3] not in LONGRESP: + resp = self._getresp() + if resp[:3] not in _LONGRESP: raise NNTPReplyError(resp) - list = [] - while 1: - line = self.getline() - if line == b'.': - break - if line.startswith(b'..'): - line = line[1:] - if file: - file.write(line + b'\n') - else: - list.append(line) + + lines = [] + if file is not None: + # XXX lines = None instead? + terminators = (b'.' + _CRLF, b'.\n') + while 1: + line = self._getline(False) + if line in terminators: + break + if line.startswith(b'..'): + line = line[1:] + file.write(line) + else: + terminator = b'.' + while 1: + line = self._getline() + if line == terminator: + break + if line.startswith(b'..'): + line = line[1:] + lines.append(line) finally: # If this method created the file, then it must close it if openedFile: openedFile.close() - return resp, list + return resp, lines - def shortcmd(self, line): - """Internal: send a command and get the response.""" - self.putcmd(line) - return self.getresp() + def _shortcmd(self, line): + """Internal: send a command and get the response. + Same return value as _getresp().""" + self._putcmd(line) + return self._getresp() - def longcmd(self, line, file=None): - """Internal: send a command and get the response plus following text.""" - self.putcmd(line) - return self.getlongresp(file) + def _longcmd(self, line, file=None): + """Internal: send a command and get the response plus following text. + Same return value as _getlongresp().""" + self._putcmd(line) + return self._getlongresp(file) - def newgroups(self, date, time, file=None): - """Process a NEWGROUPS command. Arguments: - - date: string 'yymmdd' indicating the date - - time: string 'hhmmss' indicating the time + def _longcmdstring(self, line, file=None): + """Internal: send a command and get the response plus following text. + Same as _longcmd() and _getlongresp(), except that the returned `lines` + are unicode strings rather than bytes objects. + """ + self._putcmd(line) + resp, list = self._getlongresp(file) + return resp, [line.decode(self.encoding, self.errors) + for line in list] + + def _getoverviewfmt(self): + """Internal: get the overview format. Queries the server if not + already done, else returns the cached value.""" + try: + return self._cachedoverviewfmt + except AttributeError: + pass + try: + resp, lines = self._longcmdstring("LIST OVERVIEW.FMT") + except NNTPPermanentError: + # Not supported by server? + fmt = _DEFAULT_OVERVIEW_FMT[:] + else: + fmt = _parse_overview_fmt(lines) + self._cachedoverviewfmt = fmt + return fmt + + def _grouplist(self, lines): + # Parse lines into "group last first flag" + return [GroupInfo(*line.split()) for line in lines] + + def capabilities(self): + """Process a CAPABILITIES command. Not supported by all servers. Return: - resp: server response if successful - - list: list of newsgroup names""" + - caps: a dictionary mapping capability names to lists of tokens + (for example {'VERSION': ['2'], 'OVER': [], LIST: ['ACTIVE', 'HEADERS'] }) + """ + caps = {} + resp, lines = self._longcmdstring("CAPABILITIES") + for line in lines: + name, *tokens = line.split() + caps[name] = tokens + return resp, caps - return self.longcmd('NEWGROUPS ' + date + ' ' + time, file) + def newgroups(self, date, *, file=None): + """Process a NEWGROUPS command. Arguments: + - date: a date or datetime object + Return: + - resp: server response if successful + - list: list of newsgroup names + """ + if not isinstance(date, (datetime.date, datetime.date)): + raise TypeError( + "the date parameter must be a date or datetime object, " + "not '{:40}'".format(date.__class__.__name__)) + date_str, time_str = _unparse_datetime(date, self.nntp_version < 2) + cmd = 'NEWGROUPS {0} {1}'.format(date_str, time_str) + resp, lines = self._longcmdstring(cmd, file) + return resp, self._grouplist(lines) - def newnews(self, group, date, time, file=None): + def newnews(self, group, date, *, file=None): """Process a NEWNEWS command. Arguments: - group: group name or '*' - - date: string 'yymmdd' indicating the date - - time: string 'hhmmss' indicating the time + - date: a date or datetime object Return: - resp: server response if successful - - list: list of message ids""" + - list: list of message ids + """ + if not isinstance(date, (datetime.date, datetime.date)): + raise TypeError( + "the date parameter must be a date or datetime object, " + "not '{:40}'".format(date.__class__.__name__)) + date_str, time_str = _unparse_datetime(date, self.nntp_version < 2) + cmd = 'NEWNEWS {0} {1} {2}'.format(group, date_str, time_str) + return self._longcmdstring(cmd, file) - cmd = 'NEWNEWS ' + group + ' ' + date + ' ' + time - return self.longcmd(cmd, file) + def list(self, *, file=None): + """Process a LIST command. Argument: + - file: Filename string or file object to store the result in + Returns: + - resp: server response if successful + - list: list of (group, last, first, flag) (strings) + """ + resp, lines = self._longcmdstring('LIST', file) + return resp, self._grouplist(lines) - def list(self, file=None): - """Process a LIST command. Return: - - resp: server response if successful - - list: list of (group, last, first, flag) (strings)""" - - resp, list = self.longcmd('LIST', file) - for i in range(len(list)): - # Parse lines into "group last first flag" - list[i] = tuple(list[i].split()) - return resp, list + def _getdescriptions(self, group_pattern, return_all): + line_pat = re.compile('^(?P[^ \t]+)[ \t]+(.*)$') + # Try the more std (acc. to RFC2980) LIST NEWSGROUPS first + resp, lines = self._longcmdstring('LIST NEWSGROUPS ' + group_pattern) + if not resp.startswith('215'): + # Now the deprecated XGTITLE. This either raises an error + # or succeeds with the same output structure as LIST + # NEWSGROUPS. + resp, lines = self._longcmdstring('XGTITLE ' + group_pattern) + groups = {} + for raw_line in lines: + match = line_pat.search(raw_line.strip()) + if match: + name, desc = match.group(1, 2) + if not return_all: + return desc + groups[name] = desc + if return_all: + return resp, groups + else: + # Nothing found + return '' def description(self, group): - """Get a description for a single group. If more than one group matches ('group' is a pattern), return the first. If no group matches, return an empty string. @@ -311,42 +609,24 @@ class NNTP: NOTE: This neither checks for a wildcard in 'group' nor does it check whether the group actually exists.""" - - resp, lines = self.descriptions(group) - if len(lines) == 0: - return b'' - else: - return lines[0][1] + return self._getdescriptions(group, False) def descriptions(self, group_pattern): """Get descriptions for a range of groups.""" - line_pat = re.compile(b'^(?P[^ \t]+)[ \t]+(.*)$') - # Try the more std (acc. to RFC2980) LIST NEWSGROUPS first - resp, raw_lines = self.longcmd('LIST NEWSGROUPS ' + group_pattern) - if not resp.startswith(b'215'): - # Now the deprecated XGTITLE. This either raises an error - # or succeeds with the same output structure as LIST - # NEWSGROUPS. - resp, raw_lines = self.longcmd('XGTITLE ' + group_pattern) - lines = [] - for raw_line in raw_lines: - match = line_pat.search(raw_line.strip()) - if match: - lines.append(match.group(1, 2)) - return resp, lines + return self._getdescriptions(group_pattern, True) def group(self, name): """Process a GROUP command. Argument: - group: the group name Returns: - resp: server response if successful - - count: number of articles (string) - - first: first article number (string) - - last: last article number (string) - - name: the group name""" - - resp = self.shortcmd('GROUP ' + name) - if not resp.startswith(b'211'): + - count: number of articles + - first: first article number + - last: last article number + - name: the group name + """ + resp = self._shortcmd('GROUP ' + name) + if not resp.startswith('211'): raise NNTPReplyError(resp) words = resp.split() count = first = last = 0 @@ -359,151 +639,177 @@ class NNTP: last = words[3] if n > 4: name = words[4].lower() - return resp, count, first, last, name + return resp, int(count), int(first), int(last), name - def help(self, file=None): - """Process a HELP command. Returns: + def help(self, *, file=None): + """Process a HELP command. Argument: + - file: Filename string or file object to store the result in + Returns: - resp: server response if successful - - list: list of strings""" + - list: list of strings returned by the server in response to the + HELP command + """ + return self._longcmdstring('HELP', file) - return self.longcmd('HELP',file) - - def statparse(self, resp): - """Internal: parse the response of a STAT, NEXT or LAST command.""" - if not resp.startswith(b'22'): + def _statparse(self, resp): + """Internal: parse the response line of a STAT, NEXT, LAST, + ARTICLE, HEAD or BODY command.""" + if not resp.startswith('22'): raise NNTPReplyError(resp) words = resp.split() - nr = 0 - id = b'' - n = len(words) - if n > 1: - nr = words[1] - if n > 2: - id = words[2] - return resp, nr, id + art_num = int(words[1]) + message_id = words[2] + return resp, art_num, message_id - def statcmd(self, line): + def _statcmd(self, line): """Internal: process a STAT, NEXT or LAST command.""" - resp = self.shortcmd(line) - return self.statparse(resp) + resp = self._shortcmd(line) + return self._statparse(resp) - def stat(self, id): + def stat(self, message_spec=None): """Process a STAT command. Argument: - - id: article number or message id + - message_spec: article number or message id (if not specified, + the current article is selected) Returns: - resp: server response if successful - - nr: the article number - - id: the message id""" - - return self.statcmd('STAT {0}'.format(id)) + - art_num: the article number + - message_id: the message id + """ + if message_spec: + return self._statcmd('STAT {0}'.format(message_spec)) + else: + return self._statcmd('STAT') def next(self): """Process a NEXT command. No arguments. Return as for STAT.""" - return self.statcmd('NEXT') + return self._statcmd('NEXT') def last(self): """Process a LAST command. No arguments. Return as for STAT.""" - return self.statcmd('LAST') + return self._statcmd('LAST') - def artcmd(self, line, file=None): + def _artcmd(self, line, file=None): """Internal: process a HEAD, BODY or ARTICLE command.""" - resp, list = self.longcmd(line, file) - resp, nr, id = self.statparse(resp) - return resp, nr, id, list + resp, lines = self._longcmd(line, file) + resp, art_num, message_id = self._statparse(resp) + return resp, ArticleInfo(art_num, message_id, lines) - def head(self, id): + def head(self, message_spec=None, *, file=None): """Process a HEAD command. Argument: - - id: article number or message id + - message_spec: article number or message id + - file: filename string or file object to store the headers in Returns: - resp: server response if successful - - nr: article number - - id: message id - - list: the lines of the article's header""" + - ArticleInfo: (article number, message id, list of header lines) + """ + if message_spec is not None: + cmd = 'HEAD {0}'.format(message_spec) + else: + cmd = 'HEAD' + return self._artcmd(cmd, file) - return self.artcmd('HEAD {0}'.format(id)) - - def body(self, id, file=None): + def body(self, message_spec=None, *, file=None): """Process a BODY command. Argument: - - id: article number or message id - - file: Filename string or file object to store the article in + - message_spec: article number or message id + - file: filename string or file object to store the body in Returns: - resp: server response if successful - - nr: article number - - id: message id - - list: the lines of the article's body or an empty list - if file was used""" + - ArticleInfo: (article number, message id, list of body lines) + """ + if message_spec is not None: + cmd = 'BODY {0}'.format(message_spec) + else: + cmd = 'BODY' + return self._artcmd(cmd, file) - return self.artcmd('BODY {0}'.format(id), file) - - def article(self, id): + def article(self, message_spec=None, *, file=None): """Process an ARTICLE command. Argument: - - id: article number or message id + - message_spec: article number or message id + - file: filename string or file object to store the article in Returns: - resp: server response if successful - - nr: article number - - id: message id - - list: the lines of the article""" - - return self.artcmd('ARTICLE {0}'.format(id)) + - ArticleInfo: (article number, message id, list of article lines) + """ + if message_spec is not None: + cmd = 'ARTICLE {0}'.format(message_spec) + else: + cmd = 'ARTICLE' + return self._artcmd(cmd, file) def slave(self): """Process a SLAVE command. Returns: - - resp: server response if successful""" + - resp: server response if successful + """ + return self._shortcmd('SLAVE') - return self.shortcmd('SLAVE') - - def xhdr(self, hdr, str, file=None): + def xhdr(self, hdr, str, *, file=None): """Process an XHDR command (optional server extension). Arguments: - hdr: the header type (e.g. 'subject') - str: an article nr, a message id, or a range nr1-nr2 + - file: Filename string or file object to store the result in Returns: - resp: server response if successful - - list: list of (nr, value) strings""" + - list: list of (nr, value) strings + """ + pat = re.compile('^([0-9]+) ?(.*)\n?') + resp, lines = self._longcmdstring('XHDR {0} {1}'.format(hdr, str), file) + def remove_number(line): + m = pat.match(line) + return m.group(1, 2) if m else line + return resp, [remove_number(line) for line in lines] - pat = re.compile(b'^([0-9]+) ?(.*)\n?') - resp, lines = self.longcmd('XHDR {0} {1}'.format(hdr, str), file) - for i in range(len(lines)): - line = lines[i] - m = pat.match(line) - if m: - lines[i] = m.group(1, 2) - return resp, lines - - def xover(self, start, end, file=None): + def xover(self, start, end, *, file=None): """Process an XOVER command (optional server extension) Arguments: - start: start of range - end: end of range + - file: Filename string or file object to store the result in Returns: - resp: server response if successful - - list: list of (art-nr, subject, poster, date, - id, references, size, lines)""" + - list: list of dicts containing the response fields + """ + resp, lines = self._longcmdstring('XOVER {0}-{1}'.format(start, end), + file) + fmt = self._getoverviewfmt() + return resp, _parse_overview(lines, fmt) - resp, lines = self.longcmd('XOVER {0}-{1}'.format(start, end), file) - xover_lines = [] - for line in lines: - elem = line.split(b'\t') - try: - xover_lines.append((elem[0], - elem[1], - elem[2], - elem[3], - elem[4], - elem[5].split(), - elem[6], - elem[7])) - except IndexError: - raise NNTPDataError(line) - return resp,xover_lines + def over(self, message_spec, *, file=None): + """Process an OVER command. If the command isn't supported, fall + back to XOVER. Arguments: + - message_spec: + - either a message id, indicating the article to fetch + information about + - or a (start, end) tuple, indicating a range of article numbers; + if end is None, information up to the newest message will be + retrieved + - or None, indicating the current article number must be used + - file: Filename string or file object to store the result in + Returns: + - resp: server response if successful + - list: list of dicts containing the response fields - def xgtitle(self, group, file=None): + NOTE: the "message id" form isn't supported by XOVER + """ + cmd = 'OVER' if 'OVER' in self._caps else 'XOVER' + if isinstance(message_spec, (tuple, list)): + start, end = message_spec + cmd += ' {0}-{1}'.format(start, end or '') + elif message_spec is not None: + cmd = cmd + ' ' + message_spec + resp, lines = self._longcmdstring(cmd, file) + fmt = self._getoverviewfmt() + return resp, _parse_overview(lines, fmt) + + def xgtitle(self, group, *, file=None): """Process an XGTITLE command (optional server extension) Arguments: - group: group name wildcard (i.e. news.*) Returns: - resp: server response if successful - list: list of (name,title) strings""" - - line_pat = re.compile(b'^([^ \t]+)[ \t]+(.*)$') - resp, raw_lines = self.longcmd('XGTITLE ' + group, file) + warnings.warn("The XGTITLE extension is not actively used, " + "use descriptions() instead", + PendingDeprecationWarning, 2) + line_pat = re.compile('^([^ \t]+)[ \t]+(.*)$') + resp, raw_lines = self._longcmdstring('XGTITLE ' + group, file) lines = [] for raw_line in raw_lines: match = line_pat.search(raw_line.strip()) @@ -511,15 +817,18 @@ class NNTP: lines.append(match.group(1, 2)) return resp, lines - def xpath(self,id): + def xpath(self, id): """Process an XPATH command (optional server extension) Arguments: - id: Message id of article Returns: resp: server response if successful - path: directory path to article""" + path: directory path to article + """ + warnings.warn("The XPATH extension is not actively used", + PendingDeprecationWarning, 2) - resp = self.shortcmd('XPATH {0}'.format(id)) - if not resp.startswith(b'223'): + resp = self._shortcmd('XPATH {0}'.format(id)) + if not resp.startswith('223'): raise NNTPReplyError(resp) try: [resp_num, path] = resp.split() @@ -528,89 +837,144 @@ class NNTP: else: return resp, path - def date (self): - """Process the DATE command. Arguments: - None + def date(self): + """Process the DATE command. Returns: - resp: server response if successful - date: Date suitable for newnews/newgroups commands etc. - time: Time suitable for newnews/newgroups commands etc.""" - - resp = self.shortcmd("DATE") - if not resp.startswith(b'111'): + - resp: server response if successful + - date: datetime object + """ + resp = self._shortcmd("DATE") + if not resp.startswith('111'): raise NNTPReplyError(resp) elem = resp.split() if len(elem) != 2: raise NNTPDataError(resp) - date = elem[1][2:8] - time = elem[1][-6:] - if len(date) != 6 or len(time) != 6: + date = elem[1] + if len(date) != 14: raise NNTPDataError(resp) - return resp, date, time + return resp, _parse_datetime(date, None) def _post(self, command, f): - resp = self.shortcmd(command) - # Raises error_??? if posting is not allowed - if not resp.startswith(b'3'): + resp = self._shortcmd(command) + # Raises a specific exception if posting is not allowed + if not resp.startswith('3'): raise NNTPReplyError(resp) - while 1: - line = f.readline() - if not line: - break - if line.endswith(b'\n'): - line = line[:-1] + if isinstance(f, (bytes, bytearray)): + f = f.splitlines() + # We don't use _putline() because: + # - we don't want additional CRLF if the file or iterable is already + # in the right format + # - we don't want a spurious flush() after each line is written + for line in f: + if not line.endswith(_CRLF): + line = line.rstrip(b"\r\n") + _CRLF if line.startswith(b'.'): line = b'.' + line - self.putline(line) - self.putline(b'.') - return self.getresp() + self.file.write(line) + self.file.write(b".\r\n") + self.file.flush() + return self._getresp() - def post(self, f): + def post(self, data): """Process a POST command. Arguments: - - f: file containing the article + - data: bytes object, iterable or file containing the article Returns: - resp: server response if successful""" - return self._post('POST', f) + return self._post('POST', data) - def ihave(self, id, f): + def ihave(self, message_id, data): """Process an IHAVE command. Arguments: - - id: message-id of the article - - f: file containing the article + - message_id: message-id of the article + - data: file containing the article Returns: - resp: server response if successful Note that if the server refuses the article an exception is raised.""" - return self._post('IHAVE {0}'.format(id), f) + return self._post('IHAVE {0}'.format(message_id), data) + + def _close(self): + self.file.close() + del self.file def quit(self): """Process a QUIT command and close the socket. Returns: - resp: server response if successful""" - - resp = self.shortcmd('QUIT') - self.file.close() - self.sock.close() - del self.file, self.sock + try: + resp = self._shortcmd('QUIT') + finally: + self._close() return resp +class NNTP(_NNTPBase): + + def __init__(self, host, port=NNTP_PORT, user=None, password=None, + readermode=None, usenetrc=True, + timeout=_GLOBAL_DEFAULT_TIMEOUT): + """Initialize an instance. Arguments: + - host: hostname to connect to + - port: port to connect to (default the standard NNTP port) + - user: username to authenticate with + - password: password to use with username + - readermode: if true, send 'mode reader' command after + connecting. + - usenetrc: allow loading username and password from ~/.netrc file + if not specified explicitly + - timeout: timeout (in seconds) used for socket connections + + readermode is sometimes necessary if you are connecting to an + NNTP server on the local machine and intend to call + reader-specific comamnds, such as `group'. If you get + unexpected NNTPPermanentErrors, you might need to set + readermode. + """ + self.host = host + self.port = port + self.sock = socket.create_connection((host, port), timeout) + file = self.sock.makefile("rwb") + _NNTPBase.__init__(self, file, user, password, + readermode, usenetrc, timeout) + + def _close(self): + try: + _NNTPBase._close(self) + finally: + self.sock.close() + + # Test retrieval when run as a script. -# Assumption: if there's a local news server, it's called 'news'. -# Assumption: if user queries a remote news server, it's named -# in the environment variable NNTPSERVER (used by slrn and kin) -# and we want readermode off. if __name__ == '__main__': - import os - newshost = 'news' and os.environ["NNTPSERVER"] - if newshost.find('.') == -1: - mode = 'readermode' - else: - mode = None - s = NNTP(newshost, readermode=mode) - resp, count, first, last, name = s.group('comp.lang.python') - print(resp) + import argparse + from email.utils import parsedate + + parser = argparse.ArgumentParser(description="""\ + nntplib built-in demo - display the latest articles in a newsgroup""") + parser.add_argument('-g', '--group', default='gmane.comp.python.general', + help='group to fetch messages from (default: %(default)s)') + parser.add_argument('-s', '--server', default='news.gmane.org', + help='NNTP server hostname (default: %(default)s)') + parser.add_argument('-p', '--port', default=NNTP_PORT, type=int, + help='NNTP port number (default: %(default)s)') + parser.add_argument('-n', '--nb-articles', default=10, type=int, + help='number of articles to fetch (default: %(default)s)') + args = parser.parse_args() + + s = NNTP(host=args.server, port=args.port) + resp, count, first, last, name = s.group(args.group) print('Group', name, 'has', count, 'articles, range', first, 'to', last) - resp, subs = s.xhdr('subject', '{0}-{1}'.format(first, last)) - print(resp) - for item in subs: - print("%7s %s" % item) - resp = s.quit() - print(resp) + + def cut(s, lim): + if len(s) > lim: + s = s[:lim - 4] + "..." + return s + + first = str(int(last) - args.nb_articles + 1) + resp, overviews = s.xover(first, last) + for artnum, over in overviews: + author = decode_header(over['from']).split('<', 1)[0] + subject = decode_header(over['subject']) + lines = int(over[':lines']) + print("{:7} {:20} {:42} ({})".format( + artnum, cut(author, 20), cut(subject, 42), lines) + ) + + s.quit() diff -r 5cbe28291492 Lib/test/test_nntplib.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/Lib/test/test_nntplib.py Fri Sep 24 22:41:37 2010 +0200 @@ -0,0 +1,1091 @@ +import io +import datetime +import textwrap +import unittest +import contextlib +from test import support +from nntplib import NNTP, GroupInfo +import nntplib + +TIMEOUT = 30 + +# TODO: +# - test the `file` arg to more commands +# - test error conditions + + +class NetworkedNNTPTestsMixin: + + def test_welcome(self): + welcome = self.server.getwelcome() + self.assertEqual(str, type(welcome)) + + def test_help(self): + resp, list = self.server.help() + self.assertTrue(resp.startswith("100 "), resp) + for line in list: + self.assertEqual(str, type(line)) + + def test_list(self): + resp, list = self.server.list() + if len(list) > 0: + self.assertEqual(GroupInfo, type(list[0])) + self.assertEqual(str, type(list[0].group)) + + def test_unknown_command(self): + with self.assertRaises(nntplib.NNTPPermanentError) as cm: + self.server._shortcmd("XYZZY") + resp = cm.exception.response + self.assertTrue(resp.startswith("500 "), resp) + + def test_newgroups(self): + # gmane gets a constant influx of new groups. In order not to stress + # the server too much, we choose a recent date in the past. + dt = datetime.date.today() - datetime.timedelta(days=7) + resp, groups = self.server.newgroups(dt) + if len(groups) > 0: + self.assertIsInstance(groups[0], GroupInfo) + self.assertIsInstance(groups[0].group, str) + + def test_description(self): + def _check_desc(desc): + # Sanity checks + self.assertIsInstance(desc, str) + self.assertNotIn(self.GROUP_NAME, desc) + desc = self.server.description(self.GROUP_NAME) + _check_desc(desc) + # Another sanity check + self.assertIn("Python", desc) + # With a pattern + desc = self.server.description(self.GROUP_PAT) + _check_desc(desc) + # Shouldn't exist + desc = self.server.description("zk.brrtt.baz") + self.assertEqual(desc, '') + + def test_descriptions(self): + resp, descs = self.server.descriptions(self.GROUP_PAT) + # 215 for LIST NEWSGROUPS, 282 for XGTITLE + self.assertTrue( + resp.startswith("215 ") or resp.startswith("282 "), resp) + self.assertIsInstance(descs, dict) + desc = descs[self.GROUP_NAME] + self.assertEqual(desc, self.server.description(self.GROUP_NAME)) + + def test_group(self): + result = self.server.group(self.GROUP_NAME) + self.assertEqual(5, len(result)) + resp, count, first, last, group = result + self.assertEqual(group, self.GROUP_NAME) + self.assertIsInstance(count, int) + self.assertIsInstance(first, int) + self.assertIsInstance(last, int) + self.assertLessEqual(first, last) + self.assertTrue(resp.startswith("211 "), resp) + + def test_date(self): + resp, date = self.server.date() + self.assertIsInstance(date, datetime.datetime) + # Sanity check + self.assertGreaterEqual(date.year, 1995) + self.assertLessEqual(date.year, 2030) + + def _check_art_dict(self, art_dict): + # Some sanity checks for a field dictionary returned by OVER / XOVER + self.assertIsInstance(art_dict, dict) + # NNTP has 7 mandatory fields + self.assertGreaterEqual(art_dict.keys(), + {"subject", "from", "date", "message-id", + "references", ":bytes", ":lines"} + ) + for v in art_dict.values(): + self.assertIsInstance(v, str) + + def test_xover(self): + resp, count, first, last, name = self.server.group(self.GROUP_NAME) + resp, lines = self.server.xover(last, last) + art_num, art_dict = lines[0] + self.assertEqual(art_num, last) + self._check_art_dict(art_dict) + + def test_over(self): + resp, count, first, last, name = self.server.group(self.GROUP_NAME) + start = last - 10 + # The "start-" article range form + resp, lines = self.server.over((start, None)) + art_num, art_dict = lines[0] + self._check_art_dict(art_dict) + # The "start-end" article range form + resp, lines = self.server.over((start, last)) + art_num, art_dict = lines[-1] + self.assertEqual(art_num, last) + self._check_art_dict(art_dict) + # XXX The "message_id" form is unsupported by gmane + # 503 Overview by message-ID unsupported + + def test_xhdr(self): + resp, count, first, last, name = self.server.group(self.GROUP_NAME) + resp, lines = self.server.xhdr('subject', last) + for line in lines: + self.assertEqual(str, type(line[1])) + + def check_article_resp(self, resp, article, art_num=None): + self.assertIsInstance(article, nntplib.ArticleInfo) + if art_num is not None: + self.assertEqual(article.number, art_num) + for line in article.lines: + self.assertIsInstance(line, bytes) + # XXX this could exceptionally happen... + self.assertNotIn(article.lines[-1], (b".", b".\n", b".\r\n")) + + def test_article_head_body(self): + resp, count, first, last, name = self.server.group(self.GROUP_NAME) + resp, head = self.server.head(last) + self.assertTrue(resp.startswith("221 "), resp) + self.check_article_resp(resp, head, last) + resp, body = self.server.body(last) + self.assertTrue(resp.startswith("222 "), resp) + self.check_article_resp(resp, body, last) + resp, article = self.server.article(last) + self.assertTrue(resp.startswith("220 "), resp) + self.check_article_resp(resp, article, last) + self.assertEqual(article.lines, head.lines + [b''] + body.lines) + + def test_quit(self): + self.server.quit() + self.server = None + + +class NetworkedNNTPTests(NetworkedNNTPTestsMixin, unittest.TestCase): + NNTP_HOST = 'news.gmane.org' + GROUP_NAME = 'gmane.comp.python.devel' + GROUP_PAT = 'gmane.comp.python.d*' + + def setUp(self): + support.requires("network") + with support.transient_internet(self.NNTP_HOST): + self.server = NNTP(self.NNTP_HOST, timeout=TIMEOUT) + + def tearDown(self): + if self.server is not None: + self.server.quit() + + # Disabled with gmane as it produces too much data + test_list = None + + def test_capabilities(self): + # As of this writing, gmane implements NNTP version 2 and has a + # couple of well-known capabilities. Just sanity check that we + # got them. + def _check_caps(caps): + caps_list = caps['LIST'] + self.assertIsInstance(caps_list, (list, tuple)) + self.assertIn('OVERVIEW.FMT', caps_list) + self.assertGreaterEqual(self.server.nntp_version, 2) + _check_caps(self.server.getcapabilities()) + # This re-emits the command + resp, caps = self.server.capabilities() + _check_caps(caps) + + +# +# Non-networked tests using a local server (or something mocking it). +# + +class _NNTPServerIO(io.RawIOBase): + """A raw IO object allowing NNTP commands to be received and processed + by a handler. The handler can push responses which can then be read + from the IO object.""" + + def __init__(self, handler): + io.RawIOBase.__init__(self) + # The channel from the client + self.c2s = io.BytesIO() + # The channel to the client + self.s2c = io.BytesIO() + self.handler = handler + self.handler.start(self.c2s.readline, self.push_data) + + def readable(self): + return True + + def writable(self): + return True + + def push_data(self, data): + """Push (buffer) some data to send to the client.""" + pos = self.s2c.tell() + self.s2c.seek(0, 2) + self.s2c.write(data) + self.s2c.seek(pos) + + def write(self, b): + """The client sends us some data""" + pos = self.c2s.tell() + self.c2s.write(b) + self.c2s.seek(pos) + self.handler.process_pending() + return len(b) + + def readinto(self, buf): + """The client wants to read a response""" + self.handler.process_pending() + b = self.s2c.read(len(buf)) + n = len(b) + buf[:n] = b + return n + + +class MockedNNTPTestsMixin: + # Override in derived classes + handler_class = None + + def setUp(self): + super().setUp() + self.make_server() + + def tearDown(self): + super().tearDown() + del self.server + + def make_server(self, *args, **kwargs): + self.handler = self.handler_class() + self.sio = _NNTPServerIO(self.handler) + # Using BufferedRWPair instead of BufferedRandom ensures the file + # isn't seekable. + file = io.BufferedRWPair(self.sio, self.sio) + self.server = nntplib._NNTPBase(file, *args, **kwargs) + return self.server + + +class NNTPv1Handler: + """A handler for RFC 977""" + + welcome = "200 NNTP mock server" + + def start(self, readline, push_data): + self.in_body = False + self.allow_posting = True + self._readline = readline + self._push_data = push_data + # Our welcome + self.handle_welcome() + + def _decode(self, data): + return str(data, "utf-8", "surrogateescape") + + def process_pending(self): + if self.in_body: + while True: + line = self._readline() + if not line: + return + self.body.append(line) + if line == b".\r\n": + break + try: + meth, tokens = self.body_callback + meth(*tokens, body=self.body) + finally: + self.body_callback = None + self.body = None + self.in_body = False + while True: + line = self._decode(self._readline()) + if not line: + return + if not line.endswith("\r\n"): + raise ValueError("line doesn't end with \\r\\n: {!r}".format(line)) + line = line[:-2] + cmd, *tokens = line.split() + #meth = getattr(self.handler, "handle_" + cmd.upper(), None) + meth = getattr(self, "handle_" + cmd.upper(), None) + if meth is None: + self.handle_unknown() + else: + try: + meth(*tokens) + except Exception as e: + raise ValueError("command failed: {!r}".format(line)) from e + else: + if self.in_body: + self.body_callback = meth, tokens + self.body = [] + + def expect_body(self): + """Flag that the client is expected to post a request body""" + self.in_body = True + + def push_data(self, data): + """Push some binary data""" + self._push_data(data) + + def push_lit(self, lit): + """Push a string literal""" + lit = textwrap.dedent(lit) + lit = "\r\n".join(lit.splitlines()) + "\r\n" + lit = lit.encode('utf-8') + self.push_data(lit) + + def handle_unknown(self): + self.push_lit("500 What?") + + def handle_welcome(self): + self.push_lit(self.welcome) + + def handle_QUIT(self): + self.push_lit("205 Bye!") + + def handle_DATE(self): + self.push_lit("111 20100914001155") + + def handle_GROUP(self, group): + if group == "fr.comp.lang.python": + self.push_lit("211 486 761 1265 fr.comp.lang.python") + else: + self.push_lit("411 No such group {}".format(group)) + + def handle_HELP(self): + self.push_lit("""\ + 100 Legal commands + authinfo user Name|pass Password|generic + date + help + Report problems to + .""") + + def handle_STAT(self, message_spec=None): + if message_spec is None: + self.push_lit("412 No newsgroup selected") + elif message_spec == "3000234": + self.push_lit("223 3000234 <45223423@example.com>") + elif message_spec == "<45223423@example.com>": + self.push_lit("223 0 <45223423@example.com>") + else: + self.push_lit("430 No Such Article Found") + + def handle_NEXT(self): + self.push_lit("223 3000237 <668929@example.org> retrieved") + + def handle_LAST(self): + self.push_lit("223 3000234 <45223423@example.com> retrieved") + + def handle_LIST(self, action=None, param=None): + if action is None: + self.push_lit("""\ + 215 Newsgroups in form "group high low flags". + comp.lang.python 0000052340 0000002828 y + comp.lang.python.announce 0000001153 0000000993 m + free.it.comp.lang.python 0000000002 0000000002 y + fr.comp.lang.python 0000001254 0000000760 y + free.it.comp.lang.python.learner 0000000000 0000000001 y + tw.bbs.comp.lang.python 0000000304 0000000304 y + .""") + elif action == "OVERVIEW.FMT": + self.push_lit("""\ + 215 Order of fields in overview database. + Subject: + From: + Date: + Message-ID: + References: + Bytes: + Lines: + Xref:full + .""") + elif action == "NEWSGROUPS": + assert param is not None + if param == "comp.lang.python": + self.push_lit("""\ + 215 Descriptions in form "group description". + comp.lang.python\tThe Python computer language. + .""") + elif param == "comp.lang.python*": + self.push_lit("""\ + 215 Descriptions in form "group description". + comp.lang.python.announce\tAnnouncements about the Python language. (Moderated) + comp.lang.python\tThe Python computer language. + .""") + else: + self.push_lit("""\ + 215 Descriptions in form "group description". + .""") + else: + self.push_lit('501 Unknown LIST keyword') + + def handle_NEWNEWS(self, group, date_str, time_str): + # We hard code different return messages depending on passed + # argument and date syntax. + if (group == "comp.lang.python" and date_str == "20100913" + and time_str == "082004"): + # Date was passed in RFC 3977 format (NNTP "v2") + self.push_lit("""\ + 230 list of newsarticles (NNTP v2) created after Mon Sep 13 08:20:04 2010 follows + + + .""") + elif (group == "comp.lang.python" and date_str == "100913" + and time_str == "082004"): + # Date was passed in RFC 977 format (NNTP "v1") + self.push_lit("""\ + 230 list of newsarticles (NNTP v1) created after Mon Sep 13 08:20:04 2010 follows + + + .""") + else: + self.push_lit("""\ + 230 An empty list of newsarticles follows + .""") + # (Note for experiments: many servers disable NEWNEWS. + # As of this writing, sicinfo3.epfl.ch doesn't.) + + def handle_XOVER(self, message_spec): + if message_spec == "57-59": + self.push_lit( + "224 Overview information for 57-58 follows\n" + "57\tRe: ANN: New Plone book with strong Python (and Zope) themes throughout" + "\tDoug Hellmann " + "\tSat, 19 Jun 2010 18:04:08 -0400" + "\t<4FD05F05-F98B-44DC-8111-C6009C925F0C@gmail.com>" + "\t\t7103\t16" + "\tXref: news.gmane.org gmane.comp.python.authors:57" + "\n" + "58\tLooking for a few good bloggers" + "\tDoug Hellmann " + "\tThu, 22 Jul 2010 09:14:14 -0400" + "\t" + "\t\t6683\t16" + "\tXref: news.gmane.org gmane.comp.python.authors:58" + "\n" + # An UTF-8 overview line from fr.comp.lang.python + "59\tRe: Message d'erreur incompréhensible (par moi)" + "\tEric Brunel " + "\tWed, 15 Sep 2010 18:09:15 +0200" + "\t" + "\t<4c90ec87$0$32425$ba4acef3@reader.news.orange.fr>\t1641\t27" + "\tXref: saria.nerim.net fr.comp.lang.python:1265" + "\n" + ".\n") + else: + self.push_lit("""\ + 224 No articles + .""") + + def handle_POST(self, *, body=None): + if body is None: + if self.allow_posting: + self.push_lit("340 Input article; end with .") + self.expect_body() + else: + self.push_lit("440 Posting not permitted") + else: + assert self.allow_posting + self.push_lit("240 Article received OK") + self.posted_body = body + + def handle_IHAVE(self, message_id, *, body=None): + if body is None: + if (self.allow_posting and + message_id == ""): + self.push_lit("335 Send it; end with .") + self.expect_body() + else: + self.push_lit("435 Article not wanted") + else: + assert self.allow_posting + self.push_lit("235 Article transferred OK") + self.posted_body = body + + sample_head = """\ + From: "Demo User" + Subject: I am just a test article + Content-Type: text/plain; charset=UTF-8; format=flowed + Message-ID: """ + + sample_body = """\ + This is just a test article. + ..Here is a dot-starting line. + + -- Signed by Andr\xe9.""" + + sample_article = sample_head + "\n\n" + sample_body + + def handle_ARTICLE(self, message_spec=None): + if message_spec is None: + self.push_lit("220 3000237 <45223423@example.com>") + elif message_spec == "<45223423@example.com>": + self.push_lit("220 0 <45223423@example.com>") + elif message_spec == "3000234": + self.push_lit("220 3000234 <45223423@example.com>") + else: + self.push_lit("430 No Such Article Found") + return + self.push_lit(self.sample_article) + self.push_lit(".") + + def handle_HEAD(self, message_spec=None): + if message_spec is None: + self.push_lit("221 3000237 <45223423@example.com>") + elif message_spec == "<45223423@example.com>": + self.push_lit("221 0 <45223423@example.com>") + elif message_spec == "3000234": + self.push_lit("221 3000234 <45223423@example.com>") + else: + self.push_lit("430 No Such Article Found") + return + self.push_lit(self.sample_head) + self.push_lit(".") + + def handle_BODY(self, message_spec=None): + if message_spec is None: + self.push_lit("222 3000237 <45223423@example.com>") + elif message_spec == "<45223423@example.com>": + self.push_lit("222 0 <45223423@example.com>") + elif message_spec == "3000234": + self.push_lit("222 3000234 <45223423@example.com>") + else: + self.push_lit("430 No Such Article Found") + return + self.push_lit(self.sample_body) + self.push_lit(".") + + +class NNTPv2Handler(NNTPv1Handler): + """A handler for RFC 3977 (NNTP "v2")""" + + def handle_CAPABILITIES(self): + self.push_lit("""\ + 101 Capability list: + VERSION 2 + IMPLEMENTATION INN 2.5.1 + AUTHINFO USER + HDR + LIST ACTIVE ACTIVE.TIMES DISTRIB.PATS HEADERS NEWSGROUPS OVERVIEW.FMT + OVER + POST + READER + .""") + + def handle_OVER(self, message_spec=None): + return self.handle_XOVER(message_spec) + + +class NNTPv1v2TestsMixin: + + def setUp(self): + super().setUp() + + def test_welcome(self): + self.assertEqual(self.server.welcome, self.handler.welcome) + + def test_date(self): + resp, date = self.server.date() + self.assertEqual(resp, "111 20100914001155") + self.assertEqual(date, datetime.datetime(2010, 9, 14, 0, 11, 55)) + + def test_quit(self): + self.assertFalse(self.sio.closed) + resp = self.server.quit() + self.assertEqual(resp, "205 Bye!") + self.assertTrue(self.sio.closed) + + def test_help(self): + resp, help = self.server.help() + self.assertEqual(resp, "100 Legal commands") + self.assertEqual(help, [ + ' authinfo user Name|pass Password|generic ', + ' date', + ' help', + 'Report problems to ', + ]) + + def test_list(self): + resp, groups = self.server.list() + self.assertEqual(len(groups), 6) + g = groups[1] + self.assertEqual(g, + GroupInfo("comp.lang.python.announce", "0000001153", + "0000000993", "m")) + + def test_stat(self): + resp, art_num, message_id = self.server.stat(3000234) + self.assertEqual(resp, "223 3000234 <45223423@example.com>") + self.assertEqual(art_num, 3000234) + self.assertEqual(message_id, "<45223423@example.com>") + resp, art_num, message_id = self.server.stat("<45223423@example.com>") + self.assertEqual(resp, "223 0 <45223423@example.com>") + self.assertEqual(art_num, 0) + self.assertEqual(message_id, "<45223423@example.com>") + with self.assertRaises(nntplib.NNTPTemporaryError) as cm: + self.server.stat("") + self.assertEqual(cm.exception.response, "430 No Such Article Found") + with self.assertRaises(nntplib.NNTPTemporaryError) as cm: + self.server.stat() + self.assertEqual(cm.exception.response, "412 No newsgroup selected") + + def test_next(self): + resp, art_num, message_id = self.server.next() + self.assertEqual(resp, "223 3000237 <668929@example.org> retrieved") + self.assertEqual(art_num, 3000237) + self.assertEqual(message_id, "<668929@example.org>") + + def test_last(self): + resp, art_num, message_id = self.server.last() + self.assertEqual(resp, "223 3000234 <45223423@example.com> retrieved") + self.assertEqual(art_num, 3000234) + self.assertEqual(message_id, "<45223423@example.com>") + + def test_description(self): + desc = self.server.description("comp.lang.python") + self.assertEqual(desc, "The Python computer language.") + desc = self.server.description("comp.lang.pythonx") + self.assertEqual(desc, "") + + def test_descriptions(self): + resp, groups = self.server.descriptions("comp.lang.python") + self.assertEqual(resp, '215 Descriptions in form "group description".') + self.assertEqual(groups, { + "comp.lang.python": "The Python computer language.", + }) + resp, groups = self.server.descriptions("comp.lang.python*") + self.assertEqual(groups, { + "comp.lang.python": "The Python computer language.", + "comp.lang.python.announce": "Announcements about the Python language. (Moderated)", + }) + resp, groups = self.server.descriptions("comp.lang.pythonx") + self.assertEqual(groups, {}) + + def test_group(self): + resp, count, first, last, group = self.server.group("fr.comp.lang.python") + self.assertTrue(resp.startswith("211 "), resp) + self.assertEqual(first, 761) + self.assertEqual(last, 1265) + self.assertEqual(count, 486) + self.assertEqual(group, "fr.comp.lang.python") + with self.assertRaises(nntplib.NNTPTemporaryError) as cm: + self.server.group("comp.lang.python.devel") + exc = cm.exception + self.assertTrue(exc.response.startswith("411 No such group"), + exc.response) + + def test_newnews(self): + # NEWNEWS comp.lang.python [20]100913 082004 + dt = datetime.datetime(2010, 9, 13, 8, 20, 4) + resp, ids = self.server.newnews("comp.lang.python", dt) + expected = ( + "230 list of newsarticles (NNTP v{0}) " + "created after Mon Sep 13 08:20:04 2010 follows" + ).format(self.nntp_version) + self.assertEqual(resp, expected) + self.assertEqual(ids, [ + "", + "", + ]) + # NEWNEWS fr.comp.lang.python [20]100913 082004 + dt = datetime.datetime(2010, 9, 13, 8, 20, 4) + resp, ids = self.server.newnews("fr.comp.lang.python", dt) + self.assertEqual(resp, "230 An empty list of newsarticles follows") + self.assertEqual(ids, []) + + def _check_article_body(self, lines): + self.assertEqual(len(lines), 4) + self.assertEqual(lines[-1].decode('utf8'), "-- Signed by André.") + self.assertEqual(lines[-2], b"") + self.assertEqual(lines[-3], b".Here is a dot-starting line.") + self.assertEqual(lines[-4], b"This is just a test article.") + + def _check_article_head(self, lines): + self.assertEqual(len(lines), 4) + self.assertEqual(lines[0], b'From: "Demo User" ') + self.assertEqual(lines[3], b"Message-ID: ") + + def _check_article_data(self, lines): + self.assertEqual(len(lines), 9) + self._check_article_head(lines[:4]) + self._check_article_body(lines[-4:]) + self.assertEqual(lines[4], b"") + + def test_article(self): + # ARTICLE + resp, info = self.server.article() + self.assertEqual(resp, "220 3000237 <45223423@example.com>") + art_num, message_id, lines = info + self.assertEqual(art_num, 3000237) + self.assertEqual(message_id, "<45223423@example.com>") + self._check_article_data(lines) + # ARTICLE num + resp, info = self.server.article(3000234) + self.assertEqual(resp, "220 3000234 <45223423@example.com>") + art_num, message_id, lines = info + self.assertEqual(art_num, 3000234) + self.assertEqual(message_id, "<45223423@example.com>") + self._check_article_data(lines) + # ARTICLE id + resp, info = self.server.article("<45223423@example.com>") + self.assertEqual(resp, "220 0 <45223423@example.com>") + art_num, message_id, lines = info + self.assertEqual(art_num, 0) + self.assertEqual(message_id, "<45223423@example.com>") + self._check_article_data(lines) + # Non-existent id + with self.assertRaises(nntplib.NNTPTemporaryError) as cm: + self.server.article("") + self.assertEqual(cm.exception.response, "430 No Such Article Found") + + def test_article_file(self): + # With a "file" argument + f = io.BytesIO() + resp, info = self.server.article(file=f) + self.assertEqual(resp, "220 3000237 <45223423@example.com>") + art_num, message_id, lines = info + self.assertEqual(art_num, 3000237) + self.assertEqual(message_id, "<45223423@example.com>") + self.assertEqual(lines, []) + data = f.getvalue() + self.assertTrue(data.startswith( + b'From: "Demo User" \r\n' + b'Subject: I am just a test article\r\n' + ), ascii(data)) + self.assertTrue(data.endswith( + b'This is just a test article.\r\n' + b'.Here is a dot-starting line.\r\n' + b'\r\n' + b'-- Signed by Andr\xc3\xa9.\r\n' + ), ascii(data)) + + def test_head(self): + # HEAD + resp, info = self.server.head() + self.assertEqual(resp, "221 3000237 <45223423@example.com>") + art_num, message_id, lines = info + self.assertEqual(art_num, 3000237) + self.assertEqual(message_id, "<45223423@example.com>") + self._check_article_head(lines) + # HEAD num + resp, info = self.server.head(3000234) + self.assertEqual(resp, "221 3000234 <45223423@example.com>") + art_num, message_id, lines = info + self.assertEqual(art_num, 3000234) + self.assertEqual(message_id, "<45223423@example.com>") + self._check_article_head(lines) + # HEAD id + resp, info = self.server.head("<45223423@example.com>") + self.assertEqual(resp, "221 0 <45223423@example.com>") + art_num, message_id, lines = info + self.assertEqual(art_num, 0) + self.assertEqual(message_id, "<45223423@example.com>") + self._check_article_head(lines) + # Non-existent id + with self.assertRaises(nntplib.NNTPTemporaryError) as cm: + self.server.head("") + self.assertEqual(cm.exception.response, "430 No Such Article Found") + + def test_body(self): + # BODY + resp, info = self.server.body() + self.assertEqual(resp, "222 3000237 <45223423@example.com>") + art_num, message_id, lines = info + self.assertEqual(art_num, 3000237) + self.assertEqual(message_id, "<45223423@example.com>") + self._check_article_body(lines) + # BODY num + resp, info = self.server.body(3000234) + self.assertEqual(resp, "222 3000234 <45223423@example.com>") + art_num, message_id, lines = info + self.assertEqual(art_num, 3000234) + self.assertEqual(message_id, "<45223423@example.com>") + self._check_article_body(lines) + # BODY id + resp, info = self.server.body("<45223423@example.com>") + self.assertEqual(resp, "222 0 <45223423@example.com>") + art_num, message_id, lines = info + self.assertEqual(art_num, 0) + self.assertEqual(message_id, "<45223423@example.com>") + self._check_article_body(lines) + # Non-existent id + with self.assertRaises(nntplib.NNTPTemporaryError) as cm: + self.server.body("") + self.assertEqual(cm.exception.response, "430 No Such Article Found") + + def check_over_xover_resp(self, resp, overviews): + self.assertTrue(resp.startswith("224 "), resp) + self.assertEqual(len(overviews), 3) + art_num, over = overviews[0] + self.assertEqual(art_num, 57) + self.assertEqual(over, { + "from": "Doug Hellmann ", + "subject": "Re: ANN: New Plone book with strong Python (and Zope) themes throughout", + "date": "Sat, 19 Jun 2010 18:04:08 -0400", + "message-id": "<4FD05F05-F98B-44DC-8111-C6009C925F0C@gmail.com>", + "references": "", + ":bytes": "7103", + ":lines": "16", + "xref": "news.gmane.org gmane.comp.python.authors:57" + }) + art_num, over = overviews[2] + self.assertEqual(over["subject"], + "Re: Message d'erreur incompréhensible (par moi)") + + def test_xover(self): + resp, overviews = self.server.xover(57, 59) + self.check_over_xover_resp(resp, overviews) + + def test_over(self): + # In NNTP "v1", this will fallback on XOVER + resp, overviews = self.server.over((57, 59)) + self.check_over_xover_resp(resp, overviews) + + sample_post = ( + b'From: "Demo User" \r\n' + b'Subject: I am just a test article\r\n' + b'Content-Type: text/plain; charset=UTF-8; format=flowed\r\n' + b'Message-ID: \r\n' + b'\r\n' + b'This is just a test article.\r\n' + b'.Here is a dot-starting line.\r\n' + b'\r\n' + b'-- Signed by Andr\xc3\xa9.\r\n' + ) + + def _check_posted_body(self): + # Check the raw body as received by the server + lines = self.handler.posted_body + # One additional line for the "." terminator + self.assertEqual(len(lines), 10) + self.assertEqual(lines[-1], b'.\r\n') + self.assertEqual(lines[-2], b'-- Signed by Andr\xc3\xa9.\r\n') + self.assertEqual(lines[-3], b'\r\n') + self.assertEqual(lines[-4], b'..Here is a dot-starting line.\r\n') + self.assertEqual(lines[0], b'From: "Demo User" \r\n') + + def _check_post_ihave_sub(self, func, *args, file_factory): + # First the prepared post with CRLF endings + post = self.sample_post + func_args = args + (file_factory(post),) + self.handler.posted_body = None + resp = func(*func_args) + self._check_posted_body() + # Then the same post with "normal" line endings - they should be + # converted by NNTP.post and NNTP.ihave. + post = self.sample_post.replace(b"\r\n", b"\n") + func_args = args + (file_factory(post),) + self.handler.posted_body = None + resp = func(*func_args) + self._check_posted_body() + return resp + + def check_post_ihave(self, func, success_resp, *args): + # With a bytes object + resp = self._check_post_ihave_sub(func, *args, file_factory=bytes) + self.assertEqual(resp, success_resp) + # With a bytearray object + resp = self._check_post_ihave_sub(func, *args, file_factory=bytearray) + self.assertEqual(resp, success_resp) + # With a file object + resp = self._check_post_ihave_sub(func, *args, file_factory=io.BytesIO) + self.assertEqual(resp, success_resp) + # With an iterable of terminated lines + def iterlines(b): + return iter(b.splitlines(True)) + resp = self._check_post_ihave_sub(func, *args, file_factory=iterlines) + self.assertEqual(resp, success_resp) + # With an iterable of non-terminated lines + def iterlines(b): + return iter(b.splitlines(False)) + resp = self._check_post_ihave_sub(func, *args, file_factory=iterlines) + self.assertEqual(resp, success_resp) + + def test_post(self): + self.check_post_ihave(self.server.post, "240 Article received OK") + self.handler.allow_posting = False + with self.assertRaises(nntplib.NNTPTemporaryError) as cm: + self.server.post(self.sample_post) + self.assertEqual(cm.exception.response, + "440 Posting not permitted") + + def test_ihave(self): + self.check_post_ihave(self.server.ihave, "235 Article transferred OK", + "") + with self.assertRaises(nntplib.NNTPTemporaryError) as cm: + self.server.ihave("", self.sample_post) + self.assertEqual(cm.exception.response, + "435 Article not wanted") + + +class NNTPv1Tests(NNTPv1v2TestsMixin, MockedNNTPTestsMixin, unittest.TestCase): + """Tests an NNTP v1 server (no capabilities).""" + + nntp_version = 1 + handler_class = NNTPv1Handler + + def test_caps(self): + caps = self.server.getcapabilities() + self.assertEqual(caps, {}) + self.assertEqual(self.server.nntp_version, 1) + + +class NNTPv2Tests(NNTPv1v2TestsMixin, MockedNNTPTestsMixin, unittest.TestCase): + """Tests an NNTP v2 server (with capabilities).""" + + nntp_version = 2 + handler_class = NNTPv2Handler + + def test_caps(self): + caps = self.server.getcapabilities() + self.assertEqual(caps, { + 'VERSION': ['2'], + 'IMPLEMENTATION': ['INN', '2.5.1'], + 'AUTHINFO': ['USER'], + 'HDR': [], + 'LIST': ['ACTIVE', 'ACTIVE.TIMES', 'DISTRIB.PATS', + 'HEADERS', 'NEWSGROUPS', 'OVERVIEW.FMT'], + 'OVER': [], + 'POST': [], + 'READER': [], + }) + self.assertEqual(self.server.nntp_version, 2) + + +class MiscTests(unittest.TestCase): + + def test_decode_header(self): + def gives(a, b): + self.assertEqual(nntplib.decode_header(a), b) + gives("" , "") + gives("a plain header", "a plain header") + gives(" with extra spaces ", " with extra spaces ") + gives("=?ISO-8859-15?Q?D=E9buter_en_Python?=", "Débuter en Python") + gives("=?utf-8?q?Re=3A_=5Bsqlite=5D_probl=C3=A8me_avec_ORDER_BY_sur_des_cha?=" + " =?utf-8?q?=C3=AEnes_de_caract=C3=A8res_accentu=C3=A9es?=", + "Re: [sqlite] problème avec ORDER BY sur des chaînes de caractères accentuées") + gives("Re: =?UTF-8?B?cHJvYmzDqG1lIGRlIG1hdHJpY2U=?=", + "Re: problème de matrice") + # A natively utf-8 header (found in the real world!) + gives("Re: Message d'erreur incompréhensible (par moi)", + "Re: Message d'erreur incompréhensible (par moi)") + + def test_parse_overview_fmt(self): + # The minimal (default) response + lines = ["Subject:", "From:", "Date:", "Message-ID:", + "References:", ":bytes", ":lines"] + self.assertEqual(nntplib._parse_overview_fmt(lines), + ["subject", "from", "date", "message-id", "references", + ":bytes", ":lines"]) + # The minimal response using alternative names + lines = ["Subject:", "From:", "Date:", "Message-ID:", + "References:", "Bytes:", "Lines:"] + self.assertEqual(nntplib._parse_overview_fmt(lines), + ["subject", "from", "date", "message-id", "references", + ":bytes", ":lines"]) + # Variations in casing + lines = ["subject:", "FROM:", "DaTe:", "message-ID:", + "References:", "BYTES:", "Lines:"] + self.assertEqual(nntplib._parse_overview_fmt(lines), + ["subject", "from", "date", "message-id", "references", + ":bytes", ":lines"]) + # First example from RFC 3977 + lines = ["Subject:", "From:", "Date:", "Message-ID:", + "References:", ":bytes", ":lines", "Xref:full", + "Distribution:full"] + self.assertEqual(nntplib._parse_overview_fmt(lines), + ["subject", "from", "date", "message-id", "references", + ":bytes", ":lines", "xref", "distribution"]) + # Second example from RFC 3977 + lines = ["Subject:", "From:", "Date:", "Message-ID:", + "References:", "Bytes:", "Lines:", "Xref:FULL", + "Distribution:FULL"] + self.assertEqual(nntplib._parse_overview_fmt(lines), + ["subject", "from", "date", "message-id", "references", + ":bytes", ":lines", "xref", "distribution"]) + # A classic response from INN + lines = ["Subject:", "From:", "Date:", "Message-ID:", + "References:", "Bytes:", "Lines:", "Xref:full"] + self.assertEqual(nntplib._parse_overview_fmt(lines), + ["subject", "from", "date", "message-id", "references", + ":bytes", ":lines", "xref"]) + + def test_parse_overview(self): + fmt = nntplib._DEFAULT_OVERVIEW_FMT + ["xref"] + # First example from RFC 3977 + lines = [ + '3000234\tI am just a test article\t"Demo User" ' + '\t6 Oct 1998 04:38:40 -0500\t' + '<45223423@example.com>\t<45454@example.net>\t1234\t' + '17\tXref: news.example.com misc.test:3000363', + ] + overview = nntplib._parse_overview(lines, fmt) + (art_num, fields), = overview + self.assertEqual(art_num, 3000234) + self.assertEqual(fields, { + 'subject': 'I am just a test article', + 'from': '"Demo User" ', + 'date': '6 Oct 1998 04:38:40 -0500', + 'message-id': '<45223423@example.com>', + 'references': '<45454@example.net>', + ':bytes': '1234', + ':lines': '17', + 'xref': 'news.example.com misc.test:3000363', + }) + + def test_parse_datetime(self): + def gives(a, b, *c): + self.assertEqual(nntplib._parse_datetime(a, b), + datetime.datetime(*c)) + # Output of DATE command + gives("19990623135624", None, 1999, 6, 23, 13, 56, 24) + # Variations + gives("19990623", "135624", 1999, 6, 23, 13, 56, 24) + gives("990623", "135624", 1999, 6, 23, 13, 56, 24) + gives("090623", "135624", 2009, 6, 23, 13, 56, 24) + + def test_unparse_datetime(self): + # Test non-legacy mode + # 1) with a datetime + def gives(y, M, d, h, m, s, date_str, time_str): + dt = datetime.datetime(y, M, d, h, m, s) + self.assertEqual(nntplib._unparse_datetime(dt), + (date_str, time_str)) + self.assertEqual(nntplib._unparse_datetime(dt, False), + (date_str, time_str)) + gives(1999, 6, 23, 13, 56, 24, "19990623", "135624") + gives(2000, 6, 23, 13, 56, 24, "20000623", "135624") + gives(2010, 6, 5, 1, 2, 3, "20100605", "010203") + # 2) with a date + def gives(y, M, d, date_str, time_str): + dt = datetime.date(y, M, d) + self.assertEqual(nntplib._unparse_datetime(dt), + (date_str, time_str)) + self.assertEqual(nntplib._unparse_datetime(dt, False), + (date_str, time_str)) + gives(1999, 6, 23, "19990623", "000000") + gives(2000, 6, 23, "20000623", "000000") + gives(2010, 6, 5, "20100605", "000000") + + def test_unparse_datetime_legacy(self): + # Test legacy mode (RFC 977) + # 1) with a datetime + def gives(y, M, d, h, m, s, date_str, time_str): + dt = datetime.datetime(y, M, d, h, m, s) + self.assertEqual(nntplib._unparse_datetime(dt, True), + (date_str, time_str)) + gives(1999, 6, 23, 13, 56, 24, "990623", "135624") + gives(2000, 6, 23, 13, 56, 24, "000623", "135624") + gives(2010, 6, 5, 1, 2, 3, "100605", "010203") + # 2) with a date + def gives(y, M, d, date_str, time_str): + dt = datetime.date(y, M, d) + self.assertEqual(nntplib._unparse_datetime(dt, True), + (date_str, time_str)) + gives(1999, 6, 23, "990623", "000000") + gives(2000, 6, 23, "000623", "000000") + gives(2010, 6, 5, "100605", "000000") + + +def test_main(): + support.run_unittest(MiscTests, NNTPv1Tests, NNTPv2Tests, + NetworkedNNTPTests + ) + + +if __name__ == "__main__": + test_main()