diff --git a/Doc/library/email.contentmanager.rst b/Doc/library/email.contentmanager.rst index 376ddf3..701a2f8 100644 --- a/Doc/library/email.contentmanager.rst +++ b/Doc/library/email.contentmanager.rst @@ -199,7 +199,7 @@ when creating MIME parts from content when using the :class:`ObjectManager`. ``content_manager`` specified by the current :mod:`~email.policy`. -.. class ContentManager() +.. class:: ContentManager() Base class for content managers. Provides the standard registry mechanisms that map MIME content types to other representations, as well as the ``get_content`` @@ -250,3 +250,109 @@ when creating MIME parts from content when using the :class:`ObjectManager`. Record *handler* as the function to call when an object of a type matching *typekey* is passed to :meth:`set_content`. For the possible values of *typekey*, see :meth:`set_content`. + + +Content Manager Instances +------------------------- + + +.. data:: raw_data_manager + + This content manager provides only a minimum interface beyond that provided + by :class:`~email.message.Message` itself: it deals only with text, raw + byte strings, and :class:`~email.message.Message` objects. Nevertheless, it + provides significant advantages compared to the base API: ``get_content`` on + a text part will return a unicode string without the application needing to + manually decode it, ``set_content`` provides a rich set of options for + controlling the headers added to a part and controlling the content transfer + encoding, and it enables the use of the various ``add_`` methods, thereby + simplifying the creation of multipart messages. + + .. method:: get_content(msg, errors='replace') + + Return the payload of the part as either a string (for ``text`` parts), a + :class:`~email.message.MIMEMessage` object (for ``message/rfc822`` + parts), or a ``bytes`` object (for all other non-multipart types). Raise + a :exc:`KeyError` if called on a ``multipart``. If the part is a + ``text`` part and *errors* is specified, use it as the error handler when + decoding the payload to unicode. The default error handler is + ``replace``. + + .. method:: set_content(msg, <'str'>, subtype="plain", charset='utf-8' + cte=None, + disposition=None, filename=None, cid=None, + params=None, headers=None) + set_content(msg, <'bytes'>, maintype, subtype, cte="base64", + dispoosition=None, filename=None, cid=None, + params=None, headers=None) + set_content(msg, <'Message'>, cte=None, + dispoosition=None, filename=None, cid=None, + params=None, headers=None) + set_content(msg, <'list'>, subtype='mixed', + dispoosition=None, filename=None, cid=None, + params=None, headers=None) + + Add headers and payload to *msg*: + + If the *msg* object doesn not have a :mailheader:`MIME-Version` + header, add one. + + Add a :mailheader:`Content-Type` header with a ``maintype/subtype`` + value. For strings, set the MIME ``maintype`` to text, and set the + subtype to *subtype* if it is specified, or ``plain`` if it is not. For + :class:`~email.message.Message` objects, set the maintype to + ``message``, and set the subtype to *subtype* if it is specified or + ``rfc822`` if it is not. If *subtype* is ``partial``, raise an error + (use ``bytes`` objects to construct ``message/partial`` parts). For + ``bytes``, use the specified *maintype* and *subtype*, or raise a + :exc:`TypeError` if they are not specified. For *<'list'>*, which + should be a list of :class:`~email.message.Message` objects, set the + ``maintype`` to ``multipart``, and the ``subtype`` to *subtype* if it is + specified, and ``mixed`` if it is not. If the message parts in the + *<'list'>* have :mailheader:`MIME-Version` headers, remove them. + + If *charset* is provided (which is valid only for ``str``), encode the + string to bytes using the specified character set. The default is + ``utf-8``. If the specified *charset* is a known alias for a standard + MIME charset name, use the standard charset instead. + + If *cte* is set, encode the payload using the specified content transfer + encoding, and set the :mailheader:`Content-Transfer-Endcoding` header to + that value. For ``str`` objects, if it is not set use heuristics to + determine the most compact encoding. Possible values for *cte* are + ``quoted-printable``, ``base64``, ``7bit``, ``8bit``, and ``binary``. + If the input cannot be encoded in the specified encoding (eg: ``7bit``), + raise a :exc:`ValueError`. For :class:`~email.message.Message`, per + :rfc:2046, raise an error if a *cte* of ``quoted-printable`` or + ``base64`` is requested for *subtype* ``rfc822``, and for any *cte* + other than ``7bit`` for *subtype* ``external-body``. For + ``message/rfc822``, use ``8bit`` if *cte* is not specified. For all + other values of *subtype*, use ``7bit``. + + .. note:: A *cte* of ``binary`` does not actually work correctly yet. + The ``Message`` object as modified by ``set_content`` is correct, but + :class:`~email.generator.BytesGenerator` does not serialize it + correctly. + + If *disposition* is set, use it as the value of the + :mailheader:`Content-Disposition` header. If not specified, and + *filename* is specified, add the header with the value ``attachment``. + If it is not specified and *filename* is also not specified, do not add + the header. The only valie values for *disposition* are ``attachment`` + and ``inline``. + + If *filename* is specified, use it as the value of the ``filename`` + parameter of the :mailheader:`Content-Disposition` header. There is no + default. + + If *cid* is specified, add a :mailheader:`Content-ID` header with + *cid* as its value. + + If *params* is specified, iterate its ``items`` method and use the + resulting ``(key, value)`` pairs to set additional paramters on the + :mailheader:`Content-Type` header. + + If *headers* is specified and is a list of strings of the form + ``headername: headervalue`` or a list of ``header`` objects + (distinguised from strings by having a ``name`` attribute), add the + headers to *msg*. diff --git a/Doc/library/email.message.rst b/Doc/library/email.message.rst index 4a34c36..902ac7b 100644 --- a/Doc/library/email.message.rst +++ b/Doc/library/email.message.rst @@ -465,7 +465,8 @@ Here are the methods of the :class:`Message` class: to ``False``. - .. method:: set_param(param, value, header='Content-Type', requote=True, charset=None, language='') + .. method:: set_param(param, value, header='Content-Type', requote=True, + charset=None, language='', replace=False) Set a parameter in the :mailheader:`Content-Type` header. If the parameter already exists in the header, its value will be replaced with @@ -482,6 +483,10 @@ Here are the methods of the :class:`Message` class: language, defaulting to the empty string. Both *charset* and *language* should be strings. + If *replace* is ``False`` (the default) the header is moved to the + end of the list of headers. If *replace* is ``True``, the header + will be updated in place. + .. method:: del_param(param, header='content-type', requote=True) diff --git a/Lib/email/contentmanager.py b/Lib/email/contentmanager.py index 6ab06c3..09e0537 100644 --- a/Lib/email/contentmanager.py +++ b/Lib/email/contentmanager.py @@ -1,3 +1,9 @@ +import binascii +import email.charset +import email.message +import email.errors +from email import quoprimime + class ContentManager: def __init__(self): @@ -43,5 +49,197 @@ class ContentManager: raise KeyError(full_path_for_error) -class ObjectManager(ContentManager): - pass +raw_data_manager = ContentManager() + + +def get_text_content(msg, errors='replace'): + content = msg.get_payload(decode=True) + charset = msg.get_param('charset', 'ASCII') + return content.decode(charset, errors=errors) +raw_data_manager.add_get_handler('text', get_text_content) + + +def get_non_text_content(msg): + return msg.get_payload(decode=True) +for maintype in 'audio image video application'.split(): + raw_data_manager.add_get_handler(maintype, get_non_text_content) + + +def get_message_content(msg): + return msg.get_payload(0) +for subtype in 'rfc822 external-body'.split(): + raw_data_manager.add_get_handler('message/'+subtype, get_message_content) + + +def get_and_fixup_unknown_message_content(msg): + # If we don't understand a message subtype, we are supposed to treat it as + # if it were application/octet-stream, per + # tools.ietf.org/html/rfc2046#section-5.2.4. Feedparser doesn't do that, + # so do our best to fix things up. Note that it is *not* appropriate to + # model message/partial content as Message objects, so they are handled + # here as well. (How to reassemble them is out of scope for this comment :) + return bytes(msg.get_payload(0)) +raw_data_manager.add_get_handler('message', + get_and_fixup_unknown_message_content) + + +def _prepare_set(msg, maintype, subtype, headers): + if 'mime-version' not in msg: + msg['MIME-Version'] = '1.0' + msg['Content-Type'] = '/'.join((maintype, subtype)) + if headers: + if not hasattr(headers[0], 'name'): + mp = msg.policy + headers = [mp.header_factory(*mp.header_source_parse([header])) + for header in headers] + try: + for header in headers: + if header.defects: + raise header.defects[0] + msg[header.name] = header + except email.errors.HeaderDefect as exc: + raise ValueError("Invalid header: {}".format( + header.fold(policy=msg.policy))) from exc + + +def _finalize_set(msg, disposition, filename, cid, params, headers): + if disposition is None and filename is not None: + disposition = 'attachment' + if disposition is not None: + msg['Content-Disposition'] = disposition + if filename is not None: + msg.set_param('filename', + filename, + header='Content-Disposition', + replace=True) + if cid is not None: + msg['Content-ID'] = cid + if params is not None: + for key, value in params.items(): + msg.set_param(key, value) + + +# XXX: This is a cleaned-up version of base64mime.body_encode. It would +# be nice to drop both this and quoprimime.body_encode in favor of +# enhanced binascii routines that accepted a max_line_length parameter. +def _encode_base64(data, max_line_length): + encoded_lines = [] + unencoded_bytes_per_line = max_line_length * 3 // 4 + for i in range(0, len(data), unencoded_bytes_per_line): + thisline = data[i:i+unencoded_bytes_per_line] + encoded_lines.append(binascii.b2a_base64(thisline).decode('ascii')) + return ''.join(encoded_lines) + + +def _encode_text(string, charset, cte, policy): + lines = string.encode(charset).splitlines() + linesep = policy.linesep.encode('ascii') + def embeded_body(lines): return linesep.join(lines) + linesep + def normal_body(lines): return b'\n'.join(lines) + b'\n' + if cte==None: + # Use heuristics to decide on the "best" encoding. + try: + return '7bit', normal_body(lines).decode('ascii') + except UnicodeDecodeError: + pass + if (policy.cte_type == '8bit' and + max((len(x) for x in lines)) <= policy.max_line_length): + return '8bit', normal_body(lines).decode('ascii', 'surrogateescape') + sniff = embeded_body(lines[:10]) + sniff_qp = quoprimime.body_encode(sniff.decode('latin-1'), + policy.max_line_length) + sniff_base64 = binascii.b2a_base64(sniff) + # This is a little unfair to qp; it includes lineseps and base64 doesn't. + if len(sniff_qp) > len(sniff_base64): + cte = 'base64' + else: + cte = 'quoted-printable' + if len(lines) <= 10: + return cte, sniff_qp + if cte == '7bit': + data = normal_body(lines).decode('ascii') + elif cte == '8bit': + data = normal_body(lines).decode('ascii', 'surrogateescape') + elif cte == 'quoted-printable': + data = quoprimime.body_encode(normal_body(lines).decode('latin-1'), + policy.max_line_length) + elif cte == 'base64': + data = _encode_base64(embeded_body(lines), policy.max_line_length) + else: + raise ValueError("Unknown content transfer encoding {}".format(cte)) + return cte, data + + +def set_text_content(msg, string, subtype="plain", charset='utf-8', cte=None, + disposition=None, filename=None, cid=None, + params=None, headers=None): + _prepare_set(msg, 'text', subtype, headers) + cte, payload = _encode_text(string, charset, cte, msg.policy) + msg.set_payload(payload) + msg.set_param('charset', + email.charset.ALIASES.get(charset, charset), + replace=True) + msg['Content-Transfer-Encoding'] = cte + _finalize_set(msg, disposition, filename, cid, params, headers) +raw_data_manager.add_set_handler(str, set_text_content) + + +def set_message_content(msg, message, subtype="rfc822", cte=None, + disposition=None, filename=None, cid=None, + params=None, headers=None): + if subtype == 'partial': + raise ValueError("message/partial is not supported for Message objects") + if subtype == 'rfc822': + if cte not in (None, '7bit', '8bit', 'binary'): + # http://tools.ietf.org/html/rfc2046#section-5.2.1 mandate. + raise ValueError( + "message/rfc822 parts do not support cte={}".format(cte)) + # 8bit will get coerced on serialization if policy.cte_type='7bit'. We + # may end up claiming 8bit when it isn't needed, but the only negative + # result of that should be a gateway that needs to coerce to 7bit + # having to look through the whole embedded message to discover whether + # or not it actually has to do anything. + cte = '8bit' if cte is None else cte + elif subtype == 'external-body': + if cte not in (None, '7bit'): + # http://tools.ietf.org/html/rfc2046#section-5.2.3 mandate. + raise ValueError( + "message/external-body parts do not support cte={}".format(cte)) + cte = '7bit' + elif cte is None: + # http://tools.ietf.org/html/rfc2046#section-5.2.4 says all future + # subtypes should be restricted to 7bit, so assume that. + cte = '7bit' + _prepare_set(msg, 'message', subtype, headers) + msg.set_payload([message]) + msg['Content-Transfer-Encoding'] = cte + _finalize_set(msg, disposition, filename, cid, params, headers) +raw_data_manager.add_set_handler(email.message.Message, set_message_content) + + +def set_bytes_content(msg, data, maintype, subtype, cte='base64', + disposition=None, filename=None, cid=None, + params=None, headers=None): + _prepare_set(msg, maintype, subtype, headers) + if cte == 'base64': + data = _encode_base64(data, max_line_length=msg.policy.max_line_length) + elif cte == 'quoted-printable': + # XXX: quoprimime.body_encode won't encode newline characters in data, + # so we can't use it. This means max_line_length is ignored. Another + # bug to fix later. (Note: encoders.quopri is broken on line ends. + data = binascii.b2a_qp(data, istext=False, header=False, quotetabs=True) + data = data.decode('ascii') + elif cte == '7bit': + # Make sure it really is only ASCII. The early warning here seems + # worth the overhead...if you care write your own content manager :). + data.encode('ascii') + elif cte in ('8bit', 'binary'): + data = data.decode('ascii', 'surrogateescape') + msg.set_payload(data) + msg['Content-Transfer-Encoding'] = cte + _finalize_set(msg, disposition, filename, cid, params, headers) +for typ in (bytes, bytearray, memoryview): + raw_data_manager.add_set_handler(typ, set_bytes_content) + + +object_manager = ContentManager() diff --git a/Lib/email/message.py b/Lib/email/message.py index cf42e27..6493dd1 100644 --- a/Lib/email/message.py +++ b/Lib/email/message.py @@ -675,7 +675,7 @@ class Message: return failobj def set_param(self, param, value, header='Content-Type', requote=True, - charset=None, language=''): + charset=None, language='', replace=False): """Set a parameter in the Content-Type header. If the parameter already exists in the header, its value will be @@ -719,8 +719,11 @@ class Message: else: ctype = SEMISPACE.join([ctype, append_param]) if ctype != self.get(header): - del self[header] - self[header] = ctype + if replace: + self.replace_header(header, ctype) + else: + del self[header] + self[header] = ctype def del_param(self, param, header='content-type', requote=True): """Remove the given parameter completely from the Content-Type header. @@ -961,11 +964,9 @@ class MIMEMessage(Message): content_manager = self.policy.content_manager return content_manager.get_content(self, *args, **kw) - def set_content(self, *args, headers=[], content_manager=None, **kw): + def set_content(self, *args, content_manager=None, **kw): if content_manager is None: content_manager = self.policy.content_manager - for header in headers: - self.add_header(header.name, header) content_manager.set_content(self, *args, **kw) def _make_multipart(self, subtype, disallowed_subtypes, boundary): diff --git a/Lib/email/policy.py b/Lib/email/policy.py index 0e53feb..f0b20f4 100644 --- a/Lib/email/policy.py +++ b/Lib/email/policy.py @@ -5,7 +5,7 @@ code that adds all the email6 features. from email._policybase import Policy, Compat32, compat32, _extend_docstrings from email.utils import _has_surrogates from email.headerregistry import HeaderRegistry as HeaderRegistry -from email.contentmanager import ObjectManager +from email.contentmanager import raw_data_manager __all__ = [ 'Compat32', @@ -66,13 +66,15 @@ class EmailPolicy(Policy): it calls the corresponding method of this object, passing it the message object as its first argument, and any arguments or keywords that were passed to - it as additional arguments. + it as additional arguments. The default + content_manager is + :data:`~email.contentmanager.raw_data_manager`. """ refold_source = 'long' header_factory = HeaderRegistry() - content_manager = ObjectManager() + content_manager = raw_data_manager def __init__(self, **kw): # Ensure that each new instance gets a unique header factory diff --git a/Lib/email/utils.py b/Lib/email/utils.py index b3b42bb..25b0d56 100644 --- a/Lib/email/utils.py +++ b/Lib/email/utils.py @@ -68,9 +68,13 @@ def _has_surrogates(s): # How to deal with a string containing bytes before handing it to the # application through the 'normal' interface. def _sanitize(string): - # Turn any escaped bytes into unicode 'unknown' char. - original_bytes = string.encode('ascii', 'surrogateescape') - return original_bytes.decode('ascii', 'replace') + # Turn any escaped bytes into unicode 'unknown' char. If the escaped + # bytes happen to be utf-8 they will instead get decoded, even if they + # were invalid in the charset the source was supposed to be in. This + # seems like it is not a bad thing; a defect was still registered. + original_bytes = string.encode('utf-8', 'surrogateescape') + return original_bytes.decode('utf-8', 'replace') + # Helpers diff --git a/Lib/test/test_email/__init__.py b/Lib/test/test_email/__init__.py index df1ae9b..d0a43e0 100644 --- a/Lib/test/test_email/__init__.py +++ b/Lib/test/test_email/__init__.py @@ -2,6 +2,7 @@ import os import sys import unittest import test.support +import collections import email from email.message import Message from email._policybase import compat32 @@ -63,6 +64,16 @@ class TestEmailBase(unittest.TestCase): message = self.message return email.message_from_string(string, message, policy=policy) + def _bytes_msg(self, bytestring, message=None, policy=None): + if policy is None: + policy = self.policy + if message is None: + message = self.message + return email.message_from_bytes(bytestring, message, policy=policy) + + def _make_message(self): + return self.message(policy=self.policy) + def _bytes_repr(self, b): return [repr(x) for x in b.splitlines(keepends=True)] @@ -127,7 +138,7 @@ def parameterize(cls): """ paramdicts = {} - testers = [] + testers = collections.defaultdict(list) for name, attr in cls.__dict__.items(): if name.endswith('_params'): if not hasattr(attr, 'keys'): @@ -140,8 +151,14 @@ def parameterize(cls): attr = d paramdicts[name[:-7] + '_as_'] = attr if '_as_' in name: - testers.append(name.split('_as_')[0] + '_as_') + testers[name.split('_as_')[0] + '_as_'].append(name) testfuncs = {} + for name in paramdicts: + if name not in testers: + raise ValueError("No tester found for {}".format(name)) + for name in testers: + if name not in paramdicts: + raise ValueError("No params found for {}".format(name)) for name, attr in cls.__dict__.items(): for paramsname, paramsdict in list(paramdicts.items()): if name.startswith(paramsname): @@ -152,14 +169,6 @@ def parameterize(cls): testname = testnameroot + '_' + paramname test.__name__ = testname testfuncs[testname] = test - del paramdicts[paramsname] - testers.remove(paramsname) - if paramdicts: - raise ValueError("No tester found for {}".format( - ', '.join(paramdicts.keys()))) - if testers: - raise ValueError("No params found for {}".format( - ', '.join(testers))) for key, value in testfuncs.items(): setattr(cls, key, value) return cls diff --git a/Lib/test/test_email/test_contentmanager.py b/Lib/test/test_email/test_contentmanager.py index 444be18..1409322 100644 --- a/Lib/test/test_email/test_contentmanager.py +++ b/Lib/test/test_email/test_contentmanager.py @@ -1,8 +1,9 @@ import unittest from test.test_email import TestEmailBase, parameterize -from email.contentmanager import ContentManager +import textwrap from email import policy from email.message import MIMEMessage +from email.contentmanager import ContentManager, raw_data_manager @parameterize @@ -12,70 +13,770 @@ class TestContentManager(TestEmailBase): message = MIMEMessage get_key_params = { - 'full_type': ('text/plain',), - 'maintype_only': ('text',), - 'null_key': ('',), + 'full_type': (1, 'text/plain',), + 'maintype_only': (2, 'text',), + 'null_key': (3, '',), } - def get_key_as_get_content_key(self, key): + def get_key_as_get_content_key(self, order, key): def foo_getter(msg, foo=None): bar = msg['X-Bar-Header'] return foo, bar cm = ContentManager() cm.add_get_handler(key, foo_getter) - m = self.message() + m = self._make_message() m['Content-Type'] = 'text/plain' m['X-Bar-Header'] = 'foo' self.assertEqual(cm.get_content(m, foo='bar'), ('bar', 'foo')) + def get_key_as_get_content_key_order(self, order, key): + def bar_getter(msg): + return msg['X-Bar-Header'] + def foo_getter(msg): + return msg['X-Foo-Header'] + cm = ContentManager() + cm.add_get_handler(key, foo_getter) + for precedence, key in self.get_key_params.values(): + if precedence > order: + cm.add_get_handler(key, bar_getter) + m = self._make_message() + m['Content-Type'] = 'text/plain' + m['X-Bar-Header'] = 'bar' + m['X-Foo-Header'] = 'foo' + self.assertEqual(cm.get_content(m), ('foo')) + def test_get_content_raises_if_unknown_mimetype_and_no_default(self): cm = ContentManager() - m = self.message() + m = self._make_message() m['Content-Type'] = 'text/plain' with self.assertRaisesRegex(KeyError, 'text/plain'): cm.get_content(m) - class BaseMessage(str): + class BaseThing(str): pass - baseobject_full_path = __name__ + '.' + 'TestContentManager.BaseMessage' - class Message(BaseMessage): + baseobject_full_path = __name__ + '.' + 'TestContentManager.BaseThing' + class Thing(BaseThing): pass - testobject_full_path = __name__ + '.' + 'TestContentManager.Message' + testobject_full_path = __name__ + '.' + 'TestContentManager.Thing' set_key_params = { - 'type': (Message,), - 'full_path': (testobject_full_path,), - 'qualname': ('TestContentManager.Message',), - 'name': ('Message',), - 'base_type': (BaseMessage,), - 'base_full_path': (baseobject_full_path,), - 'base_qualname': ('TestContentManager.BaseMessage',), - 'base_name': ('BaseMessage',), - 'str_type': (str,), - 'str_full_path': ('builtins.str',), - 'str_name': ('str',), # str name and qualname are the same - 'null_key': (None,), + 'type': (0, Thing,), + 'full_path': (1, testobject_full_path,), + 'qualname': (2, 'TestContentManager.Thing',), + 'name': (3, 'Thing',), + 'base_type': (4, BaseThing,), + 'base_full_path': (5, baseobject_full_path,), + 'base_qualname': (6, 'TestContentManager.BaseThing',), + 'base_name': (7, 'BaseThing',), + 'str_type': (8, str,), + 'str_full_path': (9, 'builtins.str',), + 'str_name': (10, 'str',), # str name and qualname are the same + 'null_key': (11, None,), } - def set_key_as_set_content_key(self, key): + def set_key_as_set_content_key(self, order, key): def foo_setter(msg, obj, foo=None): msg['X-Foo-Header'] = foo msg.set_payload(obj) cm = ContentManager() cm.add_set_handler(key, foo_setter) - m = self.message() - msg_obj = self.Message() + m = self._make_message() + msg_obj = self.Thing() cm.set_content(m, msg_obj, foo='bar') self.assertEqual(m['X-Foo-Header'], 'bar') self.assertEqual(m.get_payload(), msg_obj) + def set_key_as_set_content_key_order(self, order, key): + def foo_setter(msg, obj): + msg['X-FooBar-Header'] = 'foo' + msg.set_payload(obj) + def bar_setter(msg, obj): + msg['X-FooBar-Header'] = 'bar' + cm = ContentManager() + cm.add_set_handler(key, foo_setter) + for precedence, key in self.get_key_params.values(): + if precedence > order: + cm.add_set_handler(key, bar_setter) + m = self._make_message() + msg_obj = self.Thing() + cm.set_content(m, msg_obj) + self.assertEqual(m['X-FooBar-Header'], 'foo') + self.assertEqual(m.get_payload(), msg_obj) + def test_set_content_raises_if_unknown_type_and_no_default(self): cm = ContentManager() - m = self.message() - msg_obj = self.Message() + m = self._make_message() + msg_obj = self.Thing() with self.assertRaisesRegex(KeyError, self.testobject_full_path): cm.set_content(m, msg_obj) +@parameterize +class TestRawDataManager(TestEmailBase): + + policy = policy.default.clone(max_line_length=60, + content_manager=raw_data_manager) + message = MIMEMessage + + def test_get_text_plain(self): + m = self._str_msg(textwrap.dedent("""\ + Content-Type: text/plain + + Basic text. + """)) + self.assertEqual(raw_data_manager.get_content(m), "Basic text.\n") + + def test_get_text_html(self): + m = self._str_msg(textwrap.dedent("""\ + Content-Type: text/html + +

Basic text.

+ """)) + self.assertEqual(raw_data_manager.get_content(m), "

Basic text.

\n") + + def test_get_text_plain_latin1(self): + m = self._bytes_msg(textwrap.dedent("""\ + Content-Type: text/plain; charset=latin1 + + Basìc tëxt. + """).encode('latin1')) + self.assertEqual(raw_data_manager.get_content(m), "Basìc tëxt.\n") + + def test_get_text_plain_latin1_quoted_printable(self): + m = self._str_msg(textwrap.dedent("""\ + Content-Type: text/plain; charset="latin-1" + Content-Transfer-Encoding: quoted-printable + + Bas=ECc t=EBxt. + """)) + self.assertEqual(raw_data_manager.get_content(m), "Basìc tëxt.\n") + + def test_get_text_plain_utf8_base64(self): + m = self._str_msg(textwrap.dedent("""\ + Content-Type: text/plain; charset="utf8" + Content-Transfer-Encoding: base64 + + QmFzw6xjIHTDq3h0Lgo= + """)) + self.assertEqual(raw_data_manager.get_content(m), "Basìc tëxt.\n") + + def test_get_text_plain_bad_utf8_quoted_printable(self): + m = self._str_msg(textwrap.dedent("""\ + Content-Type: text/plain; charset="utf8" + Content-Transfer-Encoding: quoted-printable + + Bas=c3=acc t=c3=abxt=fd. + """)) + self.assertEqual(raw_data_manager.get_content(m), "Basìc tëxt�.\n") + + def test_get_text_plain_bad_utf8_quoted_printable_ignore_errors(self): + m = self._str_msg(textwrap.dedent("""\ + Content-Type: text/plain; charset="utf8" + Content-Transfer-Encoding: quoted-printable + + Bas=c3=acc t=c3=abxt=fd. + """)) + self.assertEqual(raw_data_manager.get_content(m, errors='ignore'), + "Basìc tëxt.\n") + + def test_get_text_plain_utf8_base64_recoverable_bad_CTE_data(self): + m = self._str_msg(textwrap.dedent("""\ + Content-Type: text/plain; charset="utf8" + Content-Transfer-Encoding: base64 + + QmFzw6xjIHTDq3h0Lgo\xFF= + """)) + self.assertEqual(raw_data_manager.get_content(m, errors='ignore'), + "Basìc tëxt.\n") + + def test_get_text_invalid_keyword(self): + m = self._str_msg(textwrap.dedent("""\ + Content-Type: text/plain + + Basic text. + """)) + with self.assertRaises(TypeError): + raw_data_manager.get_content(m, foo='ignore') + + def test_get_non_text(self): + template = textwrap.dedent("""\ + Content-Type: {} + Content-Transfer-Encoding: base64 + + Ym9ndXMgZGF0YQ== + """) + for maintype in 'audio image video application'.split(): + with self.subTest(maintype=maintype): + m = self._str_msg(template.format(maintype+'/foo')) + self.assertEqual(raw_data_manager.get_content(m), b"bogus data") + + def test_get_non_text_invalid_keyword(self): + m = self._str_msg(textwrap.dedent("""\ + Content-Type: image/jpg + Content-Transfer-Encoding: base64 + + Ym9ndXMgZGF0YQ== + """)) + with self.assertRaises(TypeError): + raw_data_manager.get_content(m, errors='ignore') + + def test_get_raises_on_multipart(self): + m = self._str_msg(textwrap.dedent("""\ + Content-Type: multipart/mixed; boundary="===" + + --=== + --===-- + """)) + with self.assertRaises(KeyError): + raw_data_manager.get_content(m) + + def test_get_message_rfc822_and_external_body(self): + template = textwrap.dedent("""\ + Content-Type: message/{} + + To: foo@example.com + From: bar@example.com + Subject: example + + an example message + """) + for subtype in 'rfc822 external-body'.split(): + with self.subTest(subtype=subtype): + m = self._str_msg(template.format(subtype)) + sub_msg = raw_data_manager.get_content(m) + self.assertIsInstance(sub_msg, self.message) + self.assertEqual(raw_data_manager.get_content(sub_msg), + "an example message\n") + self.assertEqual(sub_msg['to'], 'foo@example.com') + self.assertEqual(sub_msg['from'].addresses[0].username, 'bar') + + def test_get_message_non_rfc822_or_external_body_yields_bytes(self): + m = self._str_msg(textwrap.dedent("""\ + Content-Type: message/partial + + To: foo@example.com + From: bar@example.com + Subject: example + + The real body is in another message. + """)) + self.assertEqual(raw_data_manager.get_content(m)[:10], b'To: foo@ex') + + def test_set_text_plain(self): + m = self._make_message() + content = "Simple message.\n" + raw_data_manager.set_content(m, content) + self.assertEqual(str(m), textwrap.dedent("""\ + MIME-Version: 1.0 + Content-Type: text/plain; charset="utf-8" + Content-Transfer-Encoding: 7bit + + Simple message. + """)) + self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content) + self.assertEqual(m.get_content(), content) + + def test_set_text_html(self): + m = self._make_message() + content = "

Simple message.

\n" + raw_data_manager.set_content(m, content, subtype='html') + self.assertEqual(str(m), textwrap.dedent("""\ + MIME-Version: 1.0 + Content-Type: text/html; charset="utf-8" + Content-Transfer-Encoding: 7bit + +

Simple message.

+ """)) + self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content) + self.assertEqual(m.get_content(), content) + + def test_set_text_charset_latin_1(self): + m = self._make_message() + content = "Simple message.\n" + raw_data_manager.set_content(m, content, charset='latin-1') + self.assertEqual(str(m), textwrap.dedent("""\ + MIME-Version: 1.0 + Content-Type: text/plain; charset="iso-8859-1" + Content-Transfer-Encoding: 7bit + + Simple message. + """)) + self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content) + self.assertEqual(m.get_content(), content) + + def test_set_text_short_line_minimal_non_ascii_heuristics(self): + m = self._make_message() + content = "et la il est monté sur moi et il commence a m'étouffer.\n" + raw_data_manager.set_content(m, content) + self.assertEqual(bytes(m), textwrap.dedent("""\ + MIME-Version: 1.0 + Content-Type: text/plain; charset="utf-8" + Content-Transfer-Encoding: 8bit + + et la il est monté sur moi et il commence a m'étouffer. + """).encode('utf-8')) + self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content) + self.assertEqual(m.get_content(), content) + + def test_set_text_long_line_minimal_non_ascii_heuristics(self): + m = self._make_message() + content = ("j'ai un problème de python. il est sorti de son" + " vivarium. et la il est monté sur moi et il commence" + " a m'étouffer.\n") + raw_data_manager.set_content(m, content) + self.assertEqual(bytes(m), textwrap.dedent("""\ + MIME-Version: 1.0 + Content-Type: text/plain; charset="utf-8" + Content-Transfer-Encoding: quoted-printable + + j'ai un probl=C3=A8me de python. il est sorti de son vivari= + um. et la il est mont=C3=A9 sur moi et il commence a m'=C3= + =A9touffer. + """).encode('utf-8')) + self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content) + self.assertEqual(m.get_content(), content) + + def test_set_text_11_lines_long_line_minimal_non_ascii_heuristics(self): + m = self._make_message() + content = '\n'*10 + ( + "j'ai un problème de python. il est sorti de son" + " vivarium. et la il est monté sur moi et il commence" + " a m'étouffer.\n") + raw_data_manager.set_content(m, content) + self.assertEqual(bytes(m), textwrap.dedent("""\ + MIME-Version: 1.0 + Content-Type: text/plain; charset="utf-8" + Content-Transfer-Encoding: quoted-printable + """ + '\n'*10 + """ + j'ai un probl=C3=A8me de python. il est sorti de son vivari= + um. et la il est mont=C3=A9 sur moi et il commence a m'=C3= + =A9touffer. + """).encode('utf-8')) + self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content) + self.assertEqual(m.get_content(), content) + + def test_set_text_maximal_non_ascii_heuristics(self): + m = self._make_message() + content = "áàäéèęöő.\n" + raw_data_manager.set_content(m, content) + self.assertEqual(bytes(m), textwrap.dedent("""\ + MIME-Version: 1.0 + Content-Type: text/plain; charset="utf-8" + Content-Transfer-Encoding: 8bit + + áàäéèęöő. + """).encode('utf-8')) + self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content) + self.assertEqual(m.get_content(), content) + + def test_set_text_11_lines_maximal_non_ascii_heuristics(self): + m = self._make_message() + content = '\n'*10 + "áàäéèęöő.\n" + raw_data_manager.set_content(m, content) + self.assertEqual(bytes(m), textwrap.dedent("""\ + MIME-Version: 1.0 + Content-Type: text/plain; charset="utf-8" + Content-Transfer-Encoding: 8bit + """ + '\n'*10 + """ + áàäéèęöő. + """).encode('utf-8')) + self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content) + self.assertEqual(m.get_content(), content) + + def test_set_text_long_line_maximal_non_ascii_heuristics(self): + m = self._make_message() + content = ("áàäéèęöőáàäéèęöőáàäéèęöőáàäéèęöő" + "áàäéèęöőáàäéèęöőáàäéèęöőáàäéèęöő" + "áàäéèęöőáàäéèęöőáàäéèęöőáàäéèęöő.\n") + raw_data_manager.set_content(m, content) + self.assertEqual(bytes(m), textwrap.dedent("""\ + MIME-Version: 1.0 + Content-Type: text/plain; charset="utf-8" + Content-Transfer-Encoding: base64 + + w6HDoMOkw6nDqMSZw7bFkcOhw6DDpMOpw6jEmcO2xZHDocOgw6TDqcOoxJnD + tsWRw6HDoMOkw6nDqMSZw7bFkcOhw6DDpMOpw6jEmcO2xZHDocOgw6TDqcOo + xJnDtsWRw6HDoMOkw6nDqMSZw7bFkcOhw6DDpMOpw6jEmcO2xZHDocOgw6TD + qcOoxJnDtsWRw6HDoMOkw6nDqMSZw7bFkcOhw6DDpMOpw6jEmcO2xZHDocOg + w6TDqcOoxJnDtsWRLgo= + """).encode('utf-8')) + self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content) + self.assertEqual(m.get_content(), content) + + def test_set_text_11_lines_long_line_maximal_non_ascii_heuristics(self): + # Yes, it chooses "wrong" here. It's a heuristic. So this result + # could change if we come up with a better heuristic. + m = self._make_message() + content = ('\n'*10 + + "áàäéèęöőáàäéèęöőáàäéèęöőáàäéèęöő" + "áàäéèęöőáàäéèęöőáàäéèęöőáàäéèęöő" + "áàäéèęöőáàäéèęöőáàäéèęöőáàäéèęöő.\n") + raw_data_manager.set_content(m, "\n"*10 + + "áàäéèęöőáàäéèęöőáàäéèęöőáàäéèęöő" + "áàäéèęöőáàäéèęöőáàäéèęöőáàäéèęöő" + "áàäéèęöőáàäéèęöőáàäéèęöőáàäéèęöő.\n") + self.assertEqual(bytes(m), textwrap.dedent("""\ + MIME-Version: 1.0 + Content-Type: text/plain; charset="utf-8" + Content-Transfer-Encoding: quoted-printable + """ + '\n'*10 + """ + =C3=A1=C3=A0=C3=A4=C3=A9=C3=A8=C4=99=C3=B6=C5=91=C3=A1=C3= + =A0=C3=A4=C3=A9=C3=A8=C4=99=C3=B6=C5=91=C3=A1=C3=A0=C3=A4= + =C3=A9=C3=A8=C4=99=C3=B6=C5=91=C3=A1=C3=A0=C3=A4=C3=A9=C3= + =A8=C4=99=C3=B6=C5=91=C3=A1=C3=A0=C3=A4=C3=A9=C3=A8=C4=99= + =C3=B6=C5=91=C3=A1=C3=A0=C3=A4=C3=A9=C3=A8=C4=99=C3=B6=C5= + =91=C3=A1=C3=A0=C3=A4=C3=A9=C3=A8=C4=99=C3=B6=C5=91=C3=A1= + =C3=A0=C3=A4=C3=A9=C3=A8=C4=99=C3=B6=C5=91=C3=A1=C3=A0=C3= + =A4=C3=A9=C3=A8=C4=99=C3=B6=C5=91=C3=A1=C3=A0=C3=A4=C3=A9= + =C3=A8=C4=99=C3=B6=C5=91=C3=A1=C3=A0=C3=A4=C3=A9=C3=A8=C4= + =99=C3=B6=C5=91=C3=A1=C3=A0=C3=A4=C3=A9=C3=A8=C4=99=C3=B6= + =C5=91. + """).encode('utf-8')) + self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content) + self.assertEqual(m.get_content(), content) + + def test_set_text_non_ascii_with_cte_7bit_raises(self): + m = self._make_message() + with self.assertRaises(UnicodeError): + raw_data_manager.set_content(m,"áàäéèęöő.\n", cte='7bit') + + def test_set_text_non_ascii_with_charset_ascii_raises(self): + m = self._make_message() + with self.assertRaises(UnicodeError): + raw_data_manager.set_content(m,"áàäéèęöő.\n", charset='ascii') + + def test_set_text_non_ascii_with_cte_7bit_and_charset_ascii_raises(self): + m = self._make_message() + with self.assertRaises(UnicodeError): + raw_data_manager.set_content(m,"áàäéèęöő.\n", cte='7bit', charset='ascii') + + def test_set_message(self): + m = self._make_message() + m['Subject'] = "Forwarded message" + content = self._make_message() + content['To'] = 'python@vivarium.org' + content['From'] = 'police@monty.org' + content['Subject'] = "get back in your box" + content.set_content("Or face the comfy chair.") + raw_data_manager.set_content(m, content) + self.assertEqual(str(m), textwrap.dedent("""\ + Subject: Forwarded message + MIME-Version: 1.0 + Content-Type: message/rfc822 + Content-Transfer-Encoding: 8bit + + To: python@vivarium.org + From: police@monty.org + Subject: get back in your box + MIME-Version: 1.0 + Content-Type: text/plain; charset="utf-8" + Content-Transfer-Encoding: 7bit + + Or face the comfy chair. + """)) + payload = m.get_payload(0) + self.assertIsInstance(payload, self.message) + self.assertEqual(str(payload), str(content)) + self.assertIsInstance(m.get_content(), self.message) + self.assertEqual(str(m.get_content()), str(content)) + + def test_set_message_with_non_ascii_and_coersion_to_7bit(self): + m = self._make_message() + m['Subject'] = "Escape report" + content = self._make_message() + content['To'] = 'police@monty.org' + content['From'] = 'victim@monty.org' + content['Subject'] = "Help" + content.set_content("j'ai un problème de python. il est sorti de son" + " vivarium.") + raw_data_manager.set_content(m, content) + self.assertEqual(bytes(m), textwrap.dedent("""\ + Subject: Escape report + MIME-Version: 1.0 + Content-Type: message/rfc822 + Content-Transfer-Encoding: 8bit + + To: police@monty.org + From: victim@monty.org + Subject: Help + MIME-Version: 1.0 + Content-Type: text/plain; charset="utf-8" + Content-Transfer-Encoding: 8bit + + j'ai un problème de python. il est sorti de son vivarium. + """).encode('utf-8')) + # The choice of base64 for the body encoding is because generator + # doesn't bother with heuristics and uses it unconditionally for utf-8 + # text. + # XXX: the first cte should be 7bit, too...that's a generator bug. + # XXX: the line length in the body also looks like a generator bug. + self.assertEqual(m.as_string(maxheaderlen=self.policy.max_line_length), + textwrap.dedent("""\ + Subject: Escape report + MIME-Version: 1.0 + Content-Type: message/rfc822 + Content-Transfer-Encoding: 8bit + + To: police@monty.org + From: victim@monty.org + Subject: Help + MIME-Version: 1.0 + Content-Type: text/plain; charset="utf-8" + Content-Transfer-Encoding: base64 + + aidhaSB1biBwcm9ibMOobWUgZGUgcHl0aG9uLiBpbCBlc3Qgc29ydGkgZGUgc29uIHZpdmFyaXVt + Lgo= + """)) + self.assertIsInstance(m.get_content(), self.message) + self.assertEqual(str(m.get_content()), str(content)) + + def test_set_message_invalid_cte_raises(self): + m = self._make_message() + content = self._make_message() + for cte in 'quoted-printable base64'.split(): + for subtype in 'rfc822 external-body'.split(): + with self.subTest(cte=cte, subtype=subtype): + with self.assertRaises(ValueError) as ar: + m.set_content(content, subtype, cte=cte) + exc = str(ar.exception) + self.assertIn(cte, exc) + self.assertIn(subtype, exc) + subtype = 'external-body' + for cte in '8bit binary'.split(): + with self.subTest(cte=cte, subtype=subtype): + with self.assertRaises(ValueError) as ar: + m.set_content(content, subtype, cte=cte) + exc = str(ar.exception) + self.assertIn(cte, exc) + self.assertIn(subtype, exc) + + def test_set_image_jpg(self): + for content in (b"bogus content", + bytearray(b"bogus content"), + memoryview(b"bogus content")): + with self.subTest(content=content): + m = self._make_message() + raw_data_manager.set_content(m, content, 'image', 'jpeg') + self.assertEqual(str(m), textwrap.dedent("""\ + MIME-Version: 1.0 + Content-Type: image/jpeg + Content-Transfer-Encoding: base64 + + Ym9ndXMgY29udGVudA== + """)) + self.assertEqual(m.get_payload(decode=True), content) + self.assertEqual(m.get_content(), content) + + def test_set_audio_aif_with_quoted_printable_cte(self): + # Why you would use qp, I don't know, but it is technically supported. + # XXX: the incorrect line length is because binascii.b2a_qp doesn't + # support a line length parameter, but we must use it to get newline + # encoding. + # XXX: what about that lack of tailing newline? Do we actually handle + # that correctly in all cases? That is, if the *source* has an + # unencoded newline, do we add an extra newline to the returned payload + # or not? And can that actually be disambiguated based on the RFC? + m = self._make_message() + content = b'b\xFFgus\tcon\nt\rent ' + b'z'*100 + m.set_content(content, 'audio', 'aif', cte='quoted-printable') + self.assertEqual(bytes(m), textwrap.dedent("""\ + MIME-Version: 1.0 + Content-Type: audio/aif + Content-Transfer-Encoding: quoted-printable + + b=FFgus=09con=0At=0Dent=20zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz= + zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz""").encode('latin-1')) + self.assertEqual(m.get_payload(decode=True), content) + self.assertEqual(m.get_content(), content) + + def test_set_video_mpeg_with_binary_cte(self): + m = self._make_message() + content = b'b\xFFgus\tcon\nt\rent ' + b'z'*100 + m.set_content(content, 'video', 'mpeg', cte='binary') + self.assertEqual(bytes(m), textwrap.dedent("""\ + MIME-Version: 1.0 + Content-Type: video/mpeg + Content-Transfer-Encoding: binary + + """).encode('ascii') + + # XXX: the second \n ought to be a \r, but generator gets it wrong. + # THIS MEANS WE DON'T ACTUALLY SUPPORT THE 'binary' CTE. + b'b\xFFgus\tcon\nt\nent zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz' + + b'zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz') + self.assertEqual(m.get_payload(decode=True), content) + self.assertEqual(m.get_content(), content) + + def test_set_application_octet_stream_with_8bit_cte(self): + # In 8bit mode, univeral line end logic applies. It is up to the + # application to make sure the lines are short enough; we don't check. + m = self._make_message() + content = b'b\xFFgus\tcon\nt\rent\n' + b'z'*60 + b'\n' + m.set_content(content, 'application', 'octet-stream', cte='8bit') + self.assertEqual(bytes(m), textwrap.dedent("""\ + MIME-Version: 1.0 + Content-Type: application/octet-stream + Content-Transfer-Encoding: 8bit + + """).encode('ascii') + + b'b\xFFgus\tcon\nt\nent\n' + + b'zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz\n') + self.assertEqual(m.get_payload(decode=True), content) + self.assertEqual(m.get_content(), content) + + def test_set_headers_from_header_objects(self): + m = self._make_message() + content = "Simple message.\n" + header_factory = self.policy.header_factory + raw_data_manager.set_content(m, content, headers=( + header_factory("To", "foo@example.com"), + header_factory("From", "foo@example.com"), + header_factory("Subject", "I'm talking to myself."))) + self.assertEqual(str(m), textwrap.dedent("""\ + MIME-Version: 1.0 + Content-Type: text/plain; charset="utf-8" + To: foo@example.com + From: foo@example.com + Subject: I'm talking to myself. + Content-Transfer-Encoding: 7bit + + Simple message. + """)) + + def test_set_headers_from_strings(self): + m = self._make_message() + content = "Simple message.\n" + raw_data_manager.set_content(m, content, headers=( + "X-Foo-Header: foo", + "X-Bar-Header: bar",)) + self.assertEqual(str(m), textwrap.dedent("""\ + MIME-Version: 1.0 + Content-Type: text/plain; charset="utf-8" + X-Foo-Header: foo + X-Bar-Header: bar + Content-Transfer-Encoding: 7bit + + Simple message. + """)) + + def test_set_headers_with_invalid_duplicate_string_header_raises(self): + m = self._make_message() + content = "Simple message.\n" + with self.assertRaisesRegex(ValueError, 'Content-Type'): + raw_data_manager.set_content(m, content, headers=( + "Content-Type: foo/bar",) + ) + + def test_set_headers_with_invalid_duplicate_header_header_raises(self): + m = self._make_message() + content = "Simple message.\n" + header_factory = self.policy.header_factory + with self.assertRaisesRegex(ValueError, 'Content-Type'): + raw_data_manager.set_content(m, content, headers=( + header_factory("Content-Type", " foo/bar"),) + ) + + def test_set_headers_with_defective_string_header_raises(self): + m = self._make_message() + content = "Simple message.\n" + with self.assertRaisesRegex(ValueError, 'a@fairly@@invalid@address'): + raw_data_manager.set_content(m, content, headers=( + 'To: a@fairly@@invalid@address',) + ) + print(m['To'].defects) + + def test_set_headers_with_defective_header_header_raises(self): + m = self._make_message() + content = "Simple message.\n" + header_factory = self.policy.header_factory + with self.assertRaisesRegex(ValueError, 'a@fairly@@invalid@address'): + raw_data_manager.set_content(m, content, headers=( + header_factory('To', 'a@fairly@@invalid@address'),) + ) + print(m['To'].defects) + + def test_set_disposition_inline(self): + m = self._make_message() + m.set_content('foo', disposition='inline') + self.assertEqual(m['Content-Disposition'], 'inline') + + def test_set_disposition_attachment(self): + m = self._make_message() + m.set_content('foo', disposition='attachment') + self.assertEqual(m['Content-Disposition'], 'attachment') + + def test_set_disposition_foo(self): + m = self._make_message() + m.set_content('foo', disposition='foo') + self.assertEqual(m['Content-Disposition'], 'foo') + + # XXX: we should have a 'strict' policy mode (beyond raise_on_defect) that + # would cause 'foo' above to raise. + + def test_set_filename(self): + m = self._make_message() + m.set_content('foo', filename='bar.txt') + self.assertEqual(m['Content-Disposition'], 'attachment; filename="bar.txt"') + + def test_set_filename_and_disposition_inline(self): + m = self._make_message() + m.set_content('foo', disposition='inline', filename='bar.txt') + self.assertEqual(m['Content-Disposition'], 'inline; filename="bar.txt"') + + def test_set_non_ascii_filename(self): + m = self._make_message() + m.set_content('foo', filename='ábárî.txt') + self.assertEqual(bytes(m), textwrap.dedent("""\ + MIME-Version: 1.0 + Content-Type: text/plain; charset="utf-8" + Content-Transfer-Encoding: 7bit + Content-Disposition: attachment; + filename*=utf-8''%C3%A1b%C3%A1r%C3%AE.txt + + foo + """).encode('ascii')) + + content_object_params = { + 'text_plain': ('content', ()), + 'text_html': ('content', ('html',)), + 'application_octet_stream': (b'content', ('application', 'octet_stream')), + 'image_jpeg': (b'content', ('image', 'jpeg')), + 'message_rfc822': (message(), ()), + 'message_external_body': (message(), ('external-body',)), + } + + def content_object_as_header_receiver(self, obj, mimetype): + m = self._make_message() + m.set_content(obj, *mimetype, headers=( + 'To: foo@example.com', + 'From: bar@simple.net')) + self.assertEqual(m['to'], 'foo@example.com') + self.assertEqual(m['from'], 'bar@simple.net') + + def content_object_as_disposition_inline_receiver(self, obj, mimetype): + m = self._make_message() + m.set_content(obj, *mimetype, disposition='inline') + self.assertEqual(m['Content-Disposition'], 'inline') + + def content_object_as_non_ascii_filename_receiver(self, obj, mimetype): + m = self._make_message() + m.set_content(obj, *mimetype, disposition='inline', filename='bár.txt') + self.assertEqual(m['Content-Disposition'], 'inline; filename="bár.txt"') + self.assertEqual(m.get_filename(), "bár.txt") + self.assertEqual(m['Content-Disposition'].params['filename'], "bár.txt") + + def content_object_as_cid_receiver(self, obj, mimetype): + m = self._make_message() + m.set_content(obj, *mimetype, cid='some_random_stuff') + self.assertEqual(m['Content-ID'], 'some_random_stuff') + + def content_object_as_params_receiver(self, obj, mimetype): + m = self._make_message() + params = {'foo': 'bár', 'abc': 'xyz'} + m.set_content(obj, *mimetype, params=params) + if isinstance(obj, str): + params['charset'] = 'utf-8' + self.assertEqual(m['Content-Type'].params, params) + + if __name__ == '__main__': unittest.main() diff --git a/Lib/test/test_email/test_headerregistry.py b/Lib/test/test_email/test_headerregistry.py index f829f83..f2df372 100644 --- a/Lib/test/test_email/test_headerregistry.py +++ b/Lib/test/test_email/test_headerregistry.py @@ -661,7 +661,7 @@ class TestContentTypeHeader(TestHeaderBase): 'text/plain; name="ascii_is_the_default"'), 'rfc2231_bad_character_in_charset_parameter_value': ( - "text/plain; charset*=ascii''utf-8%E2%80%9D", + "text/plain; charset*=ascii''utf-8%F1%F2%F3", 'text/plain', 'text', 'plain', @@ -669,6 +669,18 @@ class TestContentTypeHeader(TestHeaderBase): [errors.UndecodableBytesDefect], 'text/plain; charset="utf-8\uFFFD\uFFFD\uFFFD"'), + 'rfc2231_utf_8_in_supposedly_ascii_charset_parameter_value': ( + "text/plain; charset*=ascii''utf-8%E2%80%9D", + 'text/plain', + 'text', + 'plain', + {'charset': 'utf-8”'}, + [errors.UndecodableBytesDefect], + 'text/plain; charset="utf-8”"', + ), + # XXX: if the above were *re*folded, it would get tagged as utf-8 + # instead of ascii in the param, since it now contains non-ASCII. + 'rfc2231_encoded_then_unencoded_segments': ( ('application/x-foo;' '\tname*0*="us-ascii\'en-us\'My";' diff --git a/Lib/test/test_email/test_message.py b/Lib/test/test_email/test_message.py index 2727695..00144ba 100644 --- a/Lib/test/test_email/test_message.py +++ b/Lib/test/test_email/test_message.py @@ -454,7 +454,7 @@ class TestMIMEMessage(TestEmailBase): self.assertEqual(part[name], value) self.assertEqual(part.get_payload(), payload) - def make_subtype_as_with_boundary(self, method, subtype, outcome): + def subtype_as_make_with_boundary(self, method, subtype, outcome): # Doing all variation is a bit of overkill... m = self.message() if outcome in ('', 'raises'):