diff -r c43362d35d8d Lib/smtpd.py --- a/Lib/smtpd.py Fri Jul 04 17:00:25 2014 -0700 +++ b/Lib/smtpd.py Mon Jul 07 22:32:00 2014 +0200 @@ -81,8 +81,10 @@ import socket import asyncore import asynchat +import base64 import collections from warnings import warn +from smtplib import SMTPResponseException from email._header_value_parser import get_addr_spec, get_angle_addr __all__ = ["SMTPServer","DebuggingServer","PureProxy","MailmanProxy"] @@ -112,6 +114,7 @@ class SMTPChannel(asynchat.async_chat): COMMAND = 0 DATA = 1 + AUTH = 2 command_size_limit = 512 command_size_limits = collections.defaultdict(lambda x=command_size_limit: x) @@ -151,6 +154,7 @@ self.received_data = '' self.fqdn = socket.getfqdn() self.num_bytes = 0 + self.user = None try: self.peer = conn.getpeername() except OSError as err: @@ -338,6 +342,13 @@ return method(arg) return + elif self.smtp_state == self.AUTH: + try: + self.auth_object(line) + except SMTPResponseException as e: + self.smtp_state = self.COMMAND + self.push('%s %s' % (e.smtp_code, e.smtp_error)) + return else: if self.smtp_state != self.DATA: self.push('451 Internal confusion') @@ -466,6 +477,90 @@ self.push('250 Supported commands: EHLO HELO MAIL RCPT DATA ' 'RSET NOOP QUIT VRFY') + def smtp_AUTH(self, arg): + if not self.seen_greeting: + self.push('503 Error: send HELO first'); + return + if self.user is not None: + self.push( + '503 Bad sequence of commands: already authenticated') + return + args = arg.split() + if len(args) not in [1, 2]: + self.push('501 Syntax: AUTH []') + return + auth_object_name = '_auth_%s' % args[0].lower().replace('-', '_') + try: + self.auth_object = getattr(self, auth_object_name) + except AttributeError: + self.push('504 Command parameter not implemented: ' + 'unsupported authentication mechanism') + return + self.smtp_state = self.AUTH + if len(args) == 1: + self.auth_object() + return + else: + try: + self.auth_object(args[1]) + except SMTPResponseException as e: + self.smtp_state = self.COMMAND + self.push('%s %s' % (e.smtp_code, e.smtp_error)) + return + + def _verify_user_credentials(self, user, password): + """Overwrite this method to accept authentication attempts + conditionally. + This method gets `user` and `password` as strings and should return a + boolean. It may raise an smtpd.SMTPResponseException on errors (which + will be sent to the client, so be careful).""" + return False + + def _authenticate(self, user, password): + """Translates the output of `_verify_user_credentials` to SMTP + responses, sets `self.user` on success and resets the internal + state.""" + if self._verify_user_credentials(user, password): + self.user = user + self.push('235 Authentication Succeeded') + else: + self.push('535 Authentication credentials invalid') + self.smtp_state = self.COMMAND + + def _decode(self, string): + """Decode string containing base64 data to unicode literal.""" + try: + return base64.decodebytes(string.encode('ascii')).decode('utf-8') + except base64.binascii.Error as e: + raise SMTPResponseException( + 501, 'Encoding error: %s' % str(e)) + + def _auth_plain(self, arg=None): + """AUTH helper processing PLAIN authentication.""" + if arg is None: + self.push('334 ') + else: + try: + user, password = self._decode(arg).strip('\0').split('\0') + except ValueError as e: + self.push('535 Splitting input into user and password failed') + return + self._authenticate(user, password) + + def _auth_login(self, arg=None): + """AUTH helper processing LOGIN authentication.""" + if arg is None: + # base64 encoded 'Username:' + self.push('334 VXNlcm5hbWU6') + elif not hasattr(self, '_auth_login_user'): + self._auth_login_user = self._decode(arg) + # base64 encoded 'Password:' + self.push('334 UGFzc3dvcmQ6') + else: + password = self._decode(arg) + self._authenticate(self._auth_login_user, password) + del self._auth_login_user + def smtp_VRFY(self, arg): if arg: address, params = self._getaddr(arg) diff -r c43362d35d8d Lib/test/test_smtpd.py --- a/Lib/test/test_smtpd.py Fri Jul 04 17:00:25 2014 -0700 +++ b/Lib/test/test_smtpd.py Mon Jul 07 22:32:00 2014 +0200 @@ -180,6 +180,67 @@ self.assertEqual(self.channel.socket.last, b'501 Syntax: MAIL FROM:
\r\n') + def test_AUTH_requires_greeting(self): + self.write_line(b'AUTH PLAIN') + self.assertEqual(self.channel.socket.last.decode('ascii')[:4], '503 ') + self.write_line(b'EHLO beispiel') + # TODO: implement AUTH announcement and test it here. + self.write_line(b'AUTH PLAIN AGhhbGxvAGhhbGxv') + self.assertEqual(self.channel.socket.last.decode('ascii')[:4], '535 ') + + def test_AUTH_can_be_accepted_conditionally(self): + self.channel._verify_user_credentials = lambda u, p: u == p + self.write_line(b'EHLO beispiel') + # base64 encoded '\0no\0access' + self.write_line(b'AUTH PLAIN AG5vAGFjY2Vzcw==') + self.assertEqual(self.channel.socket.last.decode('ascii')[:4], '535 ') + # base64 encoded '\0hello\0hello' + self.write_line(b'AUTH PLAIN AGhhbGxvAGhhbGxv') + self.assertEqual(self.channel.socket.last.decode('ascii')[:4], '235 ') + self.assertEqual(self.channel.user, 'hallo') + self.assertEqual(self.channel.smtp_state, self.channel.COMMAND) + + def test_AUTH_with_more_then_one_message(self): + self.channel._verify_user_credentials = lambda u, p: True + self.write_line(b'EHLO beispiel') + self.write_line(b'AUTH PLAIN') + self.assertEqual(self.channel.socket.last, b'334 \r\n') + self.assertEqual(self.channel.smtp_state, self.channel.AUTH) + self.write_line(b'AGhhbGxvAGhhbGxv') + self.assertEqual(self.channel.socket.last.decode('ascii')[:4], '235 ') + + def test_AUTH_LOGIN(self): + self.channel._verify_user_credentials = lambda u, p: True + self.write_line(b'EHLO beispiel') + self.write_line(b'AUTH LOGIN') + self.assertEqual(self.channel.socket.last, b'334 VXNlcm5hbWU6\r\n') + self.write_line(b'aGFsbG8=') + self.assertEqual(self.channel.socket.last, b'334 UGFzc3dvcmQ6\r\n') + self.write_line(b'aGFsbG8=') + self.assertEqual(self.channel.socket.last.decode('ascii')[:4], '235 ') + + def test_AUTH_exceptions(self): + def verify(user, password): + raise smtpd.SMTPResponseException(404, 'Code not found') + self.channel._verify_user_credentials = verify + self.write_line(b'EHLO beispiel') + self.write_line(b'AUTH PLAIN hallo') + self.assertEqual(self.channel.socket.last.decode('ascii')[:4], '501 ') + self.write_line(b'AUTH PAIN AGhhbGxvAGhhbGxv') + self.assertEqual(self.channel.socket.last.decode('ascii')[:4], '504 ') + self.write_line(b'AUTH PLAIN AGhhbGxvAGhhbGxv') + self.assertEqual(self.channel.socket.last.decode('ascii')[:4], '404 ') + + def test_multiple_AUTH_denied(self): + self.channel._verify_user_credentials = lambda u, p: True + self.write_line(b'EHLO beispiel') + self.write_line(b'AUTH PLAIN AGhhbGxvAGhhbGxv') + self.assertEqual(self.channel.socket.last.decode('ascii')[:4], '235 ') + self.assertEqual(self.channel.user, 'hallo') + self.assertEqual(self.channel.smtp_state, self.channel.COMMAND) + self.write_line(b'AUTH PLAIN AGhhbGxvAGhhbGxv') + self.assertEqual(self.channel.socket.last.decode('ascii')[:4], '503 ') + def test_MAIL_allows_space_after_colon(self): self.write_line(b'HELO example') self.write_line(b'MAIL from: ') diff -r c43362d35d8d Lib/test/test_smtplib.py --- a/Lib/test/test_smtplib.py Fri Jul 04 17:00:25 2014 -0700 +++ b/Lib/test/test_smtplib.py Mon Jul 07 22:32:00 2014 +0200 @@ -623,12 +623,23 @@ rcpt_count = 0 rset_count = 0 disconnect = 0 + user_dict = {} def __init__(self, extra_features, *args, **kw): self._extrafeatures = ''.join( [ "250-{0}\r\n".format(x) for x in extra_features ]) super(SimSMTPChannel, self).__init__(*args, **kw) + def _verify_user_credentials(self, user, password): + if (user, password) == sim_auth: + return True + return False + + def _authenticate(self, user, password): + super(SimSMTPChannel, self)._authenticate(user, password) + if self._verify_user_credentials(user, password): + self.smtp_server.sim_auth = (user, password) + def smtp_EHLO(self, arg): resp = ('250-testhost\r\n' '250-EXPN\r\n' @@ -662,17 +673,10 @@ def smtp_AUTH(self, arg): mech = arg.strip().lower() - if mech=='cram-md5': + if mech == 'cram-md5': self.push('334 {}'.format(sim_cram_md5_challenge)) - elif mech not in sim_auth_credentials: - self.push('504 auth type unimplemented') - return - elif mech=='plain': - self.push('334 ') - elif mech=='login': - self.push('334 ') else: - self.push('550 No access for you!') + super(SimSMTPChannel, self).smtp_AUTH(arg) def smtp_QUIT(self, arg): if self.quit_response is None: @@ -829,17 +833,15 @@ def testAUTH_PLAIN(self): self.serv.add_feature("AUTH PLAIN") smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15) - try: smtp.login(sim_auth[0], sim_auth[1]) - except smtplib.SMTPAuthenticationError as err: - self.assertIn(sim_auth_plain, str(err)) + smtp.login(sim_auth[0], sim_auth[1]) + self.assertEqual(self.serv.sim_auth, sim_auth) smtp.close() def testAUTH_LOGIN(self): self.serv.add_feature("AUTH LOGIN") smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15) - try: smtp.login(sim_auth[0], sim_auth[1]) - except smtplib.SMTPAuthenticationError as err: - self.assertIn(sim_auth_login_user, str(err)) + smtp.login(sim_auth[0], sim_auth[1]) + self.assertEqual(self.serv.sim_auth, sim_auth) smtp.close() def testAUTH_CRAM_MD5(self): @@ -855,7 +857,9 @@ # Test that multiple authentication methods are tried. self.serv.add_feature("AUTH BOGUS PLAIN LOGIN CRAM-MD5") smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15) - try: smtp.login(sim_auth[0], sim_auth[1]) + try: + smtp.login(sim_auth[0], sim_auth[1]) + self.assertEqual(self.serv.sim_auth, sim_auth) except smtplib.SMTPAuthenticationError as err: self.assertIn(sim_auth_login_user, str(err)) smtp.close() @@ -872,6 +876,8 @@ for mechanism, method in supported.items(): try: smtp.auth(mechanism, method) except smtplib.SMTPAuthenticationError as err: + if err.smtp_code == 503: + break self.assertIn(sim_auth_credentials[mechanism.lower()].upper(), str(err)) smtp.close()