diff --git a/Doc/library/email.contentmanager.rst b/Doc/library/email.contentmanager.rst new file mode 100644 --- /dev/null +++ b/Doc/library/email.contentmanager.rst @@ -0,0 +1,358 @@ +:mod:`email.contentmanager`: Managing MIME Content +-------------------------------------------------- + +.. module:: email.contentmanager + :synopsis: Storing and Retrieving Content from MIME Parts + +.. moduleauthor:: R. David Murray +.. sectionauthor:: R. David Murray + + +.. note:: + + The contentmanager module has been included in the standard library on a + :term:`provisional basis <provisional package>`. Backwards incompatible + changes (up to and including removal of the module) may occur if deemed + necessary by the core developers. + +.. versionadded:: 3.4 + as a :term:`provisional module <provisional package>`. + +The :mod:`~email.message` module provides a class that can represent an +arbitrary email message, regardless of whether it is a non-MIME format message, +or is in MIME format (has a MIME-Version header). That basic message model has +a useful and flexible API, but it only knows about the general structure of a +message. Actual MIME messages and message subparts can have additional +structure and semantics. This module provides classes and tools for handling +various specific types of content in a flexible and extensible fashion, +including the ability to retrieve the content of the message as a specialized +object type rather than as a simple bytes object. The module takes care of the +RFC-specified MIME details for the various common content types, and support +for additional types can be added by an application using the extension +mechanisms. + +To provide an API that makes content management simpler, we define a +subclass of :class:`~email.message.Message` named :class:`.MIMEMessage`. +Note: a ``MIMEMessage`` can still be used to represent a non-MIME, +text-body-only message during parsing, but that is a case that happens only +rarely in today's world. 
+ +This module defines the eponymous "Content Manager" classes. The base +:class:`.ContentManager` class defines an API for registering content +management functions, creating mappings between MIME content types and other +representations (files or Python objects). Three subclasses of +:class:`.ContentManager` provide concrete implementations of the content +management protocol: :class:`RawDataManager` maps between content types and +``str`` or ``bytes`` data (and requires that you manage all of the MIME +parameters by hand), :class:`FileManager` uses the :mod:`mimetypes` module to +map between MIME content types and file objects, and :class:`ObjectManager` +maps between MIME content types and specific Python object types that represent +that content. This module also defines a few helper classes that can be used +when creating MIME parts from content when using the :class:`ObjectManager`. + +.. note:: + + Although :class:`.MIMEMessage` is currently documented in this module + because of the provisional nature of the code, the implementation lives + in the :mod:`email.message` module. + + +.. class:: MIMEMessage(policy=default) + + The *policy* argument determines the :mod:`~email.policy` that will be used + to update the message model. The default value, :class:`default`, follows + the rules of the email RFCs except for line endings: instead of the RFC + mandated ``\r\n``, it uses the Python standard ``\n`` line endings. For + more information see the :mod:`~email.policy` documentation. + + This class is a subclass of :class:`~email.message.Message`. It adds + the following methods: + + + .. method:: get_body(preferencelist=('related', 'html', 'plain')) + + Return the MIME part that contains the notional ``body`` of the message. + + *preferencelist* is a sequence of strings from the set ``related``, + ``html``, and ``plain``, and indicates the order of preference for the + content type of the part returned. 
If ``html`` is included in the list + and ``related`` is not, returns the ``html`` part of a ``related`` part + if one is found. If there is no part that matches at least one of the + types in the preference list, returns ``None``. Note that for most + applications the only combinations that really make sense are + ``('plain',)``, ``('html', 'plain')``, and the default, ``('related', + 'html', 'plain')``. + + If called on a non-``multipart`` message, returns the part on which it + was called if that part is of a type that matches one of the preferences, + otherwise it returns ``None``. Recall that if a part does not have + an explicit type it defaults to ``text/plain``. + + If a part has a :mailheader:`Content-Disposition` header, it is + only considered a body candidate if the value is ``inline``. + + + .. method:: iter_attachments() + + Returns an iterator over all of the parts of the message that are not + candidate "body" parts. That is, the first occurrence of each of + ``text/plain``, ``text/html``, ``multipart/related``, or + ``multipart/alternative`` is skipped, and all remaining parts are + returned. When applied directly to a ``multipart/related``, returns an + iterator over all the related parts except an initial ``text`` part, + if there is one. When applied directly to a ``multipart/alternative`` or + a non-``multipart``, returns an empty iterator. + + + .. method:: iter_parts() + + Returns an iterator over all of the parts of the message, which will be + empty for a non-``multipart``. + + + .. method:: get_content(*args, content_manager=None, **kw) + + Calls the ``get_content`` method of the *content_manager*, passing itself + as the message object, and passing along any other arguments or keywords + as additional arguments. If *content_manager* is not specified, it + defaults to the ``content_manager`` specified by the current + :mod:`~email.policy`. + + + .. 
method:: set_content(*args, content_manager=None, **kw) + + Calls the ``set_content`` method of the *content_manager*, passing itself + as the message object, and passing along any other arguments or keywords + as additional arguments. If *content_manager* is not specified, it + defaults to the ``content_manager`` specified by the current + :mod:`~email.policy`. + + + .. method:: make_related(boundary=None) + + Convert a non-``multipart`` message into a ``multipart/related`` message, + moving any existing content into the (new) first part of the + ``multipart``. Headers starting from :mailheader:`Content-Type` to the + end of the header list are moved to the new sub-part; any earlier headers + are left in the base part. Optional *boundary* is the boundary string + for the newly created multipart. When ``None`` (the default), the + boundary is calculated when needed (for example, when the message is + serialized). + + + .. method:: make_alternative(boundary=None) + + Convert a non-``multipart`` or a ``multipart/related`` into a + ``multipart/alternative``, moving the existing content into the (new) + first part of the ``multipart``. Headers starting from + :mailheader:`Content-Type` to the end of the header list are moved to the + new sub-part; any earlier headers are left in the base part. Optional + *boundary* is the boundary string for the newly created multipart. When + ``None`` (the default), the boundary is calculated when needed (for + example, when the message is serialized). + + + .. method:: make_mixed(boundary=None) + + Convert a non-``multipart``, a ``multipart/related``, or a + ``multipart/alternative`` into a ``multipart/mixed``, moving the + existing content into the (new) first part of the ``multipart``. Headers + starting from :mailheader:`Content-Type` to the end of the header list + are moved to the new sub-part; any earlier headers are left in the base + part. Optional *boundary* is the boundary string for the newly created + multipart. 
When ``None`` (the default), the boundary is calculated when + needed (for example, when the message is serialized). + + + .. method:: add_related(*args, content_manager=None, **kw) + + If the message is a ``multipart/related``, create a new message + object, pass all of the arguments to its :meth:`set_content` method, + and :meth:`~email.message.Message.attach` it to the ``multipart``. If + the message is a non-``multipart``, call :meth:`make_related` and then + proceed as above. If the message is any other type of ``multipart``, + raise a :exc:`TypeError`. If *content_manager* is not specified, it + defaults to the ``content_manager`` specified by the current + :mod:`~email.policy`. + + + .. method:: add_alternative(*args, content_manager=None, **kw) + + If the message is a ``multipart/alternative``, create a new message + object, pass all of the arguments to its :meth:`set_content` method, and + :meth:`~email.message.Message.attach` it to the ``multipart``. If the + message is a non-``multipart`` or ``multipart/related``, call + :meth:`make_alternative` and then proceed as above. If the message is + any other type of ``multipart``, raise a :exc:`TypeError`. If + *content_manager* is not specified, it defaults to the + ``content_manager`` specified by the current :mod:`~email.policy`. + + + .. method:: add_attachment(*args, content_manager=None, **kw) + + If the message is a ``multipart/mixed``, create a new message object, + pass all of the arguments to its :meth:`set_content` method, and + :meth:`~email.message.Message.attach` it to the ``multipart``. If the + message is a non-``multipart``, ``multipart/related``, or + ``multipart/alternative``, call :meth:`make_mixed` and then proceed as + above. If *content_manager* is not specified, it defaults to the + ``content_manager`` specified by the current :mod:`~email.policy`. + + +.. class:: ContentManager() + + Base class for content managers. 
Provides the standard registry mechanisms that + map MIME content types to other representations, as well as the ``get_content`` + and ``set_content`` dispatch methods. + + + .. method:: get_content(msg, *args, **kw) + + Look up a handler function based on the ``mimetype`` of *msg*, call it, + passing through all arguments, and return the result of the call. The + expectation is that the handler will extract the payload from *msg* and + return an object that encodes information about the extracted data. + If the full ``mimetype`` of the message is found in the registry, + call the associated handler. If not, but a handler is registered + for just the ``maintype``, call that handler. If there is no handler + for the ``maintype``, but there is a handler registered for the + empty string, call that handler. If there are no handlers for any + of these keys, raise a :exc:`KeyError` for the full ``mimetype``. + + + .. method:: set_content(msg, obj, *args, **kw) + + Look up a handler function based on the type of *obj* and call it, + passing through all arguments. The expectation is that the handler will + transform and store *obj* into *msg*, possibly making other changes to + *msg* as well, such as adding various MIME headers to encode information + needed to interpret the stored data. First use the object's type + (``type(obj)``) as a key to look for a handler in the registry, and call + it if found. If not, use the type's fully qualified name + (``type(obj).__module__ + '.' + type(obj).__qualname__``). If no handler + is found for that key, use the type's qualname + (``type(obj).__qualname__``). If no handler is found, use the type's + name (``type(obj).__name__``). If no handler is found, repeat these four + checks for each remaining type in the type's ``__mro__``. Finally, if no + other handler is found, but a handler exists under the key ``None``, call + that handler. Otherwise raise a :exc:`KeyError` for the fully + qualified name of the type. + + + .. 
method:: add_get_handler(key, handler) + + Record the function *handler* as the handler for *key*. For the possible + values of *key*, see :meth:`get_content`. + + + .. method:: add_set_handler(typekey, handler) + + Record *handler* as the function to call when an object of a type + matching *typekey* is passed to :meth:`set_content`. For the possible + values of *typekey*, see :meth:`set_content`. + + +Content Manager Instances +------------------------- + + +.. data:: raw_data_manager + + This content manager provides only a minimum interface beyond that provided + by :class:`~email.message.Message` itself: it deals only with text, raw + byte strings, and :class:`~email.message.Message` objects. Nevertheless, it + provides significant advantages compared to the base API: ``get_content`` on + a text part will return a unicode string without the application needing to + manually decode it, ``set_content`` provides a rich set of options for + controlling the headers added to a part and controlling the content transfer + encoding, and it enables the use of the various ``add_`` methods, thereby + simplifying the creation of multipart messages. + + .. method:: get_content(msg, errors='replace') + + Return the payload of the part as either a string (for ``text`` parts), a + :class:`~email.message.MIMEMessage` object (for ``message/rfc822`` + parts), or a ``bytes`` object (for all other non-multipart types). Raise + a :exc:`KeyError` if called on a ``multipart``. If the part is a + ``text`` part and *errors* is specified, use it as the error handler when + decoding the payload to unicode. The default error handler is + ``replace``. + + .. 
method:: set_content(msg, <'str'>, subtype="plain", charset='utf-8', + cte=None, + disposition=None, filename=None, cid=None, + params=None, headers=None) + set_content(msg, <'bytes'>, maintype, subtype, cte="base64", + disposition=None, filename=None, cid=None, + params=None, headers=None) + set_content(msg, <'Message'>, cte=None, + disposition=None, filename=None, cid=None, + params=None, headers=None) + set_content(msg, <'list'>, subtype='mixed', + disposition=None, filename=None, cid=None, + params=None, headers=None) + + Add headers and payload to *msg*: + + If the *msg* object does not have a :mailheader:`MIME-Version` + header, add one. + + Add a :mailheader:`Content-Type` header with a ``maintype/subtype`` + value. For strings, set the MIME ``maintype`` to ``text``, and set the + subtype to *subtype* if it is specified, or ``plain`` if it is not. For + :class:`~email.message.Message` objects, set the maintype to + ``message``, and set the subtype to *subtype* if it is specified or + ``rfc822`` if it is not. If *subtype* is ``partial``, raise an error + (use ``bytes`` objects to construct ``message/partial`` parts). For + ``bytes``, use the specified *maintype* and *subtype*, or raise a + :exc:`TypeError` if they are not specified. For *<'list'>*, which + should be a list of :class:`~email.message.Message` objects, set the + ``maintype`` to ``multipart``, and the ``subtype`` to *subtype* if it is + specified, and ``mixed`` if it is not. If the message parts in the + *<'list'>* have :mailheader:`MIME-Version` headers, remove them. + + If *charset* is provided (which is valid only for ``str``), encode the + string to bytes using the specified character set. The default is + ``utf-8``. If the specified *charset* is a known alias for a standard + MIME charset name, use the standard charset instead. + + If *cte* is set, encode the payload using the specified content transfer + encoding, and set the :mailheader:`Content-Transfer-Encoding` header to + that value. 
For ``str`` objects, if it is not set, use heuristics to + determine the most compact encoding. Possible values for *cte* are + ``quoted-printable``, ``base64``, ``7bit``, ``8bit``, and ``binary``. + If the input cannot be encoded in the specified encoding (for example, ``7bit``), + raise a :exc:`ValueError`. For :class:`~email.message.Message`, per + :rfc:`2046`, raise an error if a *cte* of ``quoted-printable`` or + ``base64`` is requested for *subtype* ``rfc822``, and for any *cte* + other than ``7bit`` for *subtype* ``external-body``. For + ``message/rfc822``, use ``8bit`` if *cte* is not specified. For all + other values of *subtype*, use ``7bit``. + + .. note:: A *cte* of ``binary`` does not actually work correctly yet. + The ``Message`` object as modified by ``set_content`` is correct, but + :class:`~email.generator.BytesGenerator` does not serialize it + correctly. + + If *disposition* is set, use it as the value of the + :mailheader:`Content-Disposition` header. If not specified, and + *filename* is specified, add the header with the value ``attachment``. + If it is not specified and *filename* is also not specified, do not add + the header. The only valid values for *disposition* are ``attachment`` + and ``inline``. + + If *filename* is specified, use it as the value of the ``filename`` + parameter of the :mailheader:`Content-Disposition` header. There is no + default. + + If *cid* is specified, add a :mailheader:`Content-ID` header with + *cid* as its value. + + If *params* is specified, iterate its ``items`` method and use the + resulting ``(key, value)`` pairs to set additional parameters on the + :mailheader:`Content-Type` header. + + If *headers* is specified and is a list of strings of the form + ``headername: headervalue`` or a list of ``header`` objects + (distinguished from strings by having a ``name`` attribute), add the + headers to *msg*. 
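Reviewer sketch (not part of the patch): the documentation above describes two behaviours that are easier to check with a short script, namely the ``get_content`` lookup order (full type, then maintype, then the empty string) and the ``str``/``bytes`` handling of the raw data manager. The example below is a minimal sketch that assumes a released Python 3 standard library, where, as far as I know, the class this patch calls ``MIMEMessage`` is available as ``email.message.EmailMessage`` and the default policy's ``content_manager`` is ``raw_data_manager``. The function names ``lookup_order`` and ``build_message`` are hypothetical and exist only for illustration.

```python
from email.contentmanager import ContentManager
# Assumption: EmailMessage is the released name of the patch's MIMEMessage.
from email.message import EmailMessage


def lookup_order():
    """Show which registered get-handler is chosen for three content types."""
    manager = ContentManager()
    manager.add_get_handler("text/html", lambda msg: "full type")
    manager.add_get_handler("text", lambda msg: "maintype")
    manager.add_get_handler("", lambda msg: "fallback")
    results = []
    for ctype in ("text/html", "text/plain", "image/png"):
        part = EmailMessage()
        part["Content-Type"] = ctype
        # Dispatch tries the full type, then the maintype, then ''.
        results.append(manager.get_content(part))
    return results


def build_message():
    """Create a text message and add one binary attachment to it."""
    msg = EmailMessage()
    # str input: maintype is 'text' and subtype defaults to 'plain'.
    msg.set_content("Hello\n")
    # bytes input: maintype and subtype must be given explicitly.
    # Adding an attachment converts the message to multipart/mixed.
    msg.add_attachment(b"\x00\x01\x02", maintype="application",
                       subtype="octet-stream", filename="blob.bin")
    return msg


if __name__ == "__main__":
    print(lookup_order())
    message = build_message()
    print(message.get_content_type())
    for attachment in message.iter_attachments():
        print(attachment.get_filename(), attachment.get_content())
```

Passing an explicit ``content_manager=`` keyword to ``set_content`` or ``add_attachment`` should select a different manager for that call, which is the extension point the ``ContentManager`` registry is meant to provide.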
diff --git a/Doc/library/email.message.rst b/Doc/library/email.message.rst --- a/Doc/library/email.message.rst +++ b/Doc/library/email.message.rst @@ -465,7 +465,8 @@ to ``False``. - .. method:: set_param(param, value, header='Content-Type', requote=True, charset=None, language='') + .. method:: set_param(param, value, header='Content-Type', requote=True, + charset=None, language='', replace=False) Set a parameter in the :mailheader:`Content-Type` header. If the parameter already exists in the header, its value will be replaced with @@ -482,6 +483,10 @@ language, defaulting to the empty string. Both *charset* and *language* should be strings. + If *replace* is ``False`` (the default) the header is moved to the + end of the list of headers. If *replace* is ``True``, the header + will be updated in place. + .. method:: del_param(param, header='content-type', requote=True) diff --git a/Doc/library/email.policy.rst b/Doc/library/email.policy.rst --- a/Doc/library/email.policy.rst +++ b/Doc/library/email.policy.rst @@ -408,6 +408,18 @@ fields are treated as unstructured. This list will be completed before the extension is marked stable.) + .. attribute:: content_manager + + An object with at least two methods: get_content and set_content. When + the :meth:`~email.message.Message.get_content` or + :meth:`~email.message.Message.set_content` method of a + :class:`~email.message.Message` object is called, it calls the + corresponding method of this object, passing it the message object as its + first argument, and any arguments or keywords that were passed to it as + additional arguments. By default ``content_manager`` is set to + :data:`~email.contentmanager.raw_data_manager`. 
+ + The class provides the following concrete implementations of the abstract methods of :class:`Policy`: diff --git a/Lib/email/contentmanager.py b/Lib/email/contentmanager.py new file mode 100644 --- /dev/null +++ b/Lib/email/contentmanager.py @@ -0,0 +1,245 @@ +import binascii +import email.charset +import email.message +import email.errors +from email import quoprimime + +class ContentManager: + + def __init__(self): + self.get_handlers = {} + self.set_handlers = {} + + def add_get_handler(self, key, handler): + self.get_handlers[key] = handler + + def get_content(self, msg, *args, **kw): + content_type = msg.get_content_type() + if content_type in self.get_handlers: + return self.get_handlers[content_type](msg, *args, **kw) + maintype = msg.get_content_maintype() + if maintype in self.get_handlers: + return self.get_handlers[maintype](msg, *args, **kw) + if '' in self.get_handlers: + return self.get_handlers[''](msg, *args, **kw) + raise KeyError(content_type) + + def add_set_handler(self, typekey, handler): + self.set_handlers[typekey] = handler + + def set_content(self, msg, obj, *args, **kw): + full_path_for_error = None + for typ in type(obj).__mro__: + if typ in self.set_handlers: + return self.set_handlers[typ](msg, obj, *args, **kw) + qname = typ.__qualname__ + modname = getattr(typ, '__module__', '') + full_path = '.'.join((modname, qname)) if modname else qname + if full_path_for_error is None: + full_path_for_error = full_path + if full_path in self.set_handlers: + return self.set_handlers[full_path](msg, obj, *args, **kw) + if qname in self.set_handlers: + return self.set_handlers[qname](msg, obj, *args, **kw) + name = typ.__name__ + if name in self.set_handlers: + return self.set_handlers[name](msg, obj, *args, **kw) + if None in self.set_handlers: + return self.set_handlers[None](msg, obj, *args, **kw) + raise KeyError(full_path_for_error) + + +raw_data_manager = ContentManager() + + +def get_text_content(msg, errors='replace'): + content = 
msg.get_payload(decode=True) + charset = msg.get_param('charset', 'ASCII') + return content.decode(charset, errors=errors) +raw_data_manager.add_get_handler('text', get_text_content) + + +def get_non_text_content(msg): + return msg.get_payload(decode=True) +for maintype in 'audio image video application'.split(): + raw_data_manager.add_get_handler(maintype, get_non_text_content) + + +def get_message_content(msg): + return msg.get_payload(0) +for subtype in 'rfc822 external-body'.split(): + raw_data_manager.add_get_handler('message/'+subtype, get_message_content) + + +def get_and_fixup_unknown_message_content(msg): + # If we don't understand a message subtype, we are supposed to treat it as + # if it were application/octet-stream, per + # tools.ietf.org/html/rfc2046#section-5.2.4. Feedparser doesn't do that, + # so do our best to fix things up. Note that it is *not* appropriate to + # model message/partial content as Message objects, so they are handled + # here as well. (How to reassemble them is out of scope for this comment :) + return bytes(msg.get_payload(0)) +raw_data_manager.add_get_handler('message', + get_and_fixup_unknown_message_content) + + +def _prepare_set(msg, maintype, subtype, headers): + if 'mime-version' not in msg: + msg['MIME-Version'] = '1.0' + msg['Content-Type'] = '/'.join((maintype, subtype)) + if headers: + if not hasattr(headers[0], 'name'): + mp = msg.policy + headers = [mp.header_factory(*mp.header_source_parse([header])) + for header in headers] + try: + for header in headers: + if header.defects: + raise header.defects[0] + msg[header.name] = header + except email.errors.HeaderDefect as exc: + raise ValueError("Invalid header: {}".format( + header.fold(policy=msg.policy))) from exc + + +def _finalize_set(msg, disposition, filename, cid, params, headers): + if disposition is None and filename is not None: + disposition = 'attachment' + if disposition is not None: + msg['Content-Disposition'] = disposition + if filename is not None: + 
msg.set_param('filename', + filename, + header='Content-Disposition', + replace=True) + if cid is not None: + msg['Content-ID'] = cid + if params is not None: + for key, value in params.items(): + msg.set_param(key, value) + + +# XXX: This is a cleaned-up version of base64mime.body_encode. It would +# be nice to drop both this and quoprimime.body_encode in favor of +# enhanced binascii routines that accepted a max_line_length parameter. +def _encode_base64(data, max_line_length): + encoded_lines = [] + unencoded_bytes_per_line = max_line_length * 3 // 4 + for i in range(0, len(data), unencoded_bytes_per_line): + thisline = data[i:i+unencoded_bytes_per_line] + encoded_lines.append(binascii.b2a_base64(thisline).decode('ascii')) + return ''.join(encoded_lines) + + +def _encode_text(string, charset, cte, policy): + lines = string.encode(charset).splitlines() + linesep = policy.linesep.encode('ascii') + def embeded_body(lines): return linesep.join(lines) + linesep + def normal_body(lines): return b'\n'.join(lines) + b'\n' + if cte==None: + # Use heuristics to decide on the "best" encoding. + try: + return '7bit', normal_body(lines).decode('ascii') + except UnicodeDecodeError: + pass + if (policy.cte_type == '8bit' and + max((len(x) for x in lines)) <= policy.max_line_length): + return '8bit', normal_body(lines).decode('ascii', 'surrogateescape') + sniff = embeded_body(lines[:10]) + sniff_qp = quoprimime.body_encode(sniff.decode('latin-1'), + policy.max_line_length) + sniff_base64 = binascii.b2a_base64(sniff) + # This is a little unfair to qp; it includes lineseps and base64 doesn't. 
+ if len(sniff_qp) > len(sniff_base64): + cte = 'base64' + else: + cte = 'quoted-printable' + if len(lines) <= 10: + return cte, sniff_qp + if cte == '7bit': + data = normal_body(lines).decode('ascii') + elif cte == '8bit': + data = normal_body(lines).decode('ascii', 'surrogateescape') + elif cte == 'quoted-printable': + data = quoprimime.body_encode(normal_body(lines).decode('latin-1'), + policy.max_line_length) + elif cte == 'base64': + data = _encode_base64(embeded_body(lines), policy.max_line_length) + else: + raise ValueError("Unknown content transfer encoding {}".format(cte)) + return cte, data + + +def set_text_content(msg, string, subtype="plain", charset='utf-8', cte=None, + disposition=None, filename=None, cid=None, + params=None, headers=None): + _prepare_set(msg, 'text', subtype, headers) + cte, payload = _encode_text(string, charset, cte, msg.policy) + msg.set_payload(payload) + msg.set_param('charset', + email.charset.ALIASES.get(charset, charset), + replace=True) + msg['Content-Transfer-Encoding'] = cte + _finalize_set(msg, disposition, filename, cid, params, headers) +raw_data_manager.add_set_handler(str, set_text_content) + + +def set_message_content(msg, message, subtype="rfc822", cte=None, + disposition=None, filename=None, cid=None, + params=None, headers=None): + if subtype == 'partial': + raise ValueError("message/partial is not supported for Message objects") + if subtype == 'rfc822': + if cte not in (None, '7bit', '8bit', 'binary'): + # http://tools.ietf.org/html/rfc2046#section-5.2.1 mandate. + raise ValueError( + "message/rfc822 parts do not support cte={}".format(cte)) + # 8bit will get coerced on serialization if policy.cte_type='7bit'. We + # may end up claiming 8bit when it isn't needed, but the only negative + # result of that should be a gateway that needs to coerce to 7bit + # having to look through the whole embedded message to discover whether + # or not it actually has to do anything. 
+ cte = '8bit' if cte is None else cte + elif subtype == 'external-body': + if cte not in (None, '7bit'): + # http://tools.ietf.org/html/rfc2046#section-5.2.3 mandate. + raise ValueError( + "message/external-body parts do not support cte={}".format(cte)) + cte = '7bit' + elif cte is None: + # http://tools.ietf.org/html/rfc2046#section-5.2.4 says all future + # subtypes should be restricted to 7bit, so assume that. + cte = '7bit' + _prepare_set(msg, 'message', subtype, headers) + msg.set_payload([message]) + msg['Content-Transfer-Encoding'] = cte + _finalize_set(msg, disposition, filename, cid, params, headers) +raw_data_manager.add_set_handler(email.message.Message, set_message_content) + + +def set_bytes_content(msg, data, maintype, subtype, cte='base64', + disposition=None, filename=None, cid=None, + params=None, headers=None): + _prepare_set(msg, maintype, subtype, headers) + if cte == 'base64': + data = _encode_base64(data, max_line_length=msg.policy.max_line_length) + elif cte == 'quoted-printable': + # XXX: quoprimime.body_encode won't encode newline characters in data, + # so we can't use it. This means max_line_length is ignored. Another + # bug to fix later. (Note: encoders.quopri is broken on line ends. + data = binascii.b2a_qp(data, istext=False, header=False, quotetabs=True) + data = data.decode('ascii') + elif cte == '7bit': + # Make sure it really is only ASCII. The early warning here seems + # worth the overhead...if you care write your own content manager :). 
+ data = data.decode('ascii') + elif cte in ('8bit', 'binary'): + data = data.decode('ascii', 'surrogateescape') + msg.set_payload(data) + msg['Content-Transfer-Encoding'] = cte + _finalize_set(msg, disposition, filename, cid, params, headers) +for typ in (bytes, bytearray, memoryview): + raw_data_manager.add_set_handler(typ, set_bytes_content) + + +object_manager = ContentManager() diff --git a/Lib/email/message.py b/Lib/email/message.py --- a/Lib/email/message.py +++ b/Lib/email/message.py @@ -8,8 +8,6 @@ import re import uu -import base64 -import binascii from io import BytesIO, StringIO # Intrapackage imports @@ -679,7 +677,7 @@ return failobj def set_param(self, param, value, header='Content-Type', requote=True, - charset=None, language=''): + charset=None, language='', replace=False): """Set a parameter in the Content-Type header. If the parameter already exists in the header, its value will be @@ -723,8 +721,11 @@ else: ctype = SEMISPACE.join([ctype, append_param]) if ctype != self.get(header): - del self[header] - self[header] = ctype + if replace: + self.replace_header(header, ctype) + else: + del self[header] + self[header] = ctype def del_param(self, param, header='content-type', requote=True): """Remove the given parameter completely from the Content-Type header. @@ -905,3 +906,117 @@ # I.e. def walk(self): ... 
from email.iterators import walk + + +class MIMEMessage(Message): + + def __init__(self, policy=None): + if policy is None: + from email.policy import default + policy = default + Message.__init__(self, policy) + + def get_body(self, preferencelist=('related', 'html', 'plain')): + found = [None] * len(preferencelist) + skip_next = False + for part in self.walk(): + maintype, subtype = part.get_content_type().split('/') + if (skip_next or subtype not in preferencelist or + not (maintype == 'text' and + subtype in ('html', 'plain')) and + not (maintype == 'multipart' and subtype == 'related') or + part.get('content-disposition') not in (None, 'inline')): + skip_next = False + if maintype == 'message': + # Walk returns the message in the message part as if the + # message part were a multipart, even though it isn't. + skip_next = True + continue + priority = preferencelist.index(subtype) + if priority == 0: + # Short circuit, don't need to check the rest. + return part + found[priority] = part + # If #18652 is added, use this instead: + #return first_true(found, pred=lambda x: x is not None) + return next(filter(lambda x: x is not None, found), None) + + def iter_attachments(self): + seen = [] + maintype, subtype = self.get_content_type().split('/') + if maintype != 'multipart' or subtype == 'alternative': + return + for part in self.get_payload(): + maintype, subtype = part.get_content_type().split('/') + if ((maintype == 'text' and subtype in ('html', 'plain') or + maintype == 'multipart' and + subtype in ('related', 'alternative')) and + part.get('content-disposition') in (None, 'inline') and + subtype not in seen): + seen.append(subtype) + continue + yield part + + def iter_parts(self): + if self.get_content_maintype() == 'multipart': + yield from self.get_payload() + + def get_content(self, *args, content_manager=None, **kw): + if content_manager is None: + content_manager = self.policy.content_manager + return content_manager.get_content(self, *args, **kw) + + 
def set_content(self, *args, content_manager=None, **kw): + if content_manager is None: + content_manager = self.policy.content_manager + content_manager.set_content(self, *args, **kw) + + def _make_multipart(self, subtype, disallowed_subtypes, boundary): + if self.get_content_maintype() == 'multipart': + existing_subtype = self.get_content_subtype() + disallowed_subtypes = disallowed_subtypes + (subtype,) + if existing_subtype in disallowed_subtypes: + raise ValueError("Cannot convert {} to {}".format( + existing_subtype, subtype)) + part = type(self)(policy=self.policy) + headers = iter(self.items()) + for i, (name, value) in enumerate(headers): + if name.lower() == 'content-type': + part[name] = value + break + else: + i = -1 + for name, value in headers: + part[name] = value + self._headers[i:] = [] + part._payload = self._payload + self._payload = [part] + self['Content-Type'] = 'multipart/' + subtype + if boundary is not None: + self.set_param('boundary', boundary) + + def make_related(self, boundary=None): + self._make_multipart('related', ('alternative', 'mixed'), boundary) + + def make_alternative(self, boundary=None): + self._make_multipart('alternative', ('mixed',), boundary) + + def make_mixed(self, boundary=None): + self._make_multipart('mixed', (), boundary) + + def _add_multipart(self, subtype, *args, **kw): + if (self.get_content_maintype() != 'multipart' or + self.get_content_subtype() != subtype): + getattr(self, 'make_' + subtype)() + part = type(self)(policy=self.policy) + part.set_content(*args, **kw) + self.attach(part) + + def add_related(self, *args, **kw): + self._add_multipart('related', *args, **kw) + + def add_alternative(self, *args, **kw): + self._add_multipart('alternative', *args, **kw) + + def add_attachment(self, *args, **kw): + self._add_multipart('mixed', *args, **kw) diff --git a/Lib/email/policy.py b/Lib/email/policy.py --- a/Lib/email/policy.py +++ b/Lib/email/policy.py @@ -5,6 +5,7 @@ from email._policybase import Policy, 
Compat32, compat32, _extend_docstrings from email.utils import _has_surrogates from email.headerregistry import HeaderRegistry as HeaderRegistry +from email.contentmanager import raw_data_manager __all__ = [ 'Compat32', @@ -58,10 +59,22 @@ special treatment, while all other fields are treated as unstructured. This list will be completed before the extension is marked stable.) + + content_manager -- an object with at least two methods: get_content + and set_content. When the get_content or + set_content method of a Message object is called, + it calls the corresponding method of this object, + passing it the message object as its first argument, + and any arguments or keywords that were passed to + it as additional arguments. The default + content_manager is + :data:`~email.contentmanager.raw_data_manager`. + """ refold_source = 'long' header_factory = HeaderRegistry() + content_manager = raw_data_manager def __init__(self, **kw): # Ensure that each new instance gets a unique header factory diff --git a/Lib/email/utils.py b/Lib/email/utils.py --- a/Lib/email/utils.py +++ b/Lib/email/utils.py @@ -68,9 +68,13 @@ # How to deal with a string containing bytes before handing it to the # application through the 'normal' interface. def _sanitize(string): - # Turn any escaped bytes into unicode 'unknown' char. - original_bytes = string.encode('ascii', 'surrogateescape') - return original_bytes.decode('ascii', 'replace') + # Turn any escaped bytes into unicode 'unknown' char. If the escaped + # bytes happen to be utf-8 they will instead get decoded, even if they + # were invalid in the charset the source was supposed to be in. This + # seems like it is not a bad thing; a defect was still registered. 
+ original_bytes = string.encode('utf-8', 'surrogateescape') + return original_bytes.decode('utf-8', 'replace') + # Helpers diff --git a/Lib/test/test_email/__init__.py b/Lib/test/test_email/__init__.py --- a/Lib/test/test_email/__init__.py +++ b/Lib/test/test_email/__init__.py @@ -2,6 +2,7 @@ import sys import unittest import test.support +import collections import email from email.message import Message from email._policybase import compat32 @@ -42,6 +43,8 @@ # here we make minimal changes in the test_email tests compared to their # pre-3.3 state. policy = compat32 + # Likewise, the default message object is Message. + message = Message def __init__(self, *args, **kw): super().__init__(*args, **kw) @@ -54,11 +57,23 @@ with openfile(filename) as fp: return email.message_from_file(fp, policy=self.policy) - def _str_msg(self, string, message=Message, policy=None): + def _str_msg(self, string, message=None, policy=None): if policy is None: policy = self.policy + if message is None: + message = self.message return email.message_from_string(string, message, policy=policy) + def _bytes_msg(self, bytestring, message=None, policy=None): + if policy is None: + policy = self.policy + if message is None: + message = self.message + return email.message_from_bytes(bytestring, message, policy=policy) + + def _make_message(self): + return self.message(policy=self.policy) + def _bytes_repr(self, b): return [repr(x) for x in b.splitlines(keepends=True)] @@ -123,6 +138,7 @@ """ paramdicts = {} + testers = collections.defaultdict(list) for name, attr in cls.__dict__.items(): if name.endswith('_params'): if not hasattr(attr, 'keys'): @@ -134,9 +150,17 @@ d[n] = x attr = d paramdicts[name[:-7] + '_as_'] = attr + if '_as_' in name: + testers[name.split('_as_')[0] + '_as_'].append(name) testfuncs = {} + for name in paramdicts: + if name not in testers: + raise ValueError("No tester found for {}".format(name)) + for name in testers: + if name not in paramdicts: + raise ValueError("No 
params found for {}".format(name)) for name, attr in cls.__dict__.items(): - for paramsname, paramsdict in paramdicts.items(): + for paramsname, paramsdict in list(paramdicts.items()): if name.startswith(paramsname): testnameroot = 'test_' + name[len(paramsname):] for paramname, params in paramsdict.items(): diff --git a/Lib/test/test_email/test_contentmanager.py b/Lib/test/test_email/test_contentmanager.py new file mode 100644 --- /dev/null +++ b/Lib/test/test_email/test_contentmanager.py @@ -0,0 +1,782 @@ +import unittest +from test.test_email import TestEmailBase, parameterize +import textwrap +from email import policy +from email.message import MIMEMessage +from email.contentmanager import ContentManager, raw_data_manager + + +@parameterize +class TestContentManager(TestEmailBase): + + policy = policy.default + message = MIMEMessage + + get_key_params = { + 'full_type': (1, 'text/plain',), + 'maintype_only': (2, 'text',), + 'null_key': (3, '',), + } + + def get_key_as_get_content_key(self, order, key): + def foo_getter(msg, foo=None): + bar = msg['X-Bar-Header'] + return foo, bar + cm = ContentManager() + cm.add_get_handler(key, foo_getter) + m = self._make_message() + m['Content-Type'] = 'text/plain' + m['X-Bar-Header'] = 'foo' + self.assertEqual(cm.get_content(m, foo='bar'), ('bar', 'foo')) + + def get_key_as_get_content_key_order(self, order, key): + def bar_getter(msg): + return msg['X-Bar-Header'] + def foo_getter(msg): + return msg['X-Foo-Header'] + cm = ContentManager() + cm.add_get_handler(key, foo_getter) + for precedence, key in self.get_key_params.values(): + if precedence > order: + cm.add_get_handler(key, bar_getter) + m = self._make_message() + m['Content-Type'] = 'text/plain' + m['X-Bar-Header'] = 'bar' + m['X-Foo-Header'] = 'foo' + self.assertEqual(cm.get_content(m), ('foo')) + + def test_get_content_raises_if_unknown_mimetype_and_no_default(self): + cm = ContentManager() + m = self._make_message() + m['Content-Type'] = 'text/plain' + with 
self.assertRaisesRegex(KeyError, 'text/plain'): + cm.get_content(m) + + class BaseThing(str): + pass + baseobject_full_path = __name__ + '.' + 'TestContentManager.BaseThing' + class Thing(BaseThing): + pass + testobject_full_path = __name__ + '.' + 'TestContentManager.Thing' + + set_key_params = { + 'type': (0, Thing,), + 'full_path': (1, testobject_full_path,), + 'qualname': (2, 'TestContentManager.Thing',), + 'name': (3, 'Thing',), + 'base_type': (4, BaseThing,), + 'base_full_path': (5, baseobject_full_path,), + 'base_qualname': (6, 'TestContentManager.BaseThing',), + 'base_name': (7, 'BaseThing',), + 'str_type': (8, str,), + 'str_full_path': (9, 'builtins.str',), + 'str_name': (10, 'str',), # str name and qualname are the same + 'null_key': (11, None,), + } + + def set_key_as_set_content_key(self, order, key): + def foo_setter(msg, obj, foo=None): + msg['X-Foo-Header'] = foo + msg.set_payload(obj) + cm = ContentManager() + cm.add_set_handler(key, foo_setter) + m = self._make_message() + msg_obj = self.Thing() + cm.set_content(m, msg_obj, foo='bar') + self.assertEqual(m['X-Foo-Header'], 'bar') + self.assertEqual(m.get_payload(), msg_obj) + + def set_key_as_set_content_key_order(self, order, key): + def foo_setter(msg, obj): + msg['X-FooBar-Header'] = 'foo' + msg.set_payload(obj) + def bar_setter(msg, obj): + msg['X-FooBar-Header'] = 'bar' + cm = ContentManager() + cm.add_set_handler(key, foo_setter) + for precedence, key in self.get_key_params.values(): + if precedence > order: + cm.add_set_handler(key, bar_setter) + m = self._make_message() + msg_obj = self.Thing() + cm.set_content(m, msg_obj) + self.assertEqual(m['X-FooBar-Header'], 'foo') + self.assertEqual(m.get_payload(), msg_obj) + + def test_set_content_raises_if_unknown_type_and_no_default(self): + cm = ContentManager() + m = self._make_message() + msg_obj = self.Thing() + with self.assertRaisesRegex(KeyError, self.testobject_full_path): + cm.set_content(m, msg_obj) + + +@parameterize +class 
TestRawDataManager(TestEmailBase): + + policy = policy.default.clone(max_line_length=60, + content_manager=raw_data_manager) + message = MIMEMessage + + def test_get_text_plain(self): + m = self._str_msg(textwrap.dedent("""\ + Content-Type: text/plain + + Basic text. + """)) + self.assertEqual(raw_data_manager.get_content(m), "Basic text.\n") + + def test_get_text_html(self): + m = self._str_msg(textwrap.dedent("""\ + Content-Type: text/html + +

<p>Basic text.</p>

+ """)) + self.assertEqual(raw_data_manager.get_content(m), "

<p>Basic text.</p>

\n") + + def test_get_text_plain_latin1(self): + m = self._bytes_msg(textwrap.dedent("""\ + Content-Type: text/plain; charset=latin1 + + Basìc tëxt. + """).encode('latin1')) + self.assertEqual(raw_data_manager.get_content(m), "Basìc tëxt.\n") + + def test_get_text_plain_latin1_quoted_printable(self): + m = self._str_msg(textwrap.dedent("""\ + Content-Type: text/plain; charset="latin-1" + Content-Transfer-Encoding: quoted-printable + + Bas=ECc t=EBxt. + """)) + self.assertEqual(raw_data_manager.get_content(m), "Basìc tëxt.\n") + + def test_get_text_plain_utf8_base64(self): + m = self._str_msg(textwrap.dedent("""\ + Content-Type: text/plain; charset="utf8" + Content-Transfer-Encoding: base64 + + QmFzw6xjIHTDq3h0Lgo= + """)) + self.assertEqual(raw_data_manager.get_content(m), "Basìc tëxt.\n") + + def test_get_text_plain_bad_utf8_quoted_printable(self): + m = self._str_msg(textwrap.dedent("""\ + Content-Type: text/plain; charset="utf8" + Content-Transfer-Encoding: quoted-printable + + Bas=c3=acc t=c3=abxt=fd. + """)) + self.assertEqual(raw_data_manager.get_content(m), "Basìc tëxt�.\n") + + def test_get_text_plain_bad_utf8_quoted_printable_ignore_errors(self): + m = self._str_msg(textwrap.dedent("""\ + Content-Type: text/plain; charset="utf8" + Content-Transfer-Encoding: quoted-printable + + Bas=c3=acc t=c3=abxt=fd. + """)) + self.assertEqual(raw_data_manager.get_content(m, errors='ignore'), + "Basìc tëxt.\n") + + def test_get_text_plain_utf8_base64_recoverable_bad_CTE_data(self): + m = self._str_msg(textwrap.dedent("""\ + Content-Type: text/plain; charset="utf8" + Content-Transfer-Encoding: base64 + + QmFzw6xjIHTDq3h0Lgo\xFF= + """)) + self.assertEqual(raw_data_manager.get_content(m, errors='ignore'), + "Basìc tëxt.\n") + + def test_get_text_invalid_keyword(self): + m = self._str_msg(textwrap.dedent("""\ + Content-Type: text/plain + + Basic text. 
+ """)) + with self.assertRaises(TypeError): + raw_data_manager.get_content(m, foo='ignore') + + def test_get_non_text(self): + template = textwrap.dedent("""\ + Content-Type: {} + Content-Transfer-Encoding: base64 + + Ym9ndXMgZGF0YQ== + """) + for maintype in 'audio image video application'.split(): + with self.subTest(maintype=maintype): + m = self._str_msg(template.format(maintype+'/foo')) + self.assertEqual(raw_data_manager.get_content(m), b"bogus data") + + def test_get_non_text_invalid_keyword(self): + m = self._str_msg(textwrap.dedent("""\ + Content-Type: image/jpg + Content-Transfer-Encoding: base64 + + Ym9ndXMgZGF0YQ== + """)) + with self.assertRaises(TypeError): + raw_data_manager.get_content(m, errors='ignore') + + def test_get_raises_on_multipart(self): + m = self._str_msg(textwrap.dedent("""\ + Content-Type: multipart/mixed; boundary="===" + + --=== + --===-- + """)) + with self.assertRaises(KeyError): + raw_data_manager.get_content(m) + + def test_get_message_rfc822_and_external_body(self): + template = textwrap.dedent("""\ + Content-Type: message/{} + + To: foo@example.com + From: bar@example.com + Subject: example + + an example message + """) + for subtype in 'rfc822 external-body'.split(): + with self.subTest(subtype=subtype): + m = self._str_msg(template.format(subtype)) + sub_msg = raw_data_manager.get_content(m) + self.assertIsInstance(sub_msg, self.message) + self.assertEqual(raw_data_manager.get_content(sub_msg), + "an example message\n") + self.assertEqual(sub_msg['to'], 'foo@example.com') + self.assertEqual(sub_msg['from'].addresses[0].username, 'bar') + + def test_get_message_non_rfc822_or_external_body_yields_bytes(self): + m = self._str_msg(textwrap.dedent("""\ + Content-Type: message/partial + + To: foo@example.com + From: bar@example.com + Subject: example + + The real body is in another message. 
+ """)) + self.assertEqual(raw_data_manager.get_content(m)[:10], b'To: foo@ex') + + def test_set_text_plain(self): + m = self._make_message() + content = "Simple message.\n" + raw_data_manager.set_content(m, content) + self.assertEqual(str(m), textwrap.dedent("""\ + MIME-Version: 1.0 + Content-Type: text/plain; charset="utf-8" + Content-Transfer-Encoding: 7bit + + Simple message. + """)) + self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content) + self.assertEqual(m.get_content(), content) + + def test_set_text_html(self): + m = self._make_message() + content = "

<p>Simple message.</p>

\n" + raw_data_manager.set_content(m, content, subtype='html') + self.assertEqual(str(m), textwrap.dedent("""\ + MIME-Version: 1.0 + Content-Type: text/html; charset="utf-8" + Content-Transfer-Encoding: 7bit + +

<p>Simple message.</p>

+ """)) + self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content) + self.assertEqual(m.get_content(), content) + + def test_set_text_charset_latin_1(self): + m = self._make_message() + content = "Simple message.\n" + raw_data_manager.set_content(m, content, charset='latin-1') + self.assertEqual(str(m), textwrap.dedent("""\ + MIME-Version: 1.0 + Content-Type: text/plain; charset="iso-8859-1" + Content-Transfer-Encoding: 7bit + + Simple message. + """)) + self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content) + self.assertEqual(m.get_content(), content) + + def test_set_text_short_line_minimal_non_ascii_heuristics(self): + m = self._make_message() + content = "et la il est monté sur moi et il commence a m'étouffer.\n" + raw_data_manager.set_content(m, content) + self.assertEqual(bytes(m), textwrap.dedent("""\ + MIME-Version: 1.0 + Content-Type: text/plain; charset="utf-8" + Content-Transfer-Encoding: 8bit + + et la il est monté sur moi et il commence a m'étouffer. + """).encode('utf-8')) + self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content) + self.assertEqual(m.get_content(), content) + + def test_set_text_long_line_minimal_non_ascii_heuristics(self): + m = self._make_message() + content = ("j'ai un problème de python. il est sorti de son" + " vivarium. et la il est monté sur moi et il commence" + " a m'étouffer.\n") + raw_data_manager.set_content(m, content) + self.assertEqual(bytes(m), textwrap.dedent("""\ + MIME-Version: 1.0 + Content-Type: text/plain; charset="utf-8" + Content-Transfer-Encoding: quoted-printable + + j'ai un probl=C3=A8me de python. il est sorti de son vivari= + um. et la il est mont=C3=A9 sur moi et il commence a m'=C3= + =A9touffer. 
+ """).encode('utf-8')) + self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content) + self.assertEqual(m.get_content(), content) + + def test_set_text_11_lines_long_line_minimal_non_ascii_heuristics(self): + m = self._make_message() + content = '\n'*10 + ( + "j'ai un problème de python. il est sorti de son" + " vivarium. et la il est monté sur moi et il commence" + " a m'étouffer.\n") + raw_data_manager.set_content(m, content) + self.assertEqual(bytes(m), textwrap.dedent("""\ + MIME-Version: 1.0 + Content-Type: text/plain; charset="utf-8" + Content-Transfer-Encoding: quoted-printable + """ + '\n'*10 + """ + j'ai un probl=C3=A8me de python. il est sorti de son vivari= + um. et la il est mont=C3=A9 sur moi et il commence a m'=C3= + =A9touffer. + """).encode('utf-8')) + self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content) + self.assertEqual(m.get_content(), content) + + def test_set_text_maximal_non_ascii_heuristics(self): + m = self._make_message() + content = "áàäéèęöő.\n" + raw_data_manager.set_content(m, content) + self.assertEqual(bytes(m), textwrap.dedent("""\ + MIME-Version: 1.0 + Content-Type: text/plain; charset="utf-8" + Content-Transfer-Encoding: 8bit + + áàäéèęöő. + """).encode('utf-8')) + self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content) + self.assertEqual(m.get_content(), content) + + def test_set_text_11_lines_maximal_non_ascii_heuristics(self): + m = self._make_message() + content = '\n'*10 + "áàäéèęöő.\n" + raw_data_manager.set_content(m, content) + self.assertEqual(bytes(m), textwrap.dedent("""\ + MIME-Version: 1.0 + Content-Type: text/plain; charset="utf-8" + Content-Transfer-Encoding: 8bit + """ + '\n'*10 + """ + áàäéèęöő. 
+ """).encode('utf-8')) + self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content) + self.assertEqual(m.get_content(), content) + + def test_set_text_long_line_maximal_non_ascii_heuristics(self): + m = self._make_message() + content = ("áàäéèęöőáàäéèęöőáàäéèęöőáàäéèęöő" + "áàäéèęöőáàäéèęöőáàäéèęöőáàäéèęöő" + "áàäéèęöőáàäéèęöőáàäéèęöőáàäéèęöő.\n") + raw_data_manager.set_content(m, content) + self.assertEqual(bytes(m), textwrap.dedent("""\ + MIME-Version: 1.0 + Content-Type: text/plain; charset="utf-8" + Content-Transfer-Encoding: base64 + + w6HDoMOkw6nDqMSZw7bFkcOhw6DDpMOpw6jEmcO2xZHDocOgw6TDqcOoxJnD + tsWRw6HDoMOkw6nDqMSZw7bFkcOhw6DDpMOpw6jEmcO2xZHDocOgw6TDqcOo + xJnDtsWRw6HDoMOkw6nDqMSZw7bFkcOhw6DDpMOpw6jEmcO2xZHDocOgw6TD + qcOoxJnDtsWRw6HDoMOkw6nDqMSZw7bFkcOhw6DDpMOpw6jEmcO2xZHDocOg + w6TDqcOoxJnDtsWRLgo= + """).encode('utf-8')) + self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content) + self.assertEqual(m.get_content(), content) + + def test_set_text_11_lines_long_line_maximal_non_ascii_heuristics(self): + # Yes, it chooses "wrong" here. It's a heuristic. So this result + # could change if we come up with a better heuristic. 
+ m = self._make_message() + content = ('\n'*10 + + "áàäéèęöőáàäéèęöőáàäéèęöőáàäéèęöő" + "áàäéèęöőáàäéèęöőáàäéèęöőáàäéèęöő" + "áàäéèęöőáàäéèęöőáàäéèęöőáàäéèęöő.\n") + raw_data_manager.set_content(m, "\n"*10 + + "áàäéèęöőáàäéèęöőáàäéèęöőáàäéèęöő" + "áàäéèęöőáàäéèęöőáàäéèęöőáàäéèęöő" + "áàäéèęöőáàäéèęöőáàäéèęöőáàäéèęöő.\n") + self.assertEqual(bytes(m), textwrap.dedent("""\ + MIME-Version: 1.0 + Content-Type: text/plain; charset="utf-8" + Content-Transfer-Encoding: quoted-printable + """ + '\n'*10 + """ + =C3=A1=C3=A0=C3=A4=C3=A9=C3=A8=C4=99=C3=B6=C5=91=C3=A1=C3= + =A0=C3=A4=C3=A9=C3=A8=C4=99=C3=B6=C5=91=C3=A1=C3=A0=C3=A4= + =C3=A9=C3=A8=C4=99=C3=B6=C5=91=C3=A1=C3=A0=C3=A4=C3=A9=C3= + =A8=C4=99=C3=B6=C5=91=C3=A1=C3=A0=C3=A4=C3=A9=C3=A8=C4=99= + =C3=B6=C5=91=C3=A1=C3=A0=C3=A4=C3=A9=C3=A8=C4=99=C3=B6=C5= + =91=C3=A1=C3=A0=C3=A4=C3=A9=C3=A8=C4=99=C3=B6=C5=91=C3=A1= + =C3=A0=C3=A4=C3=A9=C3=A8=C4=99=C3=B6=C5=91=C3=A1=C3=A0=C3= + =A4=C3=A9=C3=A8=C4=99=C3=B6=C5=91=C3=A1=C3=A0=C3=A4=C3=A9= + =C3=A8=C4=99=C3=B6=C5=91=C3=A1=C3=A0=C3=A4=C3=A9=C3=A8=C4= + =99=C3=B6=C5=91=C3=A1=C3=A0=C3=A4=C3=A9=C3=A8=C4=99=C3=B6= + =C5=91. 
+ """).encode('utf-8')) + self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content) + self.assertEqual(m.get_content(), content) + + def test_set_text_non_ascii_with_cte_7bit_raises(self): + m = self._make_message() + with self.assertRaises(UnicodeError): + raw_data_manager.set_content(m,"áàäéèęöő.\n", cte='7bit') + + def test_set_text_non_ascii_with_charset_ascii_raises(self): + m = self._make_message() + with self.assertRaises(UnicodeError): + raw_data_manager.set_content(m,"áàäéèęöő.\n", charset='ascii') + + def test_set_text_non_ascii_with_cte_7bit_and_charset_ascii_raises(self): + m = self._make_message() + with self.assertRaises(UnicodeError): + raw_data_manager.set_content(m,"áàäéèęöő.\n", cte='7bit', charset='ascii') + + def test_set_message(self): + m = self._make_message() + m['Subject'] = "Forwarded message" + content = self._make_message() + content['To'] = 'python@vivarium.org' + content['From'] = 'police@monty.org' + content['Subject'] = "get back in your box" + content.set_content("Or face the comfy chair.") + raw_data_manager.set_content(m, content) + self.assertEqual(str(m), textwrap.dedent("""\ + Subject: Forwarded message + MIME-Version: 1.0 + Content-Type: message/rfc822 + Content-Transfer-Encoding: 8bit + + To: python@vivarium.org + From: police@monty.org + Subject: get back in your box + MIME-Version: 1.0 + Content-Type: text/plain; charset="utf-8" + Content-Transfer-Encoding: 7bit + + Or face the comfy chair. 
+ """)) + payload = m.get_payload(0) + self.assertIsInstance(payload, self.message) + self.assertEqual(str(payload), str(content)) + self.assertIsInstance(m.get_content(), self.message) + self.assertEqual(str(m.get_content()), str(content)) + + def test_set_message_with_non_ascii_and_coersion_to_7bit(self): + m = self._make_message() + m['Subject'] = "Escape report" + content = self._make_message() + content['To'] = 'police@monty.org' + content['From'] = 'victim@monty.org' + content['Subject'] = "Help" + content.set_content("j'ai un problème de python. il est sorti de son" + " vivarium.") + raw_data_manager.set_content(m, content) + self.assertEqual(bytes(m), textwrap.dedent("""\ + Subject: Escape report + MIME-Version: 1.0 + Content-Type: message/rfc822 + Content-Transfer-Encoding: 8bit + + To: police@monty.org + From: victim@monty.org + Subject: Help + MIME-Version: 1.0 + Content-Type: text/plain; charset="utf-8" + Content-Transfer-Encoding: 8bit + + j'ai un problème de python. il est sorti de son vivarium. + """).encode('utf-8')) + # The choice of base64 for the body encoding is because generator + # doesn't bother with heuristics and uses it unconditionally for utf-8 + # text. + # XXX: the first cte should be 7bit, too...that's a generator bug. + # XXX: the line length in the body also looks like a generator bug. 
+ self.assertEqual(m.as_string(maxheaderlen=self.policy.max_line_length), + textwrap.dedent("""\ + Subject: Escape report + MIME-Version: 1.0 + Content-Type: message/rfc822 + Content-Transfer-Encoding: 8bit + + To: police@monty.org + From: victim@monty.org + Subject: Help + MIME-Version: 1.0 + Content-Type: text/plain; charset="utf-8" + Content-Transfer-Encoding: base64 + + aidhaSB1biBwcm9ibMOobWUgZGUgcHl0aG9uLiBpbCBlc3Qgc29ydGkgZGUgc29uIHZpdmFyaXVt + Lgo= + """)) + self.assertIsInstance(m.get_content(), self.message) + self.assertEqual(str(m.get_content()), str(content)) + + def test_set_message_invalid_cte_raises(self): + m = self._make_message() + content = self._make_message() + for cte in 'quoted-printable base64'.split(): + for subtype in 'rfc822 external-body'.split(): + with self.subTest(cte=cte, subtype=subtype): + with self.assertRaises(ValueError) as ar: + m.set_content(content, subtype, cte=cte) + exc = str(ar.exception) + self.assertIn(cte, exc) + self.assertIn(subtype, exc) + subtype = 'external-body' + for cte in '8bit binary'.split(): + with self.subTest(cte=cte, subtype=subtype): + with self.assertRaises(ValueError) as ar: + m.set_content(content, subtype, cte=cte) + exc = str(ar.exception) + self.assertIn(cte, exc) + self.assertIn(subtype, exc) + + def test_set_image_jpg(self): + for content in (b"bogus content", + bytearray(b"bogus content"), + memoryview(b"bogus content")): + with self.subTest(content=content): + m = self._make_message() + raw_data_manager.set_content(m, content, 'image', 'jpeg') + self.assertEqual(str(m), textwrap.dedent("""\ + MIME-Version: 1.0 + Content-Type: image/jpeg + Content-Transfer-Encoding: base64 + + Ym9ndXMgY29udGVudA== + """)) + self.assertEqual(m.get_payload(decode=True), content) + self.assertEqual(m.get_content(), content) + + def test_set_audio_aif_with_quoted_printable_cte(self): + # Why you would use qp, I don't know, but it is technically supported. 
+ # XXX: the incorrect line length is because binascii.b2a_qp doesn't + # support a line length parameter, but we must use it to get newline + # encoding. + # XXX: what about that lack of tailing newline? Do we actually handle + # that correctly in all cases? That is, if the *source* has an + # unencoded newline, do we add an extra newline to the returned payload + # or not? And can that actually be disambiguated based on the RFC? + m = self._make_message() + content = b'b\xFFgus\tcon\nt\rent ' + b'z'*100 + m.set_content(content, 'audio', 'aif', cte='quoted-printable') + self.assertEqual(bytes(m), textwrap.dedent("""\ + MIME-Version: 1.0 + Content-Type: audio/aif + Content-Transfer-Encoding: quoted-printable + + b=FFgus=09con=0At=0Dent=20zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz= + zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz""").encode('latin-1')) + self.assertEqual(m.get_payload(decode=True), content) + self.assertEqual(m.get_content(), content) + + def test_set_video_mpeg_with_binary_cte(self): + m = self._make_message() + content = b'b\xFFgus\tcon\nt\rent ' + b'z'*100 + m.set_content(content, 'video', 'mpeg', cte='binary') + self.assertEqual(bytes(m), textwrap.dedent("""\ + MIME-Version: 1.0 + Content-Type: video/mpeg + Content-Transfer-Encoding: binary + + """).encode('ascii') + + # XXX: the second \n ought to be a \r, but generator gets it wrong. + # THIS MEANS WE DON'T ACTUALLY SUPPORT THE 'binary' CTE. + b'b\xFFgus\tcon\nt\nent zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz' + + b'zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz') + self.assertEqual(m.get_payload(decode=True), content) + self.assertEqual(m.get_content(), content) + + def test_set_application_octet_stream_with_8bit_cte(self): + # In 8bit mode, univeral line end logic applies. It is up to the + # application to make sure the lines are short enough; we don't check. 
+ m = self._make_message() + content = b'b\xFFgus\tcon\nt\rent\n' + b'z'*60 + b'\n' + m.set_content(content, 'application', 'octet-stream', cte='8bit') + self.assertEqual(bytes(m), textwrap.dedent("""\ + MIME-Version: 1.0 + Content-Type: application/octet-stream + Content-Transfer-Encoding: 8bit + + """).encode('ascii') + + b'b\xFFgus\tcon\nt\nent\n' + + b'zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz\n') + self.assertEqual(m.get_payload(decode=True), content) + self.assertEqual(m.get_content(), content) + + def test_set_headers_from_header_objects(self): + m = self._make_message() + content = "Simple message.\n" + header_factory = self.policy.header_factory + raw_data_manager.set_content(m, content, headers=( + header_factory("To", "foo@example.com"), + header_factory("From", "foo@example.com"), + header_factory("Subject", "I'm talking to myself."))) + self.assertEqual(str(m), textwrap.dedent("""\ + MIME-Version: 1.0 + Content-Type: text/plain; charset="utf-8" + To: foo@example.com + From: foo@example.com + Subject: I'm talking to myself. + Content-Transfer-Encoding: 7bit + + Simple message. + """)) + + def test_set_headers_from_strings(self): + m = self._make_message() + content = "Simple message.\n" + raw_data_manager.set_content(m, content, headers=( + "X-Foo-Header: foo", + "X-Bar-Header: bar",)) + self.assertEqual(str(m), textwrap.dedent("""\ + MIME-Version: 1.0 + Content-Type: text/plain; charset="utf-8" + X-Foo-Header: foo + X-Bar-Header: bar + Content-Transfer-Encoding: 7bit + + Simple message. 
+ """)) + + def test_set_headers_with_invalid_duplicate_string_header_raises(self): + m = self._make_message() + content = "Simple message.\n" + with self.assertRaisesRegex(ValueError, 'Content-Type'): + raw_data_manager.set_content(m, content, headers=( + "Content-Type: foo/bar",) + ) + + def test_set_headers_with_invalid_duplicate_header_header_raises(self): + m = self._make_message() + content = "Simple message.\n" + header_factory = self.policy.header_factory + with self.assertRaisesRegex(ValueError, 'Content-Type'): + raw_data_manager.set_content(m, content, headers=( + header_factory("Content-Type", " foo/bar"),) + ) + + def test_set_headers_with_defective_string_header_raises(self): + m = self._make_message() + content = "Simple message.\n" + with self.assertRaisesRegex(ValueError, 'a@fairly@@invalid@address'): + raw_data_manager.set_content(m, content, headers=( + 'To: a@fairly@@invalid@address',) + ) + print(m['To'].defects) + + def test_set_headers_with_defective_header_header_raises(self): + m = self._make_message() + content = "Simple message.\n" + header_factory = self.policy.header_factory + with self.assertRaisesRegex(ValueError, 'a@fairly@@invalid@address'): + raw_data_manager.set_content(m, content, headers=( + header_factory('To', 'a@fairly@@invalid@address'),) + ) + print(m['To'].defects) + + def test_set_disposition_inline(self): + m = self._make_message() + m.set_content('foo', disposition='inline') + self.assertEqual(m['Content-Disposition'], 'inline') + + def test_set_disposition_attachment(self): + m = self._make_message() + m.set_content('foo', disposition='attachment') + self.assertEqual(m['Content-Disposition'], 'attachment') + + def test_set_disposition_foo(self): + m = self._make_message() + m.set_content('foo', disposition='foo') + self.assertEqual(m['Content-Disposition'], 'foo') + + # XXX: we should have a 'strict' policy mode (beyond raise_on_defect) that + # would cause 'foo' above to raise. 
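The disposition tests above can be summarised with a short sketch. It assumes the released class name `email.message.EmailMessage` in place of the patch's `MIMEMessage`, and uses the default policy's `raw_data_manager`; the file name is hypothetical.

```python
from email.message import EmailMessage  # assumption: released name of the patch's MIMEMessage

# Explicit disposition only.
inline = EmailMessage()
inline.set_content('foo', disposition='inline')

# A filename without a disposition implies "attachment".
attached = EmailMessage()
attached.set_content('foo', filename='bar.txt')

# Both given: the explicit disposition wins and the filename becomes a parameter.
inline_named = EmailMessage()
inline_named.set_content('foo', disposition='inline', filename='bar.txt')
```

The design choice illustrated here is that `filename` alone is enough to mark a part as an attachment, so callers only need `disposition` when they want something else.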
+ + def test_set_filename(self): + m = self._make_message() + m.set_content('foo', filename='bar.txt') + self.assertEqual(m['Content-Disposition'], 'attachment; filename="bar.txt"') + + def test_set_filename_and_disposition_inline(self): + m = self._make_message() + m.set_content('foo', disposition='inline', filename='bar.txt') + self.assertEqual(m['Content-Disposition'], 'inline; filename="bar.txt"') + + def test_set_non_ascii_filename(self): + m = self._make_message() + m.set_content('foo', filename='ábárî.txt') + self.assertEqual(bytes(m), textwrap.dedent("""\ + MIME-Version: 1.0 + Content-Type: text/plain; charset="utf-8" + Content-Transfer-Encoding: 7bit + Content-Disposition: attachment; + filename*=utf-8''%C3%A1b%C3%A1r%C3%AE.txt + + foo + """).encode('ascii')) + + content_object_params = { + 'text_plain': ('content', ()), + 'text_html': ('content', ('html',)), + 'application_octet_stream': (b'content', ('application', 'octet_stream')), + 'image_jpeg': (b'content', ('image', 'jpeg')), + 'message_rfc822': (message(), ()), + 'message_external_body': (message(), ('external-body',)), + } + + def content_object_as_header_receiver(self, obj, mimetype): + m = self._make_message() + m.set_content(obj, *mimetype, headers=( + 'To: foo@example.com', + 'From: bar@simple.net')) + self.assertEqual(m['to'], 'foo@example.com') + self.assertEqual(m['from'], 'bar@simple.net') + + def content_object_as_disposition_inline_receiver(self, obj, mimetype): + m = self._make_message() + m.set_content(obj, *mimetype, disposition='inline') + self.assertEqual(m['Content-Disposition'], 'inline') + + def content_object_as_non_ascii_filename_receiver(self, obj, mimetype): + m = self._make_message() + m.set_content(obj, *mimetype, disposition='inline', filename='bár.txt') + self.assertEqual(m['Content-Disposition'], 'inline; filename="bár.txt"') + self.assertEqual(m.get_filename(), "bár.txt") + self.assertEqual(m['Content-Disposition'].params['filename'], "bár.txt") + + def 
content_object_as_cid_receiver(self, obj, mimetype): + m = self._make_message() + m.set_content(obj, *mimetype, cid='some_random_stuff') + self.assertEqual(m['Content-ID'], 'some_random_stuff') + + def content_object_as_params_receiver(self, obj, mimetype): + m = self._make_message() + params = {'foo': 'bár', 'abc': 'xyz'} + m.set_content(obj, *mimetype, params=params) + if isinstance(obj, str): + params['charset'] = 'utf-8' + self.assertEqual(m['Content-Type'].params, params) + + +if __name__ == '__main__': + unittest.main() diff --git a/Lib/test/test_email/test_headerregistry.py b/Lib/test/test_email/test_headerregistry.py --- a/Lib/test/test_email/test_headerregistry.py +++ b/Lib/test/test_email/test_headerregistry.py @@ -661,7 +661,7 @@ 'text/plain; name="ascii_is_the_default"'), 'rfc2231_bad_character_in_charset_parameter_value': ( - "text/plain; charset*=ascii''utf-8%E2%80%9D", + "text/plain; charset*=ascii''utf-8%F1%F2%F3", 'text/plain', 'text', 'plain', @@ -669,6 +669,18 @@ [errors.UndecodableBytesDefect], 'text/plain; charset="utf-8\uFFFD\uFFFD\uFFFD"'), + 'rfc2231_utf_8_in_supposedly_ascii_charset_parameter_value': ( + "text/plain; charset*=ascii''utf-8%E2%80%9D", + 'text/plain', + 'text', + 'plain', + {'charset': 'utf-8”'}, + [errors.UndecodableBytesDefect], + 'text/plain; charset="utf-8”"', + ), + # XXX: if the above were *re*folded, it would get tagged as utf-8 + # instead of ascii in the param, since it now contains non-ASCII. + 'rfc2231_encoded_then_unencoded_segments': ( ('application/x-foo;' '\tname*0*="us-ascii\'en-us\'My";' diff --git a/Lib/test/test_email/test_message.py b/Lib/test/test_email/test_message.py --- a/Lib/test/test_email/test_message.py +++ b/Lib/test/test_email/test_message.py @@ -1,6 +1,13 @@ import unittest +import textwrap from email import policy -from test.test_email import TestEmailBase +from email.message import MIMEMessage +from test.test_email import TestEmailBase, parameterize + + +# Helper. 
+def first(iterable): + return next(filter(lambda x: x is not None, iterable), None) class Test(TestEmailBase): @@ -14,5 +21,537 @@ m['To'] = 'xyz@abc' +@parameterize +class TestMIMEMessage(TestEmailBase): + + policy = policy.default + message = MIMEMessage + + # The first argument is a triple (related, html, plain) of indices into the + # list returned by 'walk' called on a Message constructed from the fourth. + # The indices indicate which part should match the corresponding part-type + # when passed to get_body (i.e. the "first" part of that type in the + # message). The second argument is a list of indices into the 'walk' list + # of the attachments that should be returned by a call to + # 'iter_attachments'. The third argument is a list of indices into 'walk' + # that should be returned by a call to 'iter_parts'. Note that the first + # item returned by 'walk' is the Message itself. + + message_params = { + + 'empty_message': ( + (None, None, 0), + (), + (), + ""), + + 'non_mime_plain': ( + (None, None, 0), + (), + (), + textwrap.dedent("""\ + To: foo@example.com + + simple text body + """)), + + 'mime_non_text': ( + (None, None, None), + (), + (), + textwrap.dedent("""\ + To: foo@example.com + MIME-Version: 1.0 + Content-Type: image/jpg + + bogus body. + """)), + + 'plain_html_alternative': ( + (None, 2, 1), + (), + (1, 2), + textwrap.dedent("""\ + To: foo@example.com + MIME-Version: 1.0 + Content-Type: multipart/alternative; boundary="===" + + preamble + + --=== + Content-type: text/plain + + simple body + + --=== + Content-Type: text/html + +

<p>simple body</p>

+ --===-- + """)), + + 'plain_html_mixed': ( + (None, 2, 1), + (), + (1, 2), + textwrap.dedent("""\ + To: foo@example.com + MIME-Version: 1.0 + Content-Type: multipart/mixed; boundary="===" + + preamble + + --=== + Content-type: text/plain + + simple body + + --=== + Content-Type: text/html + +

<p>simple body</p>

+ + --===-- + """)), + + 'plain_html_attachment_mixed': ( + (None, None, 1), + (2,), + (1, 2), + textwrap.dedent("""\ + To: foo@example.com + MIME-Version: 1.0 + Content-Type: multipart/mixed; boundary="===" + + --=== + Content-type: text/plain + + simple body + + --=== + Content-Type: text/html + Content-Disposition: attachment + +

<p>simple body</p>

+ + --===-- + """)), + + 'html_text_attachment_mixed': ( + (None, 2, None), + (1,), + (1, 2), + textwrap.dedent("""\ + To: foo@example.com + MIME-Version: 1.0 + Content-Type: multipart/mixed; boundary="===" + + --=== + Content-type: text/plain + Content-Disposition: attachment + + simple body + + --=== + Content-Type: text/html + +

<p>simple body</p>

+ + --===-- + """)), + + 'html_text_attachment_inline_mixed': ( + (None, 2, 1), + (), + (1, 2), + textwrap.dedent("""\ + To: foo@example.com + MIME-Version: 1.0 + Content-Type: multipart/mixed; boundary="===" + + --=== + Content-type: text/plain + Content-Disposition: inline + + simple body + + --=== + Content-Type: text/html + Content-Disposition: inline + +

<p>simple body</p>

+ + --===-- + """)), + + + 'related': ( + (0, 1, None), + (2,), + (1, 2), + textwrap.dedent("""\ + To: foo@example.com + MIME-Version: 1.0 + Content-Type: multipart/related; boundary="===" + + --=== + Content-type: text/html + +

<p>simple body</p>

+ + --=== + Content-Type: image/jpg + Content-ID: + + bogus body + + --===-- + """)), + + + 'mixed_alternative_plain_related': ( + (3, 4, 2), + (6, 7), + (1, 6, 7), + textwrap.dedent("""\ + To: foo@example.com + MIME-Version: 1.0 + Content-Type: multipart/mixed; boundary="===" + + --=== + Content-Type: multipart/alternative; boundary="+++" + + --+++ + Content-Type: text/plain + + simple body + + --+++ + Content-Type: multipart/related; boundary="___" + + --___ + Content-Type: text/html + +

<p>simple body</p>

+ + --___ + Content-Type: image/jpg + Content-ID: + + bogus jpg body + + --___-- + + --+++-- + + --=== + Content-Type: image/jpg + Content-Disposition: attachment + + bogus jpg body + + --=== + Content-Type: image/jpg + Content-Disposition: attachment + + another bogus jpg body + + --===-- + """)), + + 'message_rfc822': ( + (None, None, None), + (), + (), + textwrap.dedent("""\ + To: foo@example.com + MIME-Version: 1.0 + Content-Type: message/rfc822 + + To: bar@example.com + From: robot@examp.com + + this is a message body. + """)), + + 'mixed_text_message_rfc822': ( + (None, None, 1), + (2,), + (1, 2), + textwrap.dedent("""\ + To: foo@example.com + MIME-Version: 1.0 + Content-Type: multipart/mixed; boundary="===" + + --=== + Content-Type: text/plain + + Your message has bounced, ser. + + --=== + Content-Type: message/rfc822 + + To: bar@example.com + From: robot@examp.com + + this is a message body. + + --===-- + """)), + + } + + def message_as_body_source(self, body_parts, attachments, parts, msg): + m = self._str_msg(msg) + allparts = list(m.walk()) + expected = [None if n is None else allparts[n] for n in body_parts] + related = 0; html = 1; plain = 2 + self.assertEqual(m.get_body(), first(expected)) + self.assertEqual(m.get_body(preferencelist=('related', 'html', 'plain')), + first(expected)) + self.assertEqual(m.get_body(preferencelist=('related', 'html')), + first(expected[related:html+1])) + self.assertEqual(m.get_body(preferencelist=('related', 'plain')), + first([expected[related], expected[plain]])) + self.assertEqual(m.get_body(preferencelist=('html', 'plain')), + first(expected[html:plain+1])) + self.assertEqual(m.get_body(preferencelist=['related']), expected[related]) + self.assertEqual(m.get_body(preferencelist=['html']), expected[html]) + self.assertEqual(m.get_body(preferencelist=['plain']), expected[plain]) + self.assertEqual(m.get_body(preferencelist=('plain', 'html')), + first(expected[plain:html-1:-1])) + 
self.assertEqual(m.get_body(preferencelist=('plain', 'related')), + first([expected[plain], expected[related]])) + self.assertEqual(m.get_body(preferencelist=('html', 'related')), + first(expected[html::-1])) + self.assertEqual(m.get_body(preferencelist=('plain', 'html', 'related')), + first(expected[::-1])) + self.assertEqual(m.get_body(preferencelist=('html', 'plain', 'related')), + first([expected[html], + expected[plain], + expected[related]])) + + def message_as_attachment_source(self, body_parts, attachments, parts, msg): + m = self._str_msg(msg) + allparts = list(m.walk()) + attachments = [allparts[n] for n in attachments] + self.assertEqual(list(m.iter_attachments()), attachments) + + def message_as_parts_source(self, body_parts, attachments, parts, msg): + m = self._str_msg(msg) + allparts = list(m.walk()) + parts = [allparts[n] for n in parts] + self.assertEqual(list(m.iter_parts()), parts) + + class _TestContentManager: + def get_content(self, msg, *args, **kw): + return msg, args, kw + def set_content(self, msg, *args, **kw): + self.msg = msg + self.args = args + self.kw = kw + + def test_get_content_with_cm(self): + m = self._str_msg('') + cm = self._TestContentManager() + self.assertEqual(m.get_content(content_manager=cm), (m, (), {})) + msg, args, kw = m.get_content('foo', content_manager=cm, bar=1, k=2) + self.assertEqual(msg, m) + self.assertEqual(args, ('foo',)) + self.assertEqual(kw, dict(bar=1, k=2)) + + def test_get_content_default_cm_comes_from_policy(self): + p = policy.default.clone(content_manager=self._TestContentManager()) + m = self._str_msg('', policy=p) + self.assertEqual(m.get_content(), (m, (), {})) + msg, args, kw = m.get_content('foo', bar=1, k=2) + self.assertEqual(msg, m) + self.assertEqual(args, ('foo',)) + self.assertEqual(kw, dict(bar=1, k=2)) + + def test_set_content_with_cm(self): + m = self._str_msg('') + cm = self._TestContentManager() + m.set_content(content_manager=cm) + self.assertEqual(cm.msg, m) + 
self.assertEqual(cm.args, ()) + self.assertEqual(cm.kw, {}) + m.set_content('foo', content_manager=cm, bar=1, k=2) + self.assertEqual(cm.msg, m) + self.assertEqual(cm.args, ('foo',)) + self.assertEqual(cm.kw, dict(bar=1, k=2)) + + def test_set_content_default_cm_comes_from_policy(self): + cm = self._TestContentManager() + p = policy.default.clone(content_manager=cm) + m = self._str_msg('', policy=p) + m.set_content() + self.assertEqual(cm.msg, m) + self.assertEqual(cm.args, ()) + self.assertEqual(cm.kw, {}) + m.set_content('foo', bar=1, k=2) + self.assertEqual(cm.msg, m) + self.assertEqual(cm.args, ('foo',)) + self.assertEqual(cm.kw, dict(bar=1, k=2)) + + # Method should raise ValueError when called on multipart/subtype. + subtype_params = ( + ('related', 'plain', 'succeeds'), + ('related', 'related', ''), + ('related', 'alternative', 'raises'), + ('related', 'mixed', 'raises'), + ('alternative', 'plain', 'succeeds'), + ('alternative', 'related', 'succeeds'), + ('alternative', 'alternative', ''), + ('alternative', 'mixed', 'raises'), + ('mixed', 'plain', 'succeeds'), + ('mixed', 'related', 'succeeds'), + ('mixed', 'alternative', 'succeeds'), + ('mixed', 'mixed', ''), + ) + + def subtype_as_make(self, method, subtype, outcome): + m = self.message() + + if outcome in ('', 'raises'): + m['Content-Type'] = 'multipart/' + subtype + with self.assertRaises(ValueError) as cm: + getattr(m, 'make_' + method)() + exc_text = str(cm.exception) + self.assertIn(subtype, exc_text) + self.assertIn(method, exc_text) + return + + msg_headers = ( + ('To', 'foo@bar.com'), + ('From', 'bar@foo.com'), + ('X-Random-Header', 'Corwin'), + ) + if subtype == 'text': + maintype = 'text' + payload = '' + else: + maintype = 'multipart' + payload = [] + maintype = 'text' if subtype=='plain' else 'multipart' + part_headers = ( + ('Content-Type', '/'.join([maintype, subtype])), + ('X-Trump', 'Random'), + ) + for name, value in msg_headers + part_headers: + m[name] = value + 
m.set_payload(payload) + getattr(m, 'make_' + method)() + self.assertEqual(m.get_content_maintype(), 'multipart') + self.assertEqual(len(m.get_payload()), 1) + self.assertEqual(m.get_content_subtype(), method) + for name, value in msg_headers: + self.assertEqual(m[name], value) + self.assertEqual(len(m), len(msg_headers)+1) # +1 for new Content-Type + part = next(m.iter_parts()) + self.assertEqual(len(part), len(part_headers)) + for name, value in part_headers: + self.assertEqual(part[name], value) + self.assertEqual(part.get_payload(), payload) + + def subtype_as_make_with_boundary(self, method, subtype, outcome): + # Testing every variation is a bit of overkill... + m = self.message() + if outcome in ('', 'raises'): + m['Content-Type'] = 'multipart/' + subtype + with self.assertRaises(ValueError) as cm: + getattr(m, 'make_' + method)() + return + m['Content-Type'] = ('text/plain' if subtype == 'plain' + else 'multipart/' + subtype) + getattr(m, 'make_' + method)(boundary="abc") + self.assertTrue(m.is_multipart()) + self.assertEqual(len(m.get_payload()), 1) + self.assertEqual(m.get_boundary(), 'abc') + + def test_policy_on_part_made_by_make_comes_from_message(self): + for method in ('make_related', 'make_alternative', 'make_mixed'): + m = self.message(policy=self.policy.clone(content_manager='foo')) + getattr(m, method)() + self.assertEqual(m.get_payload(0).policy.content_manager, 'foo') + + class _TestSetContentManager: + def set_content(self, msg, content, *args, **kw): + msg['Content-Type'] = 'text/plain' + msg.set_payload(content) + + def subtype_as_add(self, method, subtype, outcome): + cm = self._TestSetContentManager() + m = self.message() + add_method = 'add_attachment' if method=='mixed' else 'add_' + method + + if outcome == 'raises': + m['Content-Type'] = 'multipart/' + subtype + with self.assertRaises(ValueError) as ar: + getattr(m, add_method)() + exc_text = str(ar.exception) + self.assertIn(subtype, exc_text) + self.assertIn(method, exc_text) + return + 
+ msg_headers = ( + ('To', 'foo@bar.com'), + ('From', 'bar@foo.com'), + ('X-Random-Header', 'Corwin'), + ) + if subtype == 'text': + maintype = 'text' + payload = '' + else: + maintype = 'multipart' + payload = [] + maintype = 'text' if subtype=='plain' else 'multipart' + part_headers = ( + ('Content-Type', '/'.join([maintype, subtype])), + ('X-Trump', 'Random'), + ) + for name, value in msg_headers + part_headers: + m[name] = value + m.set_payload(payload) + getattr(m, add_method)('test', content_manager=cm) + self.assertEqual(m.get_content_maintype(), 'multipart') + self.assertEqual(m.get_content_subtype(), method) + + if method == subtype: + self.assertEqual(len(m.get_payload()), 1) + for name, value in msg_headers + part_headers: + self.assertEqual(m[name], value) + part = next(m.iter_parts()) + self.assertEqual(part.get_content_type(), 'text/plain') + self.assertEqual(part.get_payload(), 'test') + return + + self.assertEqual(len(m.get_payload()), 2) + for name, value in msg_headers: + self.assertEqual(m[name], value) + self.assertEqual(len(m), len(msg_headers)+1) # +1 for new Content-Type + parts = m.iter_parts() + part = next(parts) + self.assertEqual(len(part), len(part_headers)) + for name, value in part_headers: + self.assertEqual(part[name], value) + self.assertEqual(part.get_payload(), payload) + new_part = next(parts) + self.assertEqual(new_part.get_content_type(), 'text/plain') + self.assertEqual(new_part.get_payload(), 'test') + + class _TestSetRaisingContentManager: + def set_content(self, msg, content, *args, **kw): + raise Exception('test') + + def test_default_content_manager_for_add_comes_from_policy(self): + cm = self._TestSetRaisingContentManager() + m = self.message(policy=self.policy.clone(content_manager=cm)) + for method in ('add_related', 'add_alternative', 'add_attachment'): + with self.assertRaises(Exception) as ar: + getattr(m, method)('') + self.assertEqual(str(ar.exception), 'test') + + if __name__ == '__main__': unittest.main() diff 
--git a/Lib/test/test_email/test_policy.py b/Lib/test/test_email/test_policy.py --- a/Lib/test/test_email/test_policy.py +++ b/Lib/test/test_email/test_policy.py @@ -30,6 +30,7 @@ 'raise_on_defect': False, 'header_factory': email.policy.EmailPolicy.header_factory, 'refold_source': 'long', + 'content_manager': email.policy.EmailPolicy.content_manager, }) # For each policy under test, we give here what we expect the defaults to