diff -r af46be655a2e Lib/smtpd.py --- a/Lib/smtpd.py Sun Sep 27 12:37:20 2015 -0400 +++ b/Lib/smtpd.py Sun Sep 27 20:16:32 2015 -0400 @@ -79,16 +79,16 @@ import sys import os -import errno import getopt import time import socket -import asyncore -import asynchat +import signal +import asyncio import collections from warnings import warn from email._header_value_parser import get_addr_spec, get_angle_addr + __all__ = ["SMTPServer","DebuggingServer","PureProxy","MailmanProxy"] program = sys.argv[0] @@ -113,7 +113,7 @@ sys.exit(code) -class SMTPChannel(asynchat.async_chat): +class SMTPChannel(asyncio.StreamReaderProtocol): COMMAND = 0 DATA = 1 @@ -128,8 +128,13 @@ return self.command_size_limit def __init__(self, server, conn, addr, data_size_limit=DATA_SIZE_DEFAULT, - map=None, enable_SMTPUTF8=False, decode_data=None): - asynchat.async_chat.__init__(self, conn, map=map) + map=None, enable_SMTPUTF8=False, decode_data=None, *, + loop=None): + self.loop = loop if loop else asyncio.get_event_loop() + super().__init__( + asyncio.StreamReader(loop=self.loop), + client_connected_cb=self._client_connected_cb, + loop=self.loop) self.smtp_server = server self.conn = conn self.addr = addr @@ -160,18 +165,28 @@ self.seen_greeting = '' self.extended_smtp = False self.command_size_limits.clear() - self.fqdn = socket.getfqdn() - try: - self.peer = conn.getpeername() - except OSError as err: - # a race condition may occur if the other end is closing - # before we can get the peername - self.close() - if err.args[0] != errno.ENOTCONN: - raise - return + self.fqdn = socket.getfqdn() # XXX this blocks, fix it? + + def connection_made(self, transport): + super().connection_made(transport) + self.peer = transport.get_extra_info('peername') + self.transport = transport print('Peer:', repr(self.peer), file=DEBUGSTREAM) - self.push('220 %s %s' % (self.fqdn, __version__)) + # Process the client's requests. + self.connection_closed = False + self._handler_coroutine = self.loop.create_task(self._handle_client()) + + def _client_connected_cb(self, reader, writer): + # This is redundant since we subclass StreamReaderProtocol, but I like + # the shorter names. + self._reader = reader + self._writer = writer + + def connection_lost(self, exc): + print('Disconnect:', str(exc), file=DEBUGSTREAM) + self._connection_closed = True + super().connection_lost(exc) + yield from self._handler_coroutine def _set_post_data_state(self): """Reset state variables to their post-DATA state.""" @@ -179,8 +194,6 @@ self.mailfrom = None self.rcpttos = [] self.require_SMTPUTF8 = False - self.num_bytes = 0 - self.set_terminator(b'\r\n') def _set_rset_state(self): """Reset all state variables except the greeting.""" @@ -188,162 +201,28 @@ self.received_data = '' self.received_lines = [] + @asyncio.coroutine + def push(self, msg): + self._writer.write(bytes( + msg + '\r\n', 'utf-8' if self.require_SMTPUTF8 else 'ascii')) + yield from self._writer.drain() - # properties for backwards-compatibility - @property - def __server(self): - warn("Access to __server attribute on SMTPChannel is deprecated, " - "use 'smtp_server' instead", DeprecationWarning, 2) - return self.smtp_server - @__server.setter - def __server(self, value): - warn("Setting __server attribute on SMTPChannel is deprecated, " - "set 'smtp_server' instead", DeprecationWarning, 2) - self.smtp_server = value - - @property - def __line(self): - warn("Access to __line attribute on SMTPChannel is deprecated, " - "use 'received_lines' instead", DeprecationWarning, 2) - return self.received_lines - @__line.setter - def __line(self, value): - warn("Setting __line attribute on SMTPChannel is deprecated, " - "set 'received_lines' instead", DeprecationWarning, 2) - self.received_lines = value - - @property - def __state(self): - warn("Access to __state attribute on SMTPChannel is deprecated, " - "use 'smtp_state' instead", DeprecationWarning, 2) - return self.smtp_state - @__state.setter - def __state(self, value): - warn("Setting __state attribute on SMTPChannel is deprecated, " - "set 'smtp_state' instead", DeprecationWarning, 2) - self.smtp_state = value - - @property - def __greeting(self): - warn("Access to __greeting attribute on SMTPChannel is deprecated, " - "use 'seen_greeting' instead", DeprecationWarning, 2) - return self.seen_greeting - @__greeting.setter - def __greeting(self, value): - warn("Setting __greeting attribute on SMTPChannel is deprecated, " - "set 'seen_greeting' instead", DeprecationWarning, 2) - self.seen_greeting = value - - @property - def __mailfrom(self): - warn("Access to __mailfrom attribute on SMTPChannel is deprecated, " - "use 'mailfrom' instead", DeprecationWarning, 2) - return self.mailfrom - @__mailfrom.setter - def __mailfrom(self, value): - warn("Setting __mailfrom attribute on SMTPChannel is deprecated, " - "set 'mailfrom' instead", DeprecationWarning, 2) - self.mailfrom = value - - @property - def __rcpttos(self): - warn("Access to __rcpttos attribute on SMTPChannel is deprecated, " - "use 'rcpttos' instead", DeprecationWarning, 2) - return self.rcpttos - @__rcpttos.setter - def __rcpttos(self, value): - warn("Setting __rcpttos attribute on SMTPChannel is deprecated, " - "set 'rcpttos' instead", DeprecationWarning, 2) - self.rcpttos = value - - @property - def __data(self): - warn("Access to __data attribute on SMTPChannel is deprecated, " - "use 'received_data' instead", DeprecationWarning, 2) - return self.received_data - @__data.setter - def __data(self, value): - warn("Setting __data attribute on SMTPChannel is deprecated, " - "set 'received_data' instead", DeprecationWarning, 2) - self.received_data = value - - @property - def __fqdn(self): - warn("Access to __fqdn attribute on SMTPChannel is deprecated, " - "use 'fqdn' instead", DeprecationWarning, 2) - return self.fqdn - @__fqdn.setter - def __fqdn(self, value): - warn("Setting __fqdn attribute on SMTPChannel is deprecated, " - "set 'fqdn' instead", DeprecationWarning, 2) - self.fqdn = value - - @property - def __peer(self): - warn("Access to __peer attribute on SMTPChannel is deprecated, " - "use 'peer' instead", DeprecationWarning, 2) - return self.peer - @__peer.setter - def __peer(self, value): - warn("Setting __peer attribute on SMTPChannel is deprecated, " - "set 'peer' instead", DeprecationWarning, 2) - self.peer = value - - @property - def __conn(self): - warn("Access to __conn attribute on SMTPChannel is deprecated, " - "use 'conn' instead", DeprecationWarning, 2) - return self.conn - @__conn.setter - def __conn(self, value): - warn("Setting __conn attribute on SMTPChannel is deprecated, " - "set 'conn' instead", DeprecationWarning, 2) - self.conn = value - - @property - def __addr(self): - warn("Access to __addr attribute on SMTPChannel is deprecated, " - "use 'addr' instead", DeprecationWarning, 2) - return self.addr - @__addr.setter - def __addr(self, value): - warn("Setting __addr attribute on SMTPChannel is deprecated, " - "set 'addr' instead", DeprecationWarning, 2) - self.addr = value - - # Overrides base class for convenience. - def push(self, msg): - asynchat.async_chat.push(self, bytes( - msg + '\r\n', 'utf-8' if self.require_SMTPUTF8 else 'ascii')) - - # Implementation of base class abstract method - def collect_incoming_data(self, data): - limit = None - if self.smtp_state == self.COMMAND: - limit = self.max_command_size_limit - elif self.smtp_state == self.DATA: - limit = self.data_size_limit - if limit and self.num_bytes > limit: - return - elif limit: - self.num_bytes += len(data) - if self._decode_data: - self.received_lines.append(str(data, 'utf-8')) - else: - self.received_lines.append(data) - - # Implementation of base class abstract method - def found_terminator(self): - line = self._emptystring.join(self.received_lines) - print('Data:', repr(line), file=DEBUGSTREAM) - self.received_lines = [] - if self.smtp_state == self.COMMAND: - sz, self.num_bytes = self.num_bytes, 0 + @asyncio.coroutine + def _handle_client(self): + print('handling connection', file=DEBUGSTREAM) + yield from self.push('220 %s %s' % (self.fqdn, __version__)) + while not self.connection_closed: + # XXX Put the line limit stuff into the StreamReader? + line = yield from self._reader.readline() + # XXX this rstrip may not completely preserve old behavior. + line = line.decode('utf-8').rstrip('\r\n') + print('Data:', repr(line), file=DEBUGSTREAM) + if self.smtp_state != self.COMMAND: + yield from self.push('451 Internal confusion') + continue if not line: - self.push('500 Error: bad syntax') - return - if not self._decode_data: - line = str(line, 'utf-8') + yield from self.push('500 Error: bad syntax') + continue i = line.find(' ') if i < 0: command = line.upper() @@ -353,92 +232,72 @@ arg = line[i+1:].strip() max_sz = (self.command_size_limits[command] if self.extended_smtp else self.command_size_limit) - if sz > max_sz: - self.push('500 Error: line too long') - return + if len(line) > max_sz: + yield from self.push('500 Error: line too long') + continue method = getattr(self, 'smtp_' + command, None) if not method: - self.push('500 Error: command "%s" not recognized' % command) - return - method(arg) - return - else: - if self.smtp_state != self.DATA: - self.push('451 Internal confusion') - self.num_bytes = 0 - return - if self.data_size_limit and self.num_bytes > self.data_size_limit: - self.push('552 Error: Too much mail data') - self.num_bytes = 0 - return - # Remove extraneous carriage returns and de-transparency according - # to RFC 5321, Section 4.5.2. - data = [] - for text in line.split(self._linesep): - if text and text[0] == self._dotsep: - data.append(text[1:]) - else: - data.append(text) - self.received_data = self._newline.join(data) - args = (self.peer, self.mailfrom, self.rcpttos, self.received_data) - kwargs = {} - if not self._decode_data: - kwargs = { - 'mail_options': self.mail_options, - 'rcpt_options': self.rcpt_options, - } - status = self.smtp_server.process_message(*args, **kwargs) - self._set_post_data_state() - if not status: - self.push('250 OK') - else: - self.push(status) + yield from self.push('500 Error: command "%s" not recognized' % command) + continue + yield from method(arg) # SMTP and ESMTP commands + @asyncio.coroutine def smtp_HELO(self, arg): if not arg: - self.push('501 Syntax: HELO hostname') + yield from self.push('501 Syntax: HELO hostname') return # See issue #21783 for a discussion of this behavior. if self.seen_greeting: - self.push('503 Duplicate HELO/EHLO') + yield from self.push('503 Duplicate HELO/EHLO') return self._set_rset_state() self.seen_greeting = arg - self.push('250 %s' % self.fqdn) + yield from self.push('250 %s' % self.fqdn) + @asyncio.coroutine def smtp_EHLO(self, arg): if not arg: - self.push('501 Syntax: EHLO hostname') + yield from self.push('501 Syntax: EHLO hostname') return # See issue #21783 for a discussion of this behavior. if self.seen_greeting: - self.push('503 Duplicate HELO/EHLO') + yield from self.push('503 Duplicate HELO/EHLO') return self._set_rset_state() self.seen_greeting = arg self.extended_smtp = True - self.push('250-%s' % self.fqdn) + yield from self.push('250-%s' % self.fqdn) if self.data_size_limit: - self.push('250-SIZE %s' % self.data_size_limit) + yield from self.push('250-SIZE %s' % self.data_size_limit) self.command_size_limits['MAIL'] += 26 if not self._decode_data: - self.push('250-8BITMIME') + yield from self.push('250-8BITMIME') if self.enable_SMTPUTF8: - self.push('250-SMTPUTF8') + yield from self.push('250-SMTPUTF8') self.command_size_limits['MAIL'] += 10 - self.push('250 HELP') + yield from self.push('250 HELP') + @asyncio.coroutine def smtp_NOOP(self, arg): if arg: - self.push('501 Syntax: NOOP') + yield from self.push('501 Syntax: NOOP') else: - self.push('250 OK') + yield from self.push('250 OK') + @asyncio.coroutine def smtp_QUIT(self, arg): - # args is ignored - self.push('221 Bye') - self.close_when_done() + # arg is ignored + yield from self.push('221 Bye') + # XXX this close is probably not quite right. + yield from self.close() + + @asyncio.coroutine + def close(self): + # XXX this close is probably not quite right. + if self._writer: + self._writer.close() + self._connection_closed = True def _strip_command_keyword(self, keyword, arg): keylen = len(keyword) @@ -468,168 +327,210 @@ result[param] = value if eq else True return result + @asyncio.coroutine def smtp_HELP(self, arg): if arg: extended = ' [SP ]' lc_arg = arg.upper() if lc_arg == 'EHLO': - self.push('250 Syntax: EHLO hostname') + yield from self.push('250 Syntax: EHLO hostname') elif lc_arg == 'HELO': - self.push('250 Syntax: HELO hostname') + yield from self.push('250 Syntax: HELO hostname') elif lc_arg == 'MAIL': msg = '250 Syntax: MAIL FROM:
' if self.extended_smtp: msg += extended - self.push(msg) + yield from self.push(msg) elif lc_arg == 'RCPT': msg = '250 Syntax: RCPT TO:
' if self.extended_smtp: msg += extended - self.push(msg) + yield from self.push(msg) elif lc_arg == 'DATA': - self.push('250 Syntax: DATA') + yield from self.push('250 Syntax: DATA') elif lc_arg == 'RSET': - self.push('250 Syntax: RSET') + yield from self.push('250 Syntax: RSET') elif lc_arg == 'NOOP': - self.push('250 Syntax: NOOP') + yield from self.push('250 Syntax: NOOP') elif lc_arg == 'QUIT': - self.push('250 Syntax: QUIT') + yield from self.push('250 Syntax: QUIT') elif lc_arg == 'VRFY': - self.push('250 Syntax: VRFY
') + yield from self.push('250 Syntax: VRFY
') else: - self.push('501 Supported commands: EHLO HELO MAIL RCPT ' + yield from self.push('501 Supported commands: EHLO HELO MAIL RCPT ' 'DATA RSET NOOP QUIT VRFY') else: - self.push('250 Supported commands: EHLO HELO MAIL RCPT DATA ' + yield from self.push('250 Supported commands: EHLO HELO MAIL RCPT DATA ' 'RSET NOOP QUIT VRFY') + @asyncio.coroutine def smtp_VRFY(self, arg): if arg: address, params = self._getaddr(arg) if address: - self.push('252 Cannot VRFY user, but will accept message ' + yield from self.push('252 Cannot VRFY user, but will accept message ' 'and attempt delivery') else: - self.push('502 Could not VRFY %s' % arg) + yield from self.push('502 Could not VRFY %s' % arg) else: - self.push('501 Syntax: VRFY
') + yield from self.push('501 Syntax: VRFY
') + @asyncio.coroutine def smtp_MAIL(self, arg): if not self.seen_greeting: - self.push('503 Error: send HELO first') + yield from self.push('503 Error: send HELO first') return print('===> MAIL', arg, file=DEBUGSTREAM) syntaxerr = '501 Syntax: MAIL FROM:
' if self.extended_smtp: syntaxerr += ' [SP ]' if arg is None: - self.push(syntaxerr) + yield from self.push(syntaxerr) return arg = self._strip_command_keyword('FROM:', arg) address, params = self._getaddr(arg) if not address: - self.push(syntaxerr) + yield from self.push(syntaxerr) return if not self.extended_smtp and params: - self.push(syntaxerr) + yield from self.push(syntaxerr) return if self.mailfrom: - self.push('503 Error: nested MAIL command') + yield from self.push('503 Error: nested MAIL command') return self.mail_options = params.upper().split() params = self._getparams(self.mail_options) if params is None: - self.push(syntaxerr) + yield from self.push(syntaxerr) return if not self._decode_data: body = params.pop('BODY', '7BIT') if body not in ['7BIT', '8BITMIME']: - self.push('501 Error: BODY can only be one of 7BIT, 8BITMIME') + yield from self.push('501 Error: BODY can only be one of 7BIT, 8BITMIME') return if self.enable_SMTPUTF8: smtputf8 = params.pop('SMTPUTF8', False) if smtputf8 is True: self.require_SMTPUTF8 = True elif smtputf8 is not False: - self.push('501 Error: SMTPUTF8 takes no arguments') + yield from self.push('501 Error: SMTPUTF8 takes no arguments') return size = params.pop('SIZE', None) if size: if not size.isdigit(): - self.push(syntaxerr) + yield from self.push(syntaxerr) return elif self.data_size_limit and int(size) > self.data_size_limit: - self.push('552 Error: message size exceeds fixed maximum message size') + yield from self.push('552 Error: message size exceeds fixed maximum message size') return if len(params.keys()) > 0: - self.push('555 MAIL FROM parameters not recognized or not implemented') + yield from self.push('555 MAIL FROM parameters not recognized or not implemented') return self.mailfrom = address print('sender:', self.mailfrom, file=DEBUGSTREAM) - self.push('250 OK') + yield from self.push('250 OK') + @asyncio.coroutine def smtp_RCPT(self, arg): if not self.seen_greeting: - self.push('503 Error: send HELO first'); + yield from self.push('503 Error: send HELO first'); return print('===> RCPT', arg, file=DEBUGSTREAM) if not self.mailfrom: - self.push('503 Error: need MAIL command') + yield from self.push('503 Error: need MAIL command') return syntaxerr = '501 Syntax: RCPT TO:
' if self.extended_smtp: syntaxerr += ' [SP ]' if arg is None: - self.push(syntaxerr) + yield from self.push(syntaxerr) return arg = self._strip_command_keyword('TO:', arg) address, params = self._getaddr(arg) if not address: - self.push(syntaxerr) + yield from self.push(syntaxerr) return if not self.extended_smtp and params: - self.push(syntaxerr) + yield from self.push(syntaxerr) return self.rcpt_options = params.upper().split() params = self._getparams(self.rcpt_options) if params is None: - self.push(syntaxerr) + yield from self.push(syntaxerr) return # XXX currently there are no options we recognize. if len(params.keys()) > 0: - self.push('555 RCPT TO parameters not recognized or not implemented') + yield from self.push('555 RCPT TO parameters not recognized or not implemented') return self.rcpttos.append(address) print('recips:', self.rcpttos, file=DEBUGSTREAM) - self.push('250 OK') + yield from self.push('250 OK') + @asyncio.coroutine def smtp_RSET(self, arg): if arg: - self.push('501 Syntax: RSET') + yield from self.push('501 Syntax: RSET') return self._set_rset_state() - self.push('250 OK') + yield from self.push('250 OK') + @asyncio.coroutine def smtp_DATA(self, arg): if not self.seen_greeting: - self.push('503 Error: send HELO first'); + yield from self.push('503 Error: send HELO first'); return if not self.rcpttos: - self.push('503 Error: need RCPT command') + yield from self.push('503 Error: need RCPT command') return if arg: - self.push('501 Syntax: DATA') + yield from self.push('501 Syntax: DATA') return self.smtp_state = self.DATA - self.set_terminator(b'\r\n.\r\n') - self.push('354 End data with .') + yield from self.push('354 End data with .') + data = [] + self.num_bytes = 0 + while not self.connection_closed: + line = yield from self._reader.readline() + if line == b'.\r\n': + break + self.num_bytes += len(line) + if self.data_size_limit and self.num_bytes > self.data_size_limit: + yield from self.push('552 Error: Too much mail data') + # XXX this rstrip may not exactly preserve the old behavior + line = line.rstrip(b'\r\n') + if self._decode_data: + data.append(line.decode('utf-8')) + else: + data.append(line) + # Remove extraneous carriage returns and de-transparency + # according to RFC 5321, Section 4.5.2. + for i in range(len(data)): + text = data[i] + if text and text[0] == self._dotsep: + data[i] = text[1:] + self.received_data = self._newline.join(data) + args = (self.peer, self.mailfrom, self.rcpttos, + self.received_data) + kwargs = {} + if not self._decode_data: + kwargs = { + 'mail_options': self.mail_options, + 'rcpt_options': self.rcpt_options, + } + status = self.smtp_server.process_message(*args, **kwargs) + self._set_post_data_state() + if not status: + yield from self.push('250 OK') + else: + yield from self.push(status) # Commands that have not been implemented + @asyncio.coroutine def smtp_EXPN(self, arg): - self.push('502 EXPN not implemented') + yield from self.push('502 EXPN not implemented') -class SMTPServer(asyncore.dispatcher): +class SMTPServer: # SMTPChannel class to use for managing client connections channel_class = SMTPChannel @@ -652,32 +553,10 @@ DeprecationWarning, 2) decode_data = True self._decode_data = decode_data - asyncore.dispatcher.__init__(self, map=map) - try: - gai_results = socket.getaddrinfo(*localaddr, - type=socket.SOCK_STREAM) - self.create_socket(gai_results[0][0], gai_results[0][1]) - # try to re-use a server port if possible - self.set_reuse_addr() - self.bind(localaddr) - self.listen(5) - except: - self.close() - raise - else: - print('%s started at %s\n\tLocal addr: %s\n\tRemote addr:%s' % ( - self.__class__.__name__, time.ctime(time.time()), - localaddr, remoteaddr), file=DEBUGSTREAM) - def handle_accepted(self, conn, addr): - print('Incoming connection from %s' % repr(addr), file=DEBUGSTREAM) - channel = self.channel_class(self, - conn, - addr, - self.data_size_limit, - self._map, - self.enable_SMTPUTF8, - self._decode_data) + print('%s created at %s\n\tLocal addr: %s\n\tRemote addr:%s' % ( + self.__class__.__name__, time.ctime(time.time()), + localaddr, remoteaddr), file=DEBUGSTREAM) # API for "doing something useful with the message" def process_message(self, peer, mailfrom, rcpttos, data, **kwargs): @@ -971,7 +850,19 @@ except PermissionError: print('Cannot setuid "nobody"; try running with -n option.', file=sys.stderr) sys.exit(1) - try: - asyncore.loop() - except KeyboardInterrupt: - pass + + def handler(): + return class_.channel_class(proxy, 'FIXME', 'FIXME', options.size_limit, + None, options.enable_SMTPUTF8) + + import logging + logging.basicConfig(level=logging.DEBUG) + loop = asyncio.get_event_loop() + server = loop.run_until_complete( + loop.create_server(handler, options.localhost, options.localport)) + loop.add_signal_handler(signal.SIGINT, loop.stop) + loop.run_forever() + server.close() + # XXX This cleanup is probably incomplete. + loop.run_until_complete(server.wait_closed()) + loop.close()