diff --git a/Doc/library/email.contentmanager.rst b/Doc/library/email.contentmanager.rst new file mode 100644 index 0000000..51e5161 --- /dev/null +++ b/Doc/library/email.contentmanager.rst @@ -0,0 +1,421 @@ +:mod:`email.contentmanager`: Managing MIME Content +-------------------------------------------------- + +.. module:: email.contentmanager + :synopsis: Storing and Retrieving Content from MIME Parts + +.. moduleauthor:: R. David Murray +.. sectionauthor:: R. David Murray + + +.. note:: + + The contentmanager module has been included in the standard library on a + :term:`provisional basis `. Backwards incompatible + changes (up to and including removal of the module) may occur if deemed + necessary by the core developers. + +.. versionadded:: 3.4 + as a :term:`provisional module `. + +The :mod:`~email.message` module provides a class that can represent an +arbitrary email message. That basic message model has a useful and flexible +API, but it provides only a lower-level API for interacting with the generic +parts of a message (the headers, generic header parameters, and the payload, +which may be a list of sub-parts). This module provides classes and tools +that provide an enhanced and extensible API for dealing with various specific +types of content, including the ability to retrieve the content of the message +as a specialized object type rather than as a simple bytes object. The module +automatically takes care of the RFC-specified MIME details (required headers +and parameters, etc.) for the certain common content types content properties, +and support for additional types can be added by an application using the +extension mechanisms. + +This module defines the eponymous "Content Manager" classes. The base +:class:`.ContentManager` class defines an API for registering content +management functions which extract data from ``Message`` objects or insert data +and headers into ``Message`` objects, thus providing a way of converting +between ``Message`` objects containing data and other representations of that +data (Python data types, specialized Python objects, external files, etc). The +module also defines one concrete content manager: :data:`raw_data_manager` +converts between MIME content types and ``str`` or ``bytes`` data. It also +provides a convenient API for managing the MIME parameters when inserting +content into ``Message``\ s. It also handles inserting and extracting +``Message`` objects when dealing with the ``message/rfc822`` content type. + +Another part of the enhanced interface is subclasses of +:class:`~email.message.Message` that provide new convenience API functions, +including convenience methods for calling the Content Managers derived from +this module. + +.. note:: + + Although :class:`.EmailMessage` and :class:`.MIMEPart` are currently + documented in this module because of the provisional nature of the code, the + implementation lives in the :mod:`email.message` module. + + +.. class:: EmailMessage(policy=default) + + If *policy* is specified (it must be an instance of a :mod:`~email.policy` + class) use the rules it specifies to udpate and serialize the representation + of the message. If *policy* is not set, use the + :class:`~email.policy.default` policy, which follows the rules of the email + RFCs except for line endings (instead of the RFC mandated ``\r\n``, it uses + the Python standard ``\n`` line endings). For more information see the + :mod:`~email.policy` documentation. + + This class is a subclass of :class:`~email.message.Message`. It adds + the following methods: + + + .. method:: get_body(preferencelist=('related', 'html', 'plain')) + + Return the MIME part that is the best candidate to be the "body" of the + message. + + *preferencelist* must be a sequence of strings from the set ``related``, + ``html``, and ``plain``, and indicates the order of preference for the + content type of the part returned. + + Start looking for candidate matches with the object on which the + ``get_body`` method is called. + + If ``related`` is not included in *preferencelist*, consider the root + part (or subpart of the root part) of any related encountered as a + candidate if the (sub-)part matches a preference. + + When encountering a ``multipart/related``, check the ``start`` parameter + and if a part with a matching :mailheader:`Content-ID` is found, consider + only it when looking for candidate matches. Otherwise consider only the + first (default root) part of the ``multipart/related``. + + If a part has a :mailheader:``Content-Disposition`` header, only consider + the part a candidate match if the value of the header is ``inline``. + + If none of the candidates matches any of the preferences in + *preferneclist*, return ``None``. + + Notes: (1) For most applications the only *preferencelist* combinations + that really make sense are ``('plain',)``, ``('html', 'plain')``, and the + default, ``('related', 'html', 'plain')``. (2) Because matching starts + with the object on which ``get_body`` is called, calling ``get_body`` on + a ``multipart/related`` will return the object itself unless + *preferencelist* has a non-default value. (3) Messages (or message parts) + that do not specify a :mailheader:`Content-Type` or whose + :mailheader:`Content-Type` header is invalid will be treated as if they + are of type ``text/plain``, which may occasionally cause ``get_body`` to + return unexpected results. + + + .. method:: iter_attachments() + + Return an iterator over all of the parts of the message that are not + candidate "body" parts. That is, skip the first occurrence of each of + ``text/plain``, ``text/html``, ``multipart/related``, or + ``multipart/alternative`` (unless they are explicitly marked as + attachments via :mailheader:`Content-Disposition: attachment`), and + return all remaining parts. When applied directly to a + ``multipart/related``, return an iterator over the all the related parts + except the root part (ie: the part pointed to by the ``start`` parameter, + or the first part if there is no ``start`` parameter or the ``start`` + parameter doesn't match the :mailheader:`Content-ID` of any of the + parts). When applied directly to a ``multipart/alternative`` or a + non-``multipart``, return an empty iterator. + + + .. method:: iter_parts() + + Return an iterator over all of the immediate sub-parts of the message, + which will be empty for a non-``multipart``. (See also + :meth:``~email.message.walk``.) + + + .. method:: get_content(*args, content_manager=None, **kw) + + Call the ``get_content`` method of the *content_manager*, passing self + as the message object, and passing along any other arguments or keywords + as additional arguments. If *content_manager* is not specified, use + the ``content_manager`` specified by the current :mod:`~email.policy`. + + + .. method:: set_content(*args, content_manager=None, **kw) + + Call the ``set_content`` method of the *content_manager*, passing self + as the message object, and passing along any other arguments or keywords + as additional arguments. If *content_manager* is not specified, use + the ``content_manager`` specified by the current :mod:`~email.policy`. + + + .. method:: make_related(boundary=None) + + Convert a non-``multipart`` message into a ``multipart/related`` message, + moving any existing :mailheader:`Content-` headers and payload into a + (new) first part of the ``multipart``. If *boundary* is specified, use + it as the boundary string in the multipart, otherwise leave the boundary + to be automatically created when it is needed (for example, when the + message is serialized). + + + .. method:: make_alternative(boundary=None) + + Convert a non-``multipart`` or a ``multipart/related`` into a + ``multipart/alternative``, moving any existing :mailheader:`Content-` + headers and payload into a (new) first part of the ``multipart``. If + *boundary* is specified, use it as the boundary string in the multipart, + otherwise leave the boundary to be automatically created when it is + needed (for example, when the message is serialized). + + + .. method:: make_mixed(boundary=None) + + Convert a non-``multipart``, a ``multipart/related``, or a + ``multipart-alternative`` into a ``multipart/mixed``, moving any existing + :mailheader:`Content-` headers and payload into a (new) first part of the + ``multipart``. If *boundary* is specified, use it as the boundary string + in the multipart, otherwise leave the boundary to be automatically + created when it is needed (for example, when the message is serialized). + + + .. method:: add_related(*args, content_manager=None, **kw) + + If the message is a ``multipart/related``, create a new message + object, pass all of the arguments to its :meth:`set_content` method, + and :meth:`~email.message.Message.attach` it to the ``multipart``. If + the message is a non-``multipart``, call :meth:`make_related` and then + proceed as above. If the message is any other type of ``multipart``, + raise a :exc:`TypeError`. If *content_manager* is not specified, use + the ``content_manager`` specified by the current :mod:`~email.policy`. + If the added part has no :mailheader:`Content-Disposition` header, + add one with the value ``inline``. + + + .. method:: add_alternative(*args, content_manager=None, **kw) + + If the message is a ``multipart/alternative``, create a new message + object, pass all of the arguments to its :meth:`set_content` method, and + :meth:`~email.message.Message.attach` it to the ``multipart``. If the + message is a non-``multipart`` or ``multipart/related``, call + :meth:`make_alternative` and then proceed as above. If the message is + any other type of ``multipart``, raise a :exc:`TypeError`. If + *content_manager* is not specified, use the ``content_manager`` specified + by the current :mod:`~email.policy`. + + + .. method:: add_attachment(*args, content_manager=None, **kw) + + If the message is a ``multipart/mixed``, create a new message object, + pass all of the arguments to its :meth:`set_content` method, and + :meth:`~email.message.Message.attach` it to the ``multipart``. If the + message is a non-``multipart``, ``multipart/related``, or + ``multipart/alternative``, call :meth:`make_mixed` and then proceed as + above. If *content_manager* is not specified, use the ``content_manager`` + specified by the current :mod:`~email.policy`. If the added part + has no :mailheader:`Content-Disposition` header, add one with the value + ``attachment``. This method can be used both for explicit attachments + (:mailheader:`Content-Disposition: attachment` and ``inline`` attachments + (:mailheader:`Content-Disposition: inline`), by passing appropriate + options to the ``content_manager``. + + + .. method:: clear() + + Remove the payload and all of the headers. + + + .. method:: clear_content() + + Remove the payload and all of the :exc:`Content-` headers, leaving + all other headers intact and in their original order. + + +.. class:: ContentManager() + + Base class for content managers. Provides the standard registry mechanisms + to register converters between MIME content and other representations, as + well as the ``get_content`` and ``set_content`` dispatch methods. + + + .. method:: get_content(msg, *args, **kw) + + Look up a handler function based on the ``mimetype`` of *msg* (see next + paragraph), call it, passing through all arguments, and return the result + of the call. The expectation is that the handler will extract the + payload from *msg* and return an object that encodes information about + the extracted data. + + To find the handler, look for the following keys in the registry, + stopping with the first one found: + + * the string representing the full MIME type (``maintype/subtype``) + * the string representing the ``maintype`` + * the empty string + + If none of these keys produce a handler, raise a :exc:`KeyError` for the + full MIME type. + + + .. method:: set_content(msg, obj, *args, **kw) + + If the ``maintype`` is ``multipart``, raise a :exc:`TypeError`; otherwise + look up a handler function based on the type of *obj* (see next + paragraph), call :meth:`~email.message.EmailMessage.clear_content` on the + *msg*, and call the handler function, passing through all arguments. The + expectation is that the handler will transform and store *obj* into + *msg*, possibly making other changes to *msg* as well, such as adding + various MIME headers to encode information needed to interpret the stored + data. + + To find the handler, obtain the type of *obj* (``typ = type(obj)``), and + look for the following keys in the registry, stopping with the first one + found: + + * the type itself (``typ``) + * the type's fully qualified name (``typ.__module__ + '.' + + typ.__qualname__``). + * the type's qualname (``typ.__qualname__``) + * the type's name (``typ.__name__``). + + If none of the above match, repeat all of the checks above for each of + the types in the :term:`MRO` (``typ.__mro__``). Finally, if no other key + yields a handler, check for a handler for the key ``None``. If there is + no handler for ``None``, raise a :exc:`KeyError` for the fully + qualified name of the type. + + Also add a :mailheader:`MIME-Version` header if one is not present (see + also :class:`.MIMEPart`). + + + .. method:: add_get_handler(key, handler) + + Record the function *handler* as the handler for *key*. For the possible + values of *key*, see :meth:`get_content`. + + + .. method:: add_set_handler(typekey, handler) + + Record *handler* as the function to call when an object of a type + matching *typekey* is passed to :meth:`set_content`. For the possible + values of *typekey*, see :meth:`set_content`. + + +.. class:: MIMEPart(policy=default) + + This class represents a subpart of a MIME message. It is identical to + :class:`EmailMessage`, except that no :mailheader:`MIME-Version` headers are + added when :meth:`~EmailMessage.set_content` is called, since sub-parts do + not need their own :mailheader:`MIME-Version` headers. + + +Content Manager Instances +~~~~~~~~~~~~~~~~~~~~~~~~~ + +Currently the email package provides only one concrete content manager, +:data:`raw_data_manager`, although more may be added in the future. +:data:`raw_data_manager` is the +:attr:`~email.policy.EmailPolicy.content_manager` provided by +:attr:`~email.policy.EmailPolicy` and its derivatives. + + +.. data:: raw_data_manager + + This content manager provides only a minimum interface beyond that provided + by :class:`~email.message.Message` itself: it deals only with text, raw + byte strings, and :class:`~email.message.Message` objects. Nevertheless, it + provides significant advantages compared to the base API: ``get_content`` on + a text part will return a unicode string without the application needing to + manually decode it, ``set_content`` provides a rich set of options for + controlling the headers added to a part and controlling the content transfer + encoding, and it enables the use of the various ``add_`` methods, thereby + simplifying the creation of multipart messages. + + .. method:: get_content(msg, errors='replace') + + Return the payload of the part as either a string (for ``text`` parts), a + :class:`~email.message.EmailMessage` object (for ``message/rfc822`` + parts), or a ``bytes`` object (for all other non-multipart types). Raise + a :exc:`KeyError` if called on a ``multipart``. If the part is a + ``text`` part and *errors* is specified, use it as the error handler when + decoding the payload to unicode. The default error handler is + ``replace``. + + .. method:: set_content(msg, <'str'>, subtype="plain", charset='utf-8' \ + cte=None, \ + disposition=None, filename=None, cid=None, \ + params=None, headers=None) + set_content(msg, <'bytes'>, maintype, subtype, cte="base64", \ + disposition=None, filename=None, cid=None, \ + params=None, headers=None) + set_content(msg, <'Message'>, cte=None, \ + disposition=None, filename=None, cid=None, \ + params=None, headers=None) + set_content(msg, <'list'>, subtype='mixed', \ + disposition=None, filename=None, cid=None, \ + params=None, headers=None) + + Add headers and payload to *msg*: + + Add a :mailheader:`Content-Type` header with a ``maintype/subtype`` + value. + + * For ``str``, set the MIME ``maintype`` to ``text``, and set the + subtype to *subtype* if it is specified, or ``plain`` if it is not. + * For ``bytes``, use the specified *maintype* and *subtype*, or + raise a :exc:`TypeError` if they are not specified. + * For :class:`~email.message.Message` objects, set the maintype to + ``message``, and set the subtype to *subtype* if it is specified + or ``rfc822`` if it is not. If *subtype* is ``partial``, raise an + error (``bytes`` objects must be used to construct + ``message/partial`` parts). + * For *<'list'>*, which should be a list of + :class:`~email.message.Message` objects, set the ``maintype`` to + ``multipart``, and the ``subtype`` to *subtype* if it is + specified, and ``mixed`` if it is not. If the message parts in + the *<'list'>* have :mailheader:`MIME-Version` headers, remove + them. + + If *charset* is provided (which is valid only for ``str``), encode the + string to bytes using the specified character set. The default is + ``utf-8``. If the specified *charset* is a known alias for a standard + MIME charset name, use the standard charset instead. + + If *cte* is set, encode the payload using the specified content transfer + encoding, and set the :mailheader:`Content-Transfer-Endcoding` header to + that value. For ``str`` objects, if it is not set use heuristics to + determine the most compact encoding. Possible values for *cte* are + ``quoted-printable``, ``base64``, ``7bit``, ``8bit``, and ``binary``. + If the input cannot be encoded in the specified encoding (eg: ``7bit``), + raise a :exc:`ValueError`. For :class:`~email.message.Message`, per + :rfc:`2046`, raise an error if a *cte* of ``quoted-printable`` or + ``base64`` is requested for *subtype* ``rfc822``, and for any *cte* + other than ``7bit`` for *subtype* ``external-body``. For + ``message/rfc822``, use ``8bit`` if *cte* is not specified. For all + other values of *subtype*, use ``7bit``. + + .. note:: A *cte* of ``binary`` does not actually work correctly yet. + The ``Message`` object as modified by ``set_content`` is correct, but + :class:`~email.generator.BytesGenerator` does not serialize it + correctly. + + If *disposition* is set, use it as the value of the + :mailheader:`Content-Disposition` header. If not specified, and + *filename* is specified, add the header with the value ``attachment``. + If it is not specified and *filename* is also not specified, do not add + the header. The only valid values for *disposition* are ``attachment`` + and ``inline``. + + If *filename* is specified, use it as the value of the ``filename`` + parameter of the :mailheader:`Content-Disposition` header. There is no + default. + + If *cid* is specified, add a :mailheader:`Content-ID` header with + *cid* as its value. + + If *params* is specified, iterate its ``items`` method and use the + resulting ``(key, value)`` pairs to set additional paramters on the + :mailheader:`Content-Type` header. + + If *headers* is specified and is a list of strings of the form + ``headername: headervalue`` or a list of ``header`` objects + (distinguised from strings by having a ``name`` attribute), add the + headers to *msg*. diff --git a/Doc/library/email.message.rst b/Doc/library/email.message.rst index 4a34c36..f278a0a 100644 --- a/Doc/library/email.message.rst +++ b/Doc/library/email.message.rst @@ -33,10 +33,11 @@ Here are the methods of the :class:`Message` class: .. class:: Message(policy=compat32) - The *policy* argument determiens the :mod:`~email.policy` that will be used - to update the message model. The default value, :class:`compat32 - ` maintains backward compatibility with the - Python 3.2 version of the email package. For more information see the + If *policy* is specified (it must be an instance of a :mod:`~email.policy` + class) use the rules it specifies to udpate and serialize the representation + of the message. If *policy* is not set, use the :class`compat32 + ` policy, which maintains backward compatibility with + the Python 3.2 version of the email package. For more information see the :mod:`~email.policy` documentation. .. versionchanged:: 3.3 The *policy* keyword argument was added. @@ -465,7 +466,8 @@ Here are the methods of the :class:`Message` class: to ``False``. - .. method:: set_param(param, value, header='Content-Type', requote=True, charset=None, language='') + .. method:: set_param(param, value, header='Content-Type', requote=True, + charset=None, language='', replace=False) Set a parameter in the :mailheader:`Content-Type` header. If the parameter already exists in the header, its value will be replaced with @@ -482,6 +484,12 @@ Here are the methods of the :class:`Message` class: language, defaulting to the empty string. Both *charset* and *language* should be strings. + If *replace* is ``False`` (the default) the header is moved to the + end of the list of headers. If *replace* is ``True``, the header + will be updated in place. + + .. versionchanged: 3.4 ``replace`` keyword was added. + .. method:: del_param(param, header='content-type', requote=True) diff --git a/Doc/library/email.policy.rst b/Doc/library/email.policy.rst index 5856879..eb21aa3 100644 --- a/Doc/library/email.policy.rst +++ b/Doc/library/email.policy.rst @@ -371,7 +371,7 @@ added matters. To illustrate:: to) :rfc:`5322`, :rfc:`2047`, and the current MIME RFCs. This policy adds new header parsing and folding algorithms. Instead of - simple strings, headers are custom objects with custom attributes depending + simple strings, headers are ``str`` subclasses with attributes that depend on the type of the field. The parsing and folding algorithm fully implement :rfc:`2047` and :rfc:`5322`. @@ -408,6 +408,20 @@ added matters. To illustrate:: fields are treated as unstructured. This list will be completed before the extension is marked stable.) + .. attribute:: content_manager + + An object with at least two methods: get_content and set_content. When + the :meth:`~email.message.Message.get_content` or + :meth:`~email.message.Message.set_content` method of a + :class:`~email.message.Message` object is called, it calls the + corresponding method of this object, passing it the message object as its + first argument, and any arguments or keywords that were passed to it as + additional arguments. By default ``content_manager`` is set to + :data:`~email.contentmanager.raw_data_manager`. + + .. versionadded 3.4 + + The class provides the following concrete implementations of the abstract methods of :class:`Policy`: @@ -427,7 +441,7 @@ added matters. To illustrate:: The name is returned unchanged. If the input value has a ``name`` attribute and it matches *name* ignoring case, the value is returned unchanged. Otherwise the *name* and *value* are passed to - ``header_factory``, and the resulting custom header object is returned as + ``header_factory``, and the resulting header object is returned as the value. In this case a ``ValueError`` is raised if the input value contains CR or LF characters. @@ -435,7 +449,7 @@ added matters. To illustrate:: If the value has a ``name`` attribute, it is returned to unmodified. Otherwise the *name*, and the *value* with any CR or LF characters - removed, are passed to the ``header_factory``, and the resulting custom + removed, are passed to the ``header_factory``, and the resulting header object is returned. Any surrogateescaped bytes get turned into the unicode unknown-character glyph. @@ -445,9 +459,9 @@ added matters. To illustrate:: A value is considered to be a 'source value' if and only if it does not have a ``name`` attribute (having a ``name`` attribute means it is a header object of some sort). If a source value needs to be refolded - according to the policy, it is converted into a custom header object by + according to the policy, it is converted into a header object by passing the *name* and the *value* with any CR and LF characters removed - to the ``header_factory``. Folding of a custom header object is done by + to the ``header_factory``. Folding of a header object is done by calling its ``fold`` method with the current policy. Source values are split into lines using :meth:`~str.splitlines`. If @@ -502,23 +516,23 @@ With all of these :class:`EmailPolicies <.EmailPolicy>`, the effective API of the email package is changed from the Python 3.2 API in the following ways: * Setting a header on a :class:`~email.message.Message` results in that - header being parsed and a custom header object created. + header being parsed and a header object created. * Fetching a header value from a :class:`~email.message.Message` results - in that header being parsed and a custom header object created and + in that header being parsed and a header object created and returned. - * Any custom header object, or any header that is refolded due to the + * Any header object, or any header that is refolded due to the policy settings, is folded using an algorithm that fully implements the RFC folding algorithms, including knowing where encoded words are required and allowed. From the application view, this means that any header obtained through the -:class:`~email.message.Message` is a custom header object with custom +:class:`~email.message.Message` is a header object with extra attributes, whose string value is the fully decoded unicode value of the header. Likewise, a header may be assigned a new value, or a new header created, using a unicode string, and the policy will take care of converting the unicode string into the correct RFC encoded form. -The custom header objects and their attributes are described in +The header objects and their attributes are described in :mod:`~email.headerregistry`. diff --git a/Doc/library/email.rst b/Doc/library/email.rst index a6cbbce..331d2ef 100644 --- a/Doc/library/email.rst +++ b/Doc/library/email.rst @@ -53,6 +53,7 @@ Contents of the :mod:`email` package documentation: email.generator.rst email.policy.rst email.headerregistry.rst + email.contentmanager.rst email.mime.rst email.header.rst email.charset.rst diff --git a/Lib/email/contentmanager.py b/Lib/email/contentmanager.py new file mode 100644 index 0000000..374320b --- /dev/null +++ b/Lib/email/contentmanager.py @@ -0,0 +1,249 @@ +import binascii +import email.charset +import email.message +import email.errors +from email import quoprimime + +class ContentManager: + + def __init__(self): + self.get_handlers = {} + self.set_handlers = {} + + def add_get_handler(self, key, handler): + self.get_handlers[key] = handler + + def get_content(self, msg, *args, **kw): + content_type = msg.get_content_type() + if content_type in self.get_handlers: + return self.get_handlers[content_type](msg, *args, **kw) + maintype = msg.get_content_maintype() + if maintype in self.get_handlers: + return self.get_handlers[maintype](msg, *args, **kw) + if '' in self.get_handlers: + return self.get_handlers[''](msg, *args, **kw) + raise KeyError(content_type) + + def add_set_handler(self, typekey, handler): + self.set_handlers[typekey] = handler + + def set_content(self, msg, obj, *args, **kw): + if msg.get_content_maintype() == 'multipart': + # XXX: is this error a good idea or not? We can remove it later, + # but we can't add it later, so do it for now. + raise TypeError("set_content not valid on multipart") + handler = self._find_set_handler(msg, obj) + msg.clear_content() + handler(msg, obj, *args, **kw) + + def _find_set_handler(self, msg, obj): + full_path_for_error = None + for typ in type(obj).__mro__: + if typ in self.set_handlers: + return self.set_handlers[typ] + qname = typ.__qualname__ + modname = getattr(typ, '__module__', '') + full_path = '.'.join((modname, qname)) if modname else qname + if full_path_for_error is None: + full_path_for_error = full_path + if full_path in self.set_handlers: + return self.set_handlers[full_path] + if qname in self.set_handlers: + return self.set_handlers[qname] + name = typ.__name__ + if name in self.set_handlers: + return self.set_handlers[name] + if None in self.set_handlers: + return self.set_handlers[None] + raise KeyError(full_path_for_error) + + +raw_data_manager = ContentManager() + + +def get_text_content(msg, errors='replace'): + content = msg.get_payload(decode=True) + charset = msg.get_param('charset', 'ASCII') + return content.decode(charset, errors=errors) +raw_data_manager.add_get_handler('text', get_text_content) + + +def get_non_text_content(msg): + return msg.get_payload(decode=True) +for maintype in 'audio image video application'.split(): + raw_data_manager.add_get_handler(maintype, get_non_text_content) + + +def get_message_content(msg): + return msg.get_payload(0) +for subtype in 'rfc822 external-body'.split(): + raw_data_manager.add_get_handler('message/'+subtype, get_message_content) + + +def get_and_fixup_unknown_message_content(msg): + # If we don't understand a message subtype, we are supposed to treat it as + # if it were application/octet-stream, per + # tools.ietf.org/html/rfc2046#section-5.2.4. Feedparser doesn't do that, + # so do our best to fix things up. Note that it is *not* appropriate to + # model message/partial content as Message objects, so they are handled + # here as well. (How to reassemble them is out of scope for this comment :) + return bytes(msg.get_payload(0)) +raw_data_manager.add_get_handler('message', + get_and_fixup_unknown_message_content) + + +def _prepare_set(msg, maintype, subtype, headers): + msg['Content-Type'] = '/'.join((maintype, subtype)) + if headers: + if not hasattr(headers[0], 'name'): + mp = msg.policy + headers = [mp.header_factory(*mp.header_source_parse([header])) + for header in headers] + try: + for header in headers: + if header.defects: + raise header.defects[0] + msg[header.name] = header + except email.errors.HeaderDefect as exc: + raise ValueError("Invalid header: {}".format( + header.fold(policy=msg.policy))) from exc + + +def _finalize_set(msg, disposition, filename, cid, params, headers): + if disposition is None and filename is not None: + disposition = 'attachment' + if disposition is not None: + msg['Content-Disposition'] = disposition + if filename is not None: + msg.set_param('filename', + filename, + header='Content-Disposition', + replace=True) + if cid is not None: + msg['Content-ID'] = cid + if params is not None: + for key, value in params.items(): + msg.set_param(key, value) + + +# XXX: This is a cleaned-up version of base64mime.body_encode. It would +# be nice to drop both this and quoprimime.body_encode in favor of +# enhanced binascii routines that accepted a max_line_length parameter. +def _encode_base64(data, max_line_length): + encoded_lines = [] + unencoded_bytes_per_line = max_line_length * 3 // 4 + for i in range(0, len(data), unencoded_bytes_per_line): + thisline = data[i:i+unencoded_bytes_per_line] + encoded_lines.append(binascii.b2a_base64(thisline).decode('ascii')) + return ''.join(encoded_lines) + + +def _encode_text(string, charset, cte, policy): + lines = string.encode(charset).splitlines() + linesep = policy.linesep.encode('ascii') + def embeded_body(lines): return linesep.join(lines) + linesep + def normal_body(lines): return b'\n'.join(lines) + b'\n' + if cte==None: + # Use heuristics to decide on the "best" encoding. + try: + return '7bit', normal_body(lines).decode('ascii') + except UnicodeDecodeError: + pass + if (policy.cte_type == '8bit' and + max((len(x) for x in lines)) <= policy.max_line_length): + return '8bit', normal_body(lines).decode('ascii', 'surrogateescape') + sniff = embeded_body(lines[:10]) + sniff_qp = quoprimime.body_encode(sniff.decode('latin-1'), + policy.max_line_length) + sniff_base64 = binascii.b2a_base64(sniff) + # This is a little unfair to qp; it includes lineseps and base64 doesn't. + if len(sniff_qp) > len(sniff_base64): + cte = 'base64' + else: + cte = 'quoted-printable' + if len(lines) <= 10: + return cte, sniff_qp + if cte == '7bit': + data = normal_body(lines).decode('ascii') + elif cte == '8bit': + data = normal_body(lines).decode('ascii', 'surrogateescape') + elif cte == 'quoted-printable': + data = quoprimime.body_encode(normal_body(lines).decode('latin-1'), + policy.max_line_length) + elif cte == 'base64': + data = _encode_base64(embeded_body(lines), policy.max_line_length) + else: + raise ValueError("Unknown content transfer encoding {}".format(cte)) + return cte, data + + +def set_text_content(msg, string, subtype="plain", charset='utf-8', cte=None, + disposition=None, filename=None, cid=None, + params=None, headers=None): + _prepare_set(msg, 'text', subtype, headers) + cte, payload = _encode_text(string, charset, cte, msg.policy) + msg.set_payload(payload) + msg.set_param('charset', + email.charset.ALIASES.get(charset, charset), + replace=True) + msg['Content-Transfer-Encoding'] = cte + _finalize_set(msg, disposition, filename, cid, params, headers) +raw_data_manager.add_set_handler(str, set_text_content) + + +def set_message_content(msg, message, subtype="rfc822", cte=None, + disposition=None, filename=None, cid=None, + params=None, headers=None): + if subtype == 'partial': + raise ValueError("message/partial is not supported for Message objects") + if subtype == 'rfc822': + if cte not in (None, '7bit', '8bit', 'binary'): + # http://tools.ietf.org/html/rfc2046#section-5.2.1 mandate. + raise ValueError( + "message/rfc822 parts do not support cte={}".format(cte)) + # 8bit will get coerced on serialization if policy.cte_type='7bit'. We + # may end up claiming 8bit when it isn't needed, but the only negative + # result of that should be a gateway that needs to coerce to 7bit + # having to look through the whole embedded message to discover whether + # or not it actually has to do anything. + cte = '8bit' if cte is None else cte + elif subtype == 'external-body': + if cte not in (None, '7bit'): + # http://tools.ietf.org/html/rfc2046#section-5.2.3 mandate. + raise ValueError( + "message/external-body parts do not support cte={}".format(cte)) + cte = '7bit' + elif cte is None: + # http://tools.ietf.org/html/rfc2046#section-5.2.4 says all future + # subtypes should be restricted to 7bit, so assume that. + cte = '7bit' + _prepare_set(msg, 'message', subtype, headers) + msg.set_payload([message]) + msg['Content-Transfer-Encoding'] = cte + _finalize_set(msg, disposition, filename, cid, params, headers) +raw_data_manager.add_set_handler(email.message.Message, set_message_content) + + +def set_bytes_content(msg, data, maintype, subtype, cte='base64', + disposition=None, filename=None, cid=None, + params=None, headers=None): + _prepare_set(msg, maintype, subtype, headers) + if cte == 'base64': + data = _encode_base64(data, max_line_length=msg.policy.max_line_length) + elif cte == 'quoted-printable': + # XXX: quoprimime.body_encode won't encode newline characters in data, + # so we can't use it. This means max_line_length is ignored. Another + # bug to fix later. (Note: encoders.quopri is broken on line ends. + data = binascii.b2a_qp(data, istext=False, header=False, quotetabs=True) + data = data.decode('ascii') + elif cte == '7bit': + # Make sure it really is only ASCII. The early warning here seems + # worth the overhead...if you care write your own content manager :). + data.encode('ascii') + elif cte in ('8bit', 'binary'): + data = data.decode('ascii', 'surrogateescape') + msg.set_payload(data) + msg['Content-Transfer-Encoding'] = cte + _finalize_set(msg, disposition, filename, cid, params, headers) +for typ in (bytes, bytearray, memoryview): + raw_data_manager.add_set_handler(typ, set_bytes_content) diff --git a/Lib/email/message.py b/Lib/email/message.py index ebaf1c1..7830186 100644 --- a/Lib/email/message.py +++ b/Lib/email/message.py @@ -8,8 +8,6 @@ __all__ = ['Message'] import re import uu -import base64 -import binascii from io import BytesIO, StringIO # Intrapackage imports @@ -679,7 +677,7 @@ class Message: return failobj def set_param(self, param, value, header='Content-Type', requote=True, - charset=None, language=''): + charset=None, language='', replace=False): """Set a parameter in the Content-Type header. If the parameter already exists in the header, its value will be @@ -723,8 +721,11 @@ class Message: else: ctype = SEMISPACE.join([ctype, append_param]) if ctype != self.get(header): - del self[header] - self[header] = ctype + if replace: + self.replace_header(header, ctype) + else: + del self[header] + self[header] = ctype def del_param(self, param, header='content-type', requote=True): """Remove the given parameter completely from the Content-Type header. @@ -905,3 +906,208 @@ class Message: # I.e. def walk(self): ... from email.iterators import walk + + +class MIMEPart(Message): + + def __init__(self, policy=None): + if policy is None: + from email.policy import default + policy = default + Message.__init__(self, policy) + + def _find_body(self, part, preferencelist, found): + if part.get('content-disposition') not in (None, 'inline'): + # It's an attachment, so not a body candidate. + return found + maintype, subtype = part.get_content_type().split('/') + if maintype == 'multipart': + if subtype != 'related': + return self._find_body_in_subparts(part, preferencelist, found) + if 'related' in preferencelist: + priority = preferencelist.index(subtype) + if found[priority] is None: + found[priority] = part + if priority == 0: + return found + candidate = None + start = part.get_param('start') + if start: + for subpart in part.iter_parts(): + if subpart['content-id'] == start: + candidate = subpart + break + if candidate is None: + subparts = part.get_payload() + candidate = subparts[0] if subparts else None + if candidate is not None: + found = self._find_body(candidate, preferencelist, found) + return found + elif maintype == 'text' and subtype in preferencelist: + priority = preferencelist.index(subtype) + if found[priority] is None: + found[preferencelist.index(subtype)] = part + return found + + def _find_body_in_subparts(self, part, preferencelist, found): + for subpart in part.iter_parts(): + found = self._find_body(subpart, preferencelist, found) + if found[0] is not None: + return found + return found + + def get_body(self, preferencelist=('related', 'html', 'plain')): + """Return best candidate mime part for display as 'body' of message. + + Do a depth first search, starting with self, looking for the first part + matching each of the items in preferencelist, and return the part + corresponding to the first item that has a match, or None if no items + have a match. If 'related' is not included in preferencelist, consider + the root part of any multipart/related encountered as a candidate + match. Ignore parts with 'Content-Disposition: attachment'. + """ + found = self._find_body(self, preferencelist, + [None] * len(preferencelist)) + # If #18652 is added, use this instead: + #return first_true(found, pred=lambda x: x is not None) + return next(filter(lambda x: x is not None, found), None) + + def iter_attachments(self): + """Return an iterator over the non-main parts of a multipart. + + Skip the first of each occurrence of text/plain, text/html, + multipart/related, or multipart/alternative in the multipart (unless + they have a 'Content-Disposition: attachment' header) and include all + remaining subparts in the returned iterator. When applied to a + multipart/related, return all parts except the root part. Return an + empty iterator when applied to a multipart/alternative or a + non-multipart. + """ + maintype, subtype = self.get_content_type().split('/') + if maintype != 'multipart' or subtype == 'alternative': + return + parts = self.get_payload() + if maintype == 'multipart' and subtype == 'related': + # For related, we treat everything but the root as an attachment. The + # root may be indicated by 'start'; if there's no start or we can't + # find the named start, treat the first subpart as the root. + start = self.get_param('start') + if start: + found = False + attachments = [] + for part in parts: + if part.get('content-id') == start: + found = True + else: + attachments.append(part) + if found: + yield from attachments + return + parts.pop(0) + yield from parts + return + # Otherwise we more or less invert the remaining logic in get_body. + # This only really works in edge cases (ex: non-text relateds or + # alternatives) if the sending agent sets content-disposition. + seen = [] # Only skip the first example of each candidate type. + for part in parts: + maintype, subtype = part.get_content_type().split('/') + if ((maintype == 'text' and subtype in ('html', 'plain') or + maintype == 'multipart' and + subtype in ('related', 'alternative')) and + part.get('content-disposition') != 'attachment' and + subtype not in seen): + seen.append(subtype) + continue + yield part + + def iter_parts(self): + """Return an iterator over all immediate subparts of a multipart. + + Return an empty iterator for a non-multipart. + """ + if self.get_content_maintype() == 'multipart': + yield from self.get_payload() + + def get_content(self, *args, content_manager=None, **kw): + if content_manager is None: + content_manager = self.policy.content_manager + return content_manager.get_content(self, *args, **kw) + + def set_content(self, *args, content_manager=None, **kw): + if content_manager is None: + content_manager = self.policy.content_manager + content_manager.set_content(self, *args, **kw) + + def _make_multipart(self, subtype, disallowed_subtypes, boundary): + if self.get_content_maintype() == 'multipart': + existing_subtype = self.get_content_subtype() + disallowed_subtypes = disallowed_subtypes + (subtype,) + if existing_subtype in disallowed_subtypes: + raise ValueError("Cannot convert {} to {}".format( + existing_subtype, subtype)) + keep_headers = [] + part_headers = [] + while self._headers: + name, value = self._headers.pop(0) + if name.lower().startswith('content-'): + part_headers.append((name, value)) + else: + keep_headers.append((name, value)) + if part_headers: + # There is existing content, move it to the first subpart. + part = type(self)(policy=self.policy) + part._headers = part_headers + part._payload = self._payload + self._payload = [part] + else: + self._payload = [] + self._headers = keep_headers + self['Content-Type'] = 'multipart/' + subtype + if boundary is not None: + self.set_param('boundary', boundary) + + def make_related(self, boundary=None): + self._make_multipart('related', ('alternative', 'mixed'), boundary) + + def make_alternative(self, boundary=None): + self._make_multipart('alternative', ('mixed',), boundary) + + def make_mixed(self, boundary=None): + self._make_multipart('mixed', (), boundary) + + def _add_multipart(self, _subtype, *args, _disp=None, **kw): + if (self.get_content_maintype() != 'multipart' or + self.get_content_subtype() != _subtype): + getattr(self, 'make_' + _subtype)() + part = type(self)(policy=self.policy) + part.set_content(*args, **kw) + if _disp and 'content-disposition' not in part: + part['Content-Disposition'] = _disp + self.attach(part) + + def add_related(self, *args, **kw): + self._add_multipart('related', *args, _disp='inline', **kw) + + def add_alternative(self, *args, **kw): + self._add_multipart('alternative', *args, **kw) + + def add_attachment(self, *args, **kw): + self._add_multipart('mixed', *args, _disp='attachment', **kw) + + def clear(self): + self._headers = [] + self._payload = None + + def clear_content(self): + self._headers = [(n, v) for n, v in self._headers + if not n.lower().startswith('content-')] + self._payload = None + + +class EmailMessage(MIMEPart): + + def set_content(self, *args, **kw): + super().set_content(*args, **kw) + if 'MIME-Version' not in self: + self['MIME-Version'] = '1.0' diff --git a/Lib/email/policy.py b/Lib/email/policy.py index 38e88af..f0b20f4 100644 --- a/Lib/email/policy.py +++ b/Lib/email/policy.py @@ -5,6 +5,7 @@ code that adds all the email6 features. from email._policybase import Policy, Compat32, compat32, _extend_docstrings from email.utils import _has_surrogates from email.headerregistry import HeaderRegistry as HeaderRegistry +from email.contentmanager import raw_data_manager __all__ = [ 'Compat32', @@ -58,10 +59,22 @@ class EmailPolicy(Policy): special treatment, while all other fields are treated as unstructured. This list will be completed before the extension is marked stable.) + + content_manager -- an object with at least two methods: get_content + and set_content. When the get_content or + set_content method of a Message object is called, + it calls the corresponding method of this object, + passing it the message object as its first argument, + and any arguments or keywords that were passed to + it as additional arguments. The default + content_manager is + :data:`~email.contentmanager.raw_data_manager`. + """ refold_source = 'long' header_factory = HeaderRegistry() + content_manager = raw_data_manager def __init__(self, **kw): # Ensure that each new instance gets a unique header factory diff --git a/Lib/email/utils.py b/Lib/email/utils.py index b3b42bb..25b0d56 100644 --- a/Lib/email/utils.py +++ b/Lib/email/utils.py @@ -68,9 +68,13 @@ def _has_surrogates(s): # How to deal with a string containing bytes before handing it to the # application through the 'normal' interface. def _sanitize(string): - # Turn any escaped bytes into unicode 'unknown' char. - original_bytes = string.encode('ascii', 'surrogateescape') - return original_bytes.decode('ascii', 'replace') + # Turn any escaped bytes into unicode 'unknown' char. If the escaped + # bytes happen to be utf-8 they will instead get decoded, even if they + # were invalid in the charset the source was supposed to be in. This + # seems like it is not a bad thing; a defect was still registered. + original_bytes = string.encode('utf-8', 'surrogateescape') + return original_bytes.decode('utf-8', 'replace') + # Helpers diff --git a/Lib/test/test_email/__init__.py b/Lib/test/test_email/__init__.py index f206ace..d0a43e0 100644 --- a/Lib/test/test_email/__init__.py +++ b/Lib/test/test_email/__init__.py @@ -2,6 +2,7 @@ import os import sys import unittest import test.support +import collections import email from email.message import Message from email._policybase import compat32 @@ -42,6 +43,8 @@ class TestEmailBase(unittest.TestCase): # here we make minimal changes in the test_email tests compared to their # pre-3.3 state. policy = compat32 + # Likewise, the default message object is Message. + message = Message def __init__(self, *args, **kw): super().__init__(*args, **kw) @@ -54,11 +57,23 @@ class TestEmailBase(unittest.TestCase): with openfile(filename) as fp: return email.message_from_file(fp, policy=self.policy) - def _str_msg(self, string, message=Message, policy=None): + def _str_msg(self, string, message=None, policy=None): if policy is None: policy = self.policy + if message is None: + message = self.message return email.message_from_string(string, message, policy=policy) + def _bytes_msg(self, bytestring, message=None, policy=None): + if policy is None: + policy = self.policy + if message is None: + message = self.message + return email.message_from_bytes(bytestring, message, policy=policy) + + def _make_message(self): + return self.message(policy=self.policy) + def _bytes_repr(self, b): return [repr(x) for x in b.splitlines(keepends=True)] @@ -123,6 +138,7 @@ def parameterize(cls): """ paramdicts = {} + testers = collections.defaultdict(list) for name, attr in cls.__dict__.items(): if name.endswith('_params'): if not hasattr(attr, 'keys'): @@ -134,9 +150,17 @@ def parameterize(cls): d[n] = x attr = d paramdicts[name[:-7] + '_as_'] = attr + if '_as_' in name: + testers[name.split('_as_')[0] + '_as_'].append(name) testfuncs = {} + for name in paramdicts: + if name not in testers: + raise ValueError("No tester found for {}".format(name)) + for name in testers: + if name not in paramdicts: + raise ValueError("No params found for {}".format(name)) for name, attr in cls.__dict__.items(): - for paramsname, paramsdict in paramdicts.items(): + for paramsname, paramsdict in list(paramdicts.items()): if name.startswith(paramsname): testnameroot = 'test_' + name[len(paramsname):] for paramname, params in paramsdict.items(): diff --git a/Lib/test/test_email/test_contentmanager.py b/Lib/test/test_email/test_contentmanager.py new file mode 100644 index 0000000..eac5028 --- /dev/null +++ b/Lib/test/test_email/test_contentmanager.py @@ -0,0 +1,793 @@ +import unittest +from test.test_email import TestEmailBase, parameterize +import textwrap +from email import policy +from email.message import EmailMessage +from email.contentmanager import ContentManager, raw_data_manager + + +@parameterize +class TestContentManager(TestEmailBase): + + policy = policy.default + message = EmailMessage + + get_key_params = { + 'full_type': (1, 'text/plain',), + 'maintype_only': (2, 'text',), + 'null_key': (3, '',), + } + + def get_key_as_get_content_key(self, order, key): + def foo_getter(msg, foo=None): + bar = msg['X-Bar-Header'] + return foo, bar + cm = ContentManager() + cm.add_get_handler(key, foo_getter) + m = self._make_message() + m['Content-Type'] = 'text/plain' + m['X-Bar-Header'] = 'foo' + self.assertEqual(cm.get_content(m, foo='bar'), ('bar', 'foo')) + + def get_key_as_get_content_key_order(self, order, key): + def bar_getter(msg): + return msg['X-Bar-Header'] + def foo_getter(msg): + return msg['X-Foo-Header'] + cm = ContentManager() + cm.add_get_handler(key, foo_getter) + for precedence, key in self.get_key_params.values(): + if precedence > order: + cm.add_get_handler(key, bar_getter) + m = self._make_message() + m['Content-Type'] = 'text/plain' + m['X-Bar-Header'] = 'bar' + m['X-Foo-Header'] = 'foo' + self.assertEqual(cm.get_content(m), ('foo')) + + def test_get_content_raises_if_unknown_mimetype_and_no_default(self): + cm = ContentManager() + m = self._make_message() + m['Content-Type'] = 'text/plain' + with self.assertRaisesRegex(KeyError, 'text/plain'): + cm.get_content(m) + + class BaseThing(str): + pass + baseobject_full_path = __name__ + '.' + 'TestContentManager.BaseThing' + class Thing(BaseThing): + pass + testobject_full_path = __name__ + '.' + 'TestContentManager.Thing' + + set_key_params = { + 'type': (0, Thing,), + 'full_path': (1, testobject_full_path,), + 'qualname': (2, 'TestContentManager.Thing',), + 'name': (3, 'Thing',), + 'base_type': (4, BaseThing,), + 'base_full_path': (5, baseobject_full_path,), + 'base_qualname': (6, 'TestContentManager.BaseThing',), + 'base_name': (7, 'BaseThing',), + 'str_type': (8, str,), + 'str_full_path': (9, 'builtins.str',), + 'str_name': (10, 'str',), # str name and qualname are the same + 'null_key': (11, None,), + } + + def set_key_as_set_content_key(self, order, key): + def foo_setter(msg, obj, foo=None): + msg['X-Foo-Header'] = foo + msg.set_payload(obj) + cm = ContentManager() + cm.add_set_handler(key, foo_setter) + m = self._make_message() + msg_obj = self.Thing() + cm.set_content(m, msg_obj, foo='bar') + self.assertEqual(m['X-Foo-Header'], 'bar') + self.assertEqual(m.get_payload(), msg_obj) + + def set_key_as_set_content_key_order(self, order, key): + def foo_setter(msg, obj): + msg['X-FooBar-Header'] = 'foo' + msg.set_payload(obj) + def bar_setter(msg, obj): + msg['X-FooBar-Header'] = 'bar' + cm = ContentManager() + cm.add_set_handler(key, foo_setter) + for precedence, key in self.get_key_params.values(): + if precedence > order: + cm.add_set_handler(key, bar_setter) + m = self._make_message() + msg_obj = self.Thing() + cm.set_content(m, msg_obj) + self.assertEqual(m['X-FooBar-Header'], 'foo') + self.assertEqual(m.get_payload(), msg_obj) + + def test_set_content_raises_if_unknown_type_and_no_default(self): + cm = ContentManager() + m = self._make_message() + msg_obj = self.Thing() + with self.assertRaisesRegex(KeyError, self.testobject_full_path): + cm.set_content(m, msg_obj) + + def test_set_content_raises_if_called_on_multipart(self): + cm = ContentManager() + m = self._make_message() + m['Content-Type'] = 'multipart/foo' + with self.assertRaises(TypeError): + cm.set_content(m, 'test') + + def test_set_content_calls_clear_content(self): + m = self._make_message() + m['Content-Foo'] = 'bar' + m['Content-Type'] = 'text/html' + m['To'] = 'test' + m.set_payload('abc') + cm = ContentManager() + cm.add_set_handler(str, lambda *args, **kw: None) + m.set_content('xyz', content_manager=cm) + self.assertIsNone(m['Content-Foo']) + self.assertIsNone(m['Content-Type']) + self.assertEqual(m['To'], 'test') + self.assertIsNone(m.get_payload()) + + +@parameterize +class TestRawDataManager(TestEmailBase): + # Note: these tests are dependent on the order in which headers are added + # to the message objects by the code. There's no defined ordering in + # RFC5322/MIME, so this makes the tests more fragile than the standards + # require. However, if the header order changes it is best to understand + # *why*, and make sure it isn't a subtle bug in whatever change was + # applied. + + policy = policy.default.clone(max_line_length=60, + content_manager=raw_data_manager) + message = EmailMessage + + def test_get_text_plain(self): + m = self._str_msg(textwrap.dedent("""\ + Content-Type: text/plain + + Basic text. + """)) + self.assertEqual(raw_data_manager.get_content(m), "Basic text.\n") + + def test_get_text_html(self): + m = self._str_msg(textwrap.dedent("""\ + Content-Type: text/html + +

Basic text.

+ """)) + self.assertEqual(raw_data_manager.get_content(m), "

Basic text.

\n") + + def test_get_text_plain_latin1(self): + m = self._bytes_msg(textwrap.dedent("""\ + Content-Type: text/plain; charset=latin1 + + Basìc tëxt. + """).encode('latin1')) + self.assertEqual(raw_data_manager.get_content(m), "Basìc tëxt.\n") + + def test_get_text_plain_latin1_quoted_printable(self): + m = self._str_msg(textwrap.dedent("""\ + Content-Type: text/plain; charset="latin-1" + Content-Transfer-Encoding: quoted-printable + + Bas=ECc t=EBxt. + """)) + self.assertEqual(raw_data_manager.get_content(m), "Basìc tëxt.\n") + + def test_get_text_plain_utf8_base64(self): + m = self._str_msg(textwrap.dedent("""\ + Content-Type: text/plain; charset="utf8" + Content-Transfer-Encoding: base64 + + QmFzw6xjIHTDq3h0Lgo= + """)) + self.assertEqual(raw_data_manager.get_content(m), "Basìc tëxt.\n") + + def test_get_text_plain_bad_utf8_quoted_printable(self): + m = self._str_msg(textwrap.dedent("""\ + Content-Type: text/plain; charset="utf8" + Content-Transfer-Encoding: quoted-printable + + Bas=c3=acc t=c3=abxt=fd. + """)) + self.assertEqual(raw_data_manager.get_content(m), "Basìc tëxt�.\n") + + def test_get_text_plain_bad_utf8_quoted_printable_ignore_errors(self): + m = self._str_msg(textwrap.dedent("""\ + Content-Type: text/plain; charset="utf8" + Content-Transfer-Encoding: quoted-printable + + Bas=c3=acc t=c3=abxt=fd. + """)) + self.assertEqual(raw_data_manager.get_content(m, errors='ignore'), + "Basìc tëxt.\n") + + def test_get_text_plain_utf8_base64_recoverable_bad_CTE_data(self): + m = self._str_msg(textwrap.dedent("""\ + Content-Type: text/plain; charset="utf8" + Content-Transfer-Encoding: base64 + + QmFzw6xjIHTDq3h0Lgo\xFF= + """)) + self.assertEqual(raw_data_manager.get_content(m, errors='ignore'), + "Basìc tëxt.\n") + + def test_get_text_invalid_keyword(self): + m = self._str_msg(textwrap.dedent("""\ + Content-Type: text/plain + + Basic text. + """)) + with self.assertRaises(TypeError): + raw_data_manager.get_content(m, foo='ignore') + + def test_get_non_text(self): + template = textwrap.dedent("""\ + Content-Type: {} + Content-Transfer-Encoding: base64 + + Ym9ndXMgZGF0YQ== + """) + for maintype in 'audio image video application'.split(): + with self.subTest(maintype=maintype): + m = self._str_msg(template.format(maintype+'/foo')) + self.assertEqual(raw_data_manager.get_content(m), b"bogus data") + + def test_get_non_text_invalid_keyword(self): + m = self._str_msg(textwrap.dedent("""\ + Content-Type: image/jpg + Content-Transfer-Encoding: base64 + + Ym9ndXMgZGF0YQ== + """)) + with self.assertRaises(TypeError): + raw_data_manager.get_content(m, errors='ignore') + + def test_get_raises_on_multipart(self): + m = self._str_msg(textwrap.dedent("""\ + Content-Type: multipart/mixed; boundary="===" + + --=== + --===-- + """)) + with self.assertRaises(KeyError): + raw_data_manager.get_content(m) + + def test_get_message_rfc822_and_external_body(self): + template = textwrap.dedent("""\ + Content-Type: message/{} + + To: foo@example.com + From: bar@example.com + Subject: example + + an example message + """) + for subtype in 'rfc822 external-body'.split(): + with self.subTest(subtype=subtype): + m = self._str_msg(template.format(subtype)) + sub_msg = raw_data_manager.get_content(m) + self.assertIsInstance(sub_msg, self.message) + self.assertEqual(raw_data_manager.get_content(sub_msg), + "an example message\n") + self.assertEqual(sub_msg['to'], 'foo@example.com') + self.assertEqual(sub_msg['from'].addresses[0].username, 'bar') + + def test_get_message_non_rfc822_or_external_body_yields_bytes(self): + m = self._str_msg(textwrap.dedent("""\ + Content-Type: message/partial + + To: foo@example.com + From: bar@example.com + Subject: example + + The real body is in another message. + """)) + self.assertEqual(raw_data_manager.get_content(m)[:10], b'To: foo@ex') + + def test_set_text_plain(self): + m = self._make_message() + content = "Simple message.\n" + raw_data_manager.set_content(m, content) + self.assertEqual(str(m), textwrap.dedent("""\ + Content-Type: text/plain; charset="utf-8" + Content-Transfer-Encoding: 7bit + + Simple message. + """)) + self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content) + self.assertEqual(m.get_content(), content) + + def test_set_text_html(self): + m = self._make_message() + content = "

Simple message.

\n" + raw_data_manager.set_content(m, content, subtype='html') + self.assertEqual(str(m), textwrap.dedent("""\ + Content-Type: text/html; charset="utf-8" + Content-Transfer-Encoding: 7bit + +

Simple message.

+ """)) + self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content) + self.assertEqual(m.get_content(), content) + + def test_set_text_charset_latin_1(self): + m = self._make_message() + content = "Simple message.\n" + raw_data_manager.set_content(m, content, charset='latin-1') + self.assertEqual(str(m), textwrap.dedent("""\ + Content-Type: text/plain; charset="iso-8859-1" + Content-Transfer-Encoding: 7bit + + Simple message. + """)) + self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content) + self.assertEqual(m.get_content(), content) + + def test_set_text_short_line_minimal_non_ascii_heuristics(self): + m = self._make_message() + content = "et là il est monté sur moi et il commence à m'éto.\n" + raw_data_manager.set_content(m, content) + self.assertEqual(bytes(m), textwrap.dedent("""\ + Content-Type: text/plain; charset="utf-8" + Content-Transfer-Encoding: 8bit + + et là il est monté sur moi et il commence à m'éto. + """).encode('utf-8')) + self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content) + self.assertEqual(m.get_content(), content) + + def test_set_text_long_line_minimal_non_ascii_heuristics(self): + m = self._make_message() + content = ("j'ai un problème de python. il est sorti de son" + " vivarium. et là il est monté sur moi et il commence" + " à m'éto.\n") + raw_data_manager.set_content(m, content) + self.assertEqual(bytes(m), textwrap.dedent("""\ + Content-Type: text/plain; charset="utf-8" + Content-Transfer-Encoding: quoted-printable + + j'ai un probl=C3=A8me de python. il est sorti de son vivari= + um. et l=C3=A0 il est mont=C3=A9 sur moi et il commence = + =C3=A0 m'=C3=A9to. + """).encode('utf-8')) + self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content) + self.assertEqual(m.get_content(), content) + + def test_set_text_11_lines_long_line_minimal_non_ascii_heuristics(self): + m = self._make_message() + content = '\n'*10 + ( + "j'ai un problème de python. il est sorti de son" + " vivarium. et là il est monté sur moi et il commence" + " à m'éto.\n") + raw_data_manager.set_content(m, content) + self.assertEqual(bytes(m), textwrap.dedent("""\ + Content-Type: text/plain; charset="utf-8" + Content-Transfer-Encoding: quoted-printable + """ + '\n'*10 + """ + j'ai un probl=C3=A8me de python. il est sorti de son vivari= + um. et l=C3=A0 il est mont=C3=A9 sur moi et il commence = + =C3=A0 m'=C3=A9to. + """).encode('utf-8')) + self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content) + self.assertEqual(m.get_content(), content) + + def test_set_text_maximal_non_ascii_heuristics(self): + m = self._make_message() + content = "áàäéèęöő.\n" + raw_data_manager.set_content(m, content) + self.assertEqual(bytes(m), textwrap.dedent("""\ + Content-Type: text/plain; charset="utf-8" + Content-Transfer-Encoding: 8bit + + áàäéèęöő. + """).encode('utf-8')) + self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content) + self.assertEqual(m.get_content(), content) + + def test_set_text_11_lines_maximal_non_ascii_heuristics(self): + m = self._make_message() + content = '\n'*10 + "áàäéèęöő.\n" + raw_data_manager.set_content(m, content) + self.assertEqual(bytes(m), textwrap.dedent("""\ + Content-Type: text/plain; charset="utf-8" + Content-Transfer-Encoding: 8bit + """ + '\n'*10 + """ + áàäéèęöő. + """).encode('utf-8')) + self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content) + self.assertEqual(m.get_content(), content) + + def test_set_text_long_line_maximal_non_ascii_heuristics(self): + m = self._make_message() + content = ("áàäéèęöőáàäéèęöőáàäéèęöőáàäéèęöő" + "áàäéèęöőáàäéèęöőáàäéèęöőáàäéèęöő" + "áàäéèęöőáàäéèęöőáàäéèęöőáàäéèęöő.\n") + raw_data_manager.set_content(m, content) + self.assertEqual(bytes(m), textwrap.dedent("""\ + Content-Type: text/plain; charset="utf-8" + Content-Transfer-Encoding: base64 + + w6HDoMOkw6nDqMSZw7bFkcOhw6DDpMOpw6jEmcO2xZHDocOgw6TDqcOoxJnD + tsWRw6HDoMOkw6nDqMSZw7bFkcOhw6DDpMOpw6jEmcO2xZHDocOgw6TDqcOo + xJnDtsWRw6HDoMOkw6nDqMSZw7bFkcOhw6DDpMOpw6jEmcO2xZHDocOgw6TD + qcOoxJnDtsWRw6HDoMOkw6nDqMSZw7bFkcOhw6DDpMOpw6jEmcO2xZHDocOg + w6TDqcOoxJnDtsWRLgo= + """).encode('utf-8')) + self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content) + self.assertEqual(m.get_content(), content) + + def test_set_text_11_lines_long_line_maximal_non_ascii_heuristics(self): + # Yes, it chooses "wrong" here. It's a heuristic. So this result + # could change if we come up with a better heuristic. + m = self._make_message() + content = ('\n'*10 + + "áàäéèęöőáàäéèęöőáàäéèęöőáàäéèęöő" + "áàäéèęöőáàäéèęöőáàäéèęöőáàäéèęöő" + "áàäéèęöőáàäéèęöőáàäéèęöőáàäéèęöő.\n") + raw_data_manager.set_content(m, "\n"*10 + + "áàäéèęöőáàäéèęöőáàäéèęöőáàäéèęöő" + "áàäéèęöőáàäéèęöőáàäéèęöőáàäéèęöő" + "áàäéèęöőáàäéèęöőáàäéèęöőáàäéèęöő.\n") + self.assertEqual(bytes(m), textwrap.dedent("""\ + Content-Type: text/plain; charset="utf-8" + Content-Transfer-Encoding: quoted-printable + """ + '\n'*10 + """ + =C3=A1=C3=A0=C3=A4=C3=A9=C3=A8=C4=99=C3=B6=C5=91=C3=A1=C3= + =A0=C3=A4=C3=A9=C3=A8=C4=99=C3=B6=C5=91=C3=A1=C3=A0=C3=A4= + =C3=A9=C3=A8=C4=99=C3=B6=C5=91=C3=A1=C3=A0=C3=A4=C3=A9=C3= + =A8=C4=99=C3=B6=C5=91=C3=A1=C3=A0=C3=A4=C3=A9=C3=A8=C4=99= + =C3=B6=C5=91=C3=A1=C3=A0=C3=A4=C3=A9=C3=A8=C4=99=C3=B6=C5= + =91=C3=A1=C3=A0=C3=A4=C3=A9=C3=A8=C4=99=C3=B6=C5=91=C3=A1= + =C3=A0=C3=A4=C3=A9=C3=A8=C4=99=C3=B6=C5=91=C3=A1=C3=A0=C3= + =A4=C3=A9=C3=A8=C4=99=C3=B6=C5=91=C3=A1=C3=A0=C3=A4=C3=A9= + =C3=A8=C4=99=C3=B6=C5=91=C3=A1=C3=A0=C3=A4=C3=A9=C3=A8=C4= + =99=C3=B6=C5=91=C3=A1=C3=A0=C3=A4=C3=A9=C3=A8=C4=99=C3=B6= + =C5=91. + """).encode('utf-8')) + self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content) + self.assertEqual(m.get_content(), content) + + def test_set_text_non_ascii_with_cte_7bit_raises(self): + m = self._make_message() + with self.assertRaises(UnicodeError): + raw_data_manager.set_content(m,"áàäéèęöő.\n", cte='7bit') + + def test_set_text_non_ascii_with_charset_ascii_raises(self): + m = self._make_message() + with self.assertRaises(UnicodeError): + raw_data_manager.set_content(m,"áàäéèęöő.\n", charset='ascii') + + def test_set_text_non_ascii_with_cte_7bit_and_charset_ascii_raises(self): + m = self._make_message() + with self.assertRaises(UnicodeError): + raw_data_manager.set_content(m,"áàäéèęöő.\n", cte='7bit', charset='ascii') + + def test_set_message(self): + m = self._make_message() + m['Subject'] = "Forwarded message" + content = self._make_message() + content['To'] = 'python@vivarium.org' + content['From'] = 'police@monty.org' + content['Subject'] = "get back in your box" + content.set_content("Or face the comfy chair.") + raw_data_manager.set_content(m, content) + self.assertEqual(str(m), textwrap.dedent("""\ + Subject: Forwarded message + Content-Type: message/rfc822 + Content-Transfer-Encoding: 8bit + + To: python@vivarium.org + From: police@monty.org + Subject: get back in your box + Content-Type: text/plain; charset="utf-8" + Content-Transfer-Encoding: 7bit + MIME-Version: 1.0 + + Or face the comfy chair. + """)) + payload = m.get_payload(0) + self.assertIsInstance(payload, self.message) + self.assertEqual(str(payload), str(content)) + self.assertIsInstance(m.get_content(), self.message) + self.assertEqual(str(m.get_content()), str(content)) + + def test_set_message_with_non_ascii_and_coercion_to_7bit(self): + m = self._make_message() + m['Subject'] = "Escape report" + content = self._make_message() + content['To'] = 'police@monty.org' + content['From'] = 'victim@monty.org' + content['Subject'] = "Help" + content.set_content("j'ai un problème de python. il est sorti de son" + " vivarium.") + raw_data_manager.set_content(m, content) + self.assertEqual(bytes(m), textwrap.dedent("""\ + Subject: Escape report + Content-Type: message/rfc822 + Content-Transfer-Encoding: 8bit + + To: police@monty.org + From: victim@monty.org + Subject: Help + Content-Type: text/plain; charset="utf-8" + Content-Transfer-Encoding: 8bit + MIME-Version: 1.0 + + j'ai un problème de python. il est sorti de son vivarium. + """).encode('utf-8')) + # The choice of base64 for the body encoding is because generator + # doesn't bother with heuristics and uses it unconditionally for utf-8 + # text. + # XXX: the first cte should be 7bit, too...that's a generator bug. + # XXX: the line length in the body also looks like a generator bug. + self.assertEqual(m.as_string(maxheaderlen=self.policy.max_line_length), + textwrap.dedent("""\ + Subject: Escape report + Content-Type: message/rfc822 + Content-Transfer-Encoding: 8bit + + To: police@monty.org + From: victim@monty.org + Subject: Help + Content-Type: text/plain; charset="utf-8" + MIME-Version: 1.0 + Content-Transfer-Encoding: base64 + + aidhaSB1biBwcm9ibMOobWUgZGUgcHl0aG9uLiBpbCBlc3Qgc29ydGkgZGUgc29uIHZpdmFyaXVt + Lgo= + """)) + self.assertIsInstance(m.get_content(), self.message) + self.assertEqual(str(m.get_content()), str(content)) + + def test_set_message_invalid_cte_raises(self): + m = self._make_message() + content = self._make_message() + for cte in 'quoted-printable base64'.split(): + for subtype in 'rfc822 external-body'.split(): + with self.subTest(cte=cte, subtype=subtype): + with self.assertRaises(ValueError) as ar: + m.set_content(content, subtype, cte=cte) + exc = str(ar.exception) + self.assertIn(cte, exc) + self.assertIn(subtype, exc) + subtype = 'external-body' + for cte in '8bit binary'.split(): + with self.subTest(cte=cte, subtype=subtype): + with self.assertRaises(ValueError) as ar: + m.set_content(content, subtype, cte=cte) + exc = str(ar.exception) + self.assertIn(cte, exc) + self.assertIn(subtype, exc) + + def test_set_image_jpg(self): + for content in (b"bogus content", + bytearray(b"bogus content"), + memoryview(b"bogus content")): + with self.subTest(content=content): + m = self._make_message() + raw_data_manager.set_content(m, content, 'image', 'jpeg') + self.assertEqual(str(m), textwrap.dedent("""\ + Content-Type: image/jpeg + Content-Transfer-Encoding: base64 + + Ym9ndXMgY29udGVudA== + """)) + self.assertEqual(m.get_payload(decode=True), content) + self.assertEqual(m.get_content(), content) + + def test_set_audio_aif_with_quoted_printable_cte(self): + # Why you would use qp, I don't know, but it is technically supported. + # XXX: the incorrect line length is because binascii.b2a_qp doesn't + # support a line length parameter, but we must use it to get newline + # encoding. + # XXX: what about that lack of tailing newline? Do we actually handle + # that correctly in all cases? That is, if the *source* has an + # unencoded newline, do we add an extra newline to the returned payload + # or not? And can that actually be disambiguated based on the RFC? + m = self._make_message() + content = b'b\xFFgus\tcon\nt\rent ' + b'z'*100 + m.set_content(content, 'audio', 'aif', cte='quoted-printable') + self.assertEqual(bytes(m), textwrap.dedent("""\ + Content-Type: audio/aif + Content-Transfer-Encoding: quoted-printable + MIME-Version: 1.0 + + b=FFgus=09con=0At=0Dent=20zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz= + zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz""").encode('latin-1')) + self.assertEqual(m.get_payload(decode=True), content) + self.assertEqual(m.get_content(), content) + + def test_set_video_mpeg_with_binary_cte(self): + m = self._make_message() + content = b'b\xFFgus\tcon\nt\rent ' + b'z'*100 + m.set_content(content, 'video', 'mpeg', cte='binary') + self.assertEqual(bytes(m), textwrap.dedent("""\ + Content-Type: video/mpeg + Content-Transfer-Encoding: binary + MIME-Version: 1.0 + + """).encode('ascii') + + # XXX: the second \n ought to be a \r, but generator gets it wrong. + # THIS MEANS WE DON'T ACTUALLY SUPPORT THE 'binary' CTE. + b'b\xFFgus\tcon\nt\nent zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz' + + b'zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz') + self.assertEqual(m.get_payload(decode=True), content) + self.assertEqual(m.get_content(), content) + + def test_set_application_octet_stream_with_8bit_cte(self): + # In 8bit mode, univeral line end logic applies. It is up to the + # application to make sure the lines are short enough; we don't check. + m = self._make_message() + content = b'b\xFFgus\tcon\nt\rent\n' + b'z'*60 + b'\n' + m.set_content(content, 'application', 'octet-stream', cte='8bit') + self.assertEqual(bytes(m), textwrap.dedent("""\ + Content-Type: application/octet-stream + Content-Transfer-Encoding: 8bit + MIME-Version: 1.0 + + """).encode('ascii') + + b'b\xFFgus\tcon\nt\nent\n' + + b'zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz\n') + self.assertEqual(m.get_payload(decode=True), content) + self.assertEqual(m.get_content(), content) + + def test_set_headers_from_header_objects(self): + m = self._make_message() + content = "Simple message.\n" + header_factory = self.policy.header_factory + raw_data_manager.set_content(m, content, headers=( + header_factory("To", "foo@example.com"), + header_factory("From", "foo@example.com"), + header_factory("Subject", "I'm talking to myself."))) + self.assertEqual(str(m), textwrap.dedent("""\ + Content-Type: text/plain; charset="utf-8" + To: foo@example.com + From: foo@example.com + Subject: I'm talking to myself. + Content-Transfer-Encoding: 7bit + + Simple message. + """)) + + def test_set_headers_from_strings(self): + m = self._make_message() + content = "Simple message.\n" + raw_data_manager.set_content(m, content, headers=( + "X-Foo-Header: foo", + "X-Bar-Header: bar",)) + self.assertEqual(str(m), textwrap.dedent("""\ + Content-Type: text/plain; charset="utf-8" + X-Foo-Header: foo + X-Bar-Header: bar + Content-Transfer-Encoding: 7bit + + Simple message. + """)) + + def test_set_headers_with_invalid_duplicate_string_header_raises(self): + m = self._make_message() + content = "Simple message.\n" + with self.assertRaisesRegex(ValueError, 'Content-Type'): + raw_data_manager.set_content(m, content, headers=( + "Content-Type: foo/bar",) + ) + + def test_set_headers_with_invalid_duplicate_header_header_raises(self): + m = self._make_message() + content = "Simple message.\n" + header_factory = self.policy.header_factory + with self.assertRaisesRegex(ValueError, 'Content-Type'): + raw_data_manager.set_content(m, content, headers=( + header_factory("Content-Type", " foo/bar"),) + ) + + def test_set_headers_with_defective_string_header_raises(self): + m = self._make_message() + content = "Simple message.\n" + with self.assertRaisesRegex(ValueError, 'a@fairly@@invalid@address'): + raw_data_manager.set_content(m, content, headers=( + 'To: a@fairly@@invalid@address',) + ) + print(m['To'].defects) + + def test_set_headers_with_defective_header_header_raises(self): + m = self._make_message() + content = "Simple message.\n" + header_factory = self.policy.header_factory + with self.assertRaisesRegex(ValueError, 'a@fairly@@invalid@address'): + raw_data_manager.set_content(m, content, headers=( + header_factory('To', 'a@fairly@@invalid@address'),) + ) + print(m['To'].defects) + + def test_set_disposition_inline(self): + m = self._make_message() + m.set_content('foo', disposition='inline') + self.assertEqual(m['Content-Disposition'], 'inline') + + def test_set_disposition_attachment(self): + m = self._make_message() + m.set_content('foo', disposition='attachment') + self.assertEqual(m['Content-Disposition'], 'attachment') + + def test_set_disposition_foo(self): + m = self._make_message() + m.set_content('foo', disposition='foo') + self.assertEqual(m['Content-Disposition'], 'foo') + + # XXX: we should have a 'strict' policy mode (beyond raise_on_defect) that + # would cause 'foo' above to raise. + + def test_set_filename(self): + m = self._make_message() + m.set_content('foo', filename='bar.txt') + self.assertEqual(m['Content-Disposition'], 'attachment; filename="bar.txt"') + + def test_set_filename_and_disposition_inline(self): + m = self._make_message() + m.set_content('foo', disposition='inline', filename='bar.txt') + self.assertEqual(m['Content-Disposition'], 'inline; filename="bar.txt"') + + def test_set_non_ascii_filename(self): + m = self._make_message() + m.set_content('foo', filename='ábárî.txt') + self.assertEqual(bytes(m), textwrap.dedent("""\ + Content-Type: text/plain; charset="utf-8" + Content-Transfer-Encoding: 7bit + Content-Disposition: attachment; + filename*=utf-8''%C3%A1b%C3%A1r%C3%AE.txt + MIME-Version: 1.0 + + foo + """).encode('ascii')) + + content_object_params = { + 'text_plain': ('content', ()), + 'text_html': ('content', ('html',)), + 'application_octet_stream': (b'content', ('application', 'octet_stream')), + 'image_jpeg': (b'content', ('image', 'jpeg')), + 'message_rfc822': (message(), ()), + 'message_external_body': (message(), ('external-body',)), + } + + def content_object_as_header_receiver(self, obj, mimetype): + m = self._make_message() + m.set_content(obj, *mimetype, headers=( + 'To: foo@example.com', + 'From: bar@simple.net')) + self.assertEqual(m['to'], 'foo@example.com') + self.assertEqual(m['from'], 'bar@simple.net') + + def content_object_as_disposition_inline_receiver(self, obj, mimetype): + m = self._make_message() + m.set_content(obj, *mimetype, disposition='inline') + self.assertEqual(m['Content-Disposition'], 'inline') + + def content_object_as_non_ascii_filename_receiver(self, obj, mimetype): + m = self._make_message() + m.set_content(obj, *mimetype, disposition='inline', filename='bár.txt') + self.assertEqual(m['Content-Disposition'], 'inline; filename="bár.txt"') + self.assertEqual(m.get_filename(), "bár.txt") + self.assertEqual(m['Content-Disposition'].params['filename'], "bár.txt") + + def content_object_as_cid_receiver(self, obj, mimetype): + m = self._make_message() + m.set_content(obj, *mimetype, cid='some_random_stuff') + self.assertEqual(m['Content-ID'], 'some_random_stuff') + + def content_object_as_params_receiver(self, obj, mimetype): + m = self._make_message() + params = {'foo': 'bár', 'abc': 'xyz'} + m.set_content(obj, *mimetype, params=params) + if isinstance(obj, str): + params['charset'] = 'utf-8' + self.assertEqual(m['Content-Type'].params, params) + + +if __name__ == '__main__': + unittest.main() diff --git a/Lib/test/test_email/test_headerregistry.py b/Lib/test/test_email/test_headerregistry.py index f829f83..f2df372 100644 --- a/Lib/test/test_email/test_headerregistry.py +++ b/Lib/test/test_email/test_headerregistry.py @@ -661,7 +661,7 @@ class TestContentTypeHeader(TestHeaderBase): 'text/plain; name="ascii_is_the_default"'), 'rfc2231_bad_character_in_charset_parameter_value': ( - "text/plain; charset*=ascii''utf-8%E2%80%9D", + "text/plain; charset*=ascii''utf-8%F1%F2%F3", 'text/plain', 'text', 'plain', @@ -669,6 +669,18 @@ class TestContentTypeHeader(TestHeaderBase): [errors.UndecodableBytesDefect], 'text/plain; charset="utf-8\uFFFD\uFFFD\uFFFD"'), + 'rfc2231_utf_8_in_supposedly_ascii_charset_parameter_value': ( + "text/plain; charset*=ascii''utf-8%E2%80%9D", + 'text/plain', + 'text', + 'plain', + {'charset': 'utf-8”'}, + [errors.UndecodableBytesDefect], + 'text/plain; charset="utf-8”"', + ), + # XXX: if the above were *re*folded, it would get tagged as utf-8 + # instead of ascii in the param, since it now contains non-ASCII. + 'rfc2231_encoded_then_unencoded_segments': ( ('application/x-foo;' '\tname*0*="us-ascii\'en-us\'My";' diff --git a/Lib/test/test_email/test_message.py b/Lib/test/test_email/test_message.py index 8cc3f80..c0440f1 100644 --- a/Lib/test/test_email/test_message.py +++ b/Lib/test/test_email/test_message.py @@ -1,6 +1,13 @@ import unittest +import textwrap from email import policy -from test.test_email import TestEmailBase +from email.message import EmailMessage, MIMEPart +from test.test_email import TestEmailBase, parameterize + + +# Helper. +def first(iterable): + return next(filter(lambda x: x is not None, iterable), None) class Test(TestEmailBase): @@ -14,5 +21,725 @@ class Test(TestEmailBase): m['To'] = 'xyz@abc' +@parameterize +class TestEmailMessageBase: + + policy = policy.default + + # The first argument is a triple (related, html, plain) of indices into the + # list returned by 'walk' called on a Message constructed from the third. + # The indices indicate which part should match the corresponding part-type + # when passed to get_body (ie: the "first" part of that type in the + # message). The second argument is a list of indices into the 'walk' list + # of the attachments that should be returned by a call to + # 'iter_attachments'. The third argument is a list of indices into 'walk' + # that should be returned by a call to 'iter_parts'. Note that the first + # item returned by 'walk' is the Message itself. + + message_params = { + + 'empty_message': ( + (None, None, 0), + (), + (), + ""), + + 'non_mime_plain': ( + (None, None, 0), + (), + (), + textwrap.dedent("""\ + To: foo@example.com + + simple text body + """)), + + 'mime_non_text': ( + (None, None, None), + (), + (), + textwrap.dedent("""\ + To: foo@example.com + MIME-Version: 1.0 + Content-Type: image/jpg + + bogus body. + """)), + + 'plain_html_alternative': ( + (None, 2, 1), + (), + (1, 2), + textwrap.dedent("""\ + To: foo@example.com + MIME-Version: 1.0 + Content-Type: multipart/alternative; boundary="===" + + preamble + + --=== + Content-Type: text/plain + + simple body + + --=== + Content-Type: text/html + +

simple body

+ --===-- + """)), + + 'plain_html_mixed': ( + (None, 2, 1), + (), + (1, 2), + textwrap.dedent("""\ + To: foo@example.com + MIME-Version: 1.0 + Content-Type: multipart/mixed; boundary="===" + + preamble + + --=== + Content-Type: text/plain + + simple body + + --=== + Content-Type: text/html + +

simple body

+ + --===-- + """)), + + 'plain_html_attachment_mixed': ( + (None, None, 1), + (2,), + (1, 2), + textwrap.dedent("""\ + To: foo@example.com + MIME-Version: 1.0 + Content-Type: multipart/mixed; boundary="===" + + --=== + Content-Type: text/plain + + simple body + + --=== + Content-Type: text/html + Content-Disposition: attachment + +

simple body

+ + --===-- + """)), + + 'html_text_attachment_mixed': ( + (None, 2, None), + (1,), + (1, 2), + textwrap.dedent("""\ + To: foo@example.com + MIME-Version: 1.0 + Content-Type: multipart/mixed; boundary="===" + + --=== + Content-Type: text/plain + Content-Disposition: attachment + + simple body + + --=== + Content-Type: text/html + +

simple body

+ + --===-- + """)), + + 'html_text_attachment_inline_mixed': ( + (None, 2, 1), + (), + (1, 2), + textwrap.dedent("""\ + To: foo@example.com + MIME-Version: 1.0 + Content-Type: multipart/mixed; boundary="===" + + --=== + Content-Type: text/plain + Content-Disposition: inline + + simple body + + --=== + Content-Type: text/html + Content-Disposition: inline + +

simple body

+ + --===-- + """)), + + # RFC 2387 + 'related': ( + (0, 1, None), + (2,), + (1, 2), + textwrap.dedent("""\ + To: foo@example.com + MIME-Version: 1.0 + Content-Type: multipart/related; boundary="==="; type=text/html + + --=== + Content-Type: text/html + +

simple body

+ + --=== + Content-Type: image/jpg + Content-ID: + + bogus data + + --===-- + """)), + + # This message structure will probably never be seen in the wild, but + # it proves we distinguish between text parts based on 'start'. The + # content would not, of course, actually work :) + 'related_with_start': ( + (0, 2, None), + (1,), + (1, 2), + textwrap.dedent("""\ + To: foo@example.com + MIME-Version: 1.0 + Content-Type: multipart/related; boundary="==="; type=text/html; + start="" + + --=== + Content-Type: text/html + Content-ID: + + useless text + + --=== + Content-Type: text/html + Content-ID: + +

simple body

+ + + --===-- + """)), + + + 'mixed_alternative_plain_related': ( + (3, 4, 2), + (6, 7), + (1, 6, 7), + textwrap.dedent("""\ + To: foo@example.com + MIME-Version: 1.0 + Content-Type: multipart/mixed; boundary="===" + + --=== + Content-Type: multipart/alternative; boundary="+++" + + --+++ + Content-Type: text/plain + + simple body + + --+++ + Content-Type: multipart/related; boundary="___" + + --___ + Content-Type: text/html + +

simple body

+ + --___ + Content-Type: image/jpg + Content-ID: + + bogus jpg body + + --___-- + + --+++-- + + --=== + Content-Type: image/jpg + Content-Disposition: attachment + + bogus jpg body + + --=== + Content-Type: image/jpg + Content-Disposition: attachment + + another bogus jpg body + + --===-- + """)), + + # This structure suggested by Stephen J. Turnbull...may not exist/be + # supported in the wild, but we want to support it. + 'mixed_related_alternative_plain_html': ( + (1, 4, 3), + (6, 7), + (1, 6, 7), + textwrap.dedent("""\ + To: foo@example.com + MIME-Version: 1.0 + Content-Type: multipart/mixed; boundary="===" + + --=== + Content-Type: multipart/related; boundary="+++" + + --+++ + Content-Type: multipart/alternative; boundary="___" + + --___ + Content-Type: text/plain + + simple body + + --___ + Content-Type: text/html + +

simple body

+ + --___-- + + --+++ + Content-Type: image/jpg + Content-ID: + + bogus jpg body + + --+++-- + + --=== + Content-Type: image/jpg + Content-Disposition: attachment + + bogus jpg body + + --=== + Content-Type: image/jpg + Content-Disposition: attachment + + another bogus jpg body + + --===-- + """)), + + # Same thing, but proving we only look at the root part, which is the + # first one if there isn't any start parameter. That is, this is a + # broken related. + 'mixed_related_alternative_plain_html_wrong_order': ( + (1, None, None), + (6, 7), + (1, 6, 7), + textwrap.dedent("""\ + To: foo@example.com + MIME-Version: 1.0 + Content-Type: multipart/mixed; boundary="===" + + --=== + Content-Type: multipart/related; boundary="+++" + + --+++ + Content-Type: image/jpg + Content-ID: + + bogus jpg body + + --+++ + Content-Type: multipart/alternative; boundary="___" + + --___ + Content-Type: text/plain + + simple body + + --___ + Content-Type: text/html + +

simple body

+ + --___-- + + --+++-- + + --=== + Content-Type: image/jpg + Content-Disposition: attachment + + bogus jpg body + + --=== + Content-Type: image/jpg + Content-Disposition: attachment + + another bogus jpg body + + --===-- + """)), + + 'message_rfc822': ( + (None, None, None), + (), + (), + textwrap.dedent("""\ + To: foo@example.com + MIME-Version: 1.0 + Content-Type: message/rfc822 + + To: bar@example.com + From: robot@examp.com + + this is a message body. + """)), + + 'mixed_text_message_rfc822': ( + (None, None, 1), + (2,), + (1, 2), + textwrap.dedent("""\ + To: foo@example.com + MIME-Version: 1.0 + Content-Type: multipart/mixed; boundary="===" + + --=== + Content-Type: text/plain + + Your message has bounced, ser. + + --=== + Content-Type: message/rfc822 + + To: bar@example.com + From: robot@examp.com + + this is a message body. + + --===-- + """)), + + } + + def message_as_get_body(self, body_parts, attachments, parts, msg): + m = self._str_msg(msg) + allparts = list(m.walk()) + expected = [None if n is None else allparts[n] for n in body_parts] + related = 0; html = 1; plain = 2 + self.assertEqual(m.get_body(), first(expected)) + self.assertEqual(m.get_body(preferencelist=('related', 'html', 'plain')), + first(expected)) + self.assertEqual(m.get_body(preferencelist=('related', 'html')), + first(expected[related:html+1])) + self.assertEqual(m.get_body(preferencelist=('related', 'plain')), + first([expected[related], expected[plain]])) + self.assertEqual(m.get_body(preferencelist=('html', 'plain')), + first(expected[html:plain+1])) + self.assertEqual(m.get_body(preferencelist=['related']), expected[related]) + self.assertEqual(m.get_body(preferencelist=['html']), expected[html]) + self.assertEqual(m.get_body(preferencelist=['plain']), expected[plain]) + self.assertEqual(m.get_body(preferencelist=('plain', 'html')), + first(expected[plain:html-1:-1])) + self.assertEqual(m.get_body(preferencelist=('plain', 'related')), + first([expected[plain], expected[related]])) + self.assertEqual(m.get_body(preferencelist=('html', 'related')), + first(expected[html::-1])) + self.assertEqual(m.get_body(preferencelist=('plain', 'html', 'related')), + first(expected[::-1])) + self.assertEqual(m.get_body(preferencelist=('html', 'plain', 'related')), + first([expected[html], + expected[plain], + expected[related]])) + + def message_as_iter_attachment(self, body_parts, attachments, parts, msg): + m = self._str_msg(msg) + allparts = list(m.walk()) + attachments = [allparts[n] for n in attachments] + self.assertEqual(list(m.iter_attachments()), attachments) + + def message_as_iter_parts(self, body_parts, attachments, parts, msg): + m = self._str_msg(msg) + allparts = list(m.walk()) + parts = [allparts[n] for n in parts] + self.assertEqual(list(m.iter_parts()), parts) + + class _TestContentManager: + def get_content(self, msg, *args, **kw): + return msg, args, kw + def set_content(self, msg, *args, **kw): + self.msg = msg + self.args = args + self.kw = kw + + def test_get_content_with_cm(self): + m = self._str_msg('') + cm = self._TestContentManager() + self.assertEqual(m.get_content(content_manager=cm), (m, (), {})) + msg, args, kw = m.get_content('foo', content_manager=cm, bar=1, k=2) + self.assertEqual(msg, m) + self.assertEqual(args, ('foo',)) + self.assertEqual(kw, dict(bar=1, k=2)) + + def test_get_content_default_cm_comes_from_policy(self): + p = policy.default.clone(content_manager=self._TestContentManager()) + m = self._str_msg('', policy=p) + self.assertEqual(m.get_content(), (m, (), {})) + msg, args, kw = m.get_content('foo', bar=1, k=2) + self.assertEqual(msg, m) + self.assertEqual(args, ('foo',)) + self.assertEqual(kw, dict(bar=1, k=2)) + + def test_set_content_with_cm(self): + m = self._str_msg('') + cm = self._TestContentManager() + m.set_content(content_manager=cm) + self.assertEqual(cm.msg, m) + self.assertEqual(cm.args, ()) + self.assertEqual(cm.kw, {}) + m.set_content('foo', content_manager=cm, bar=1, k=2) + self.assertEqual(cm.msg, m) + self.assertEqual(cm.args, ('foo',)) + self.assertEqual(cm.kw, dict(bar=1, k=2)) + + def test_set_content_default_cm_comes_from_policy(self): + cm = self._TestContentManager() + p = policy.default.clone(content_manager=cm) + m = self._str_msg('', policy=p) + m.set_content() + self.assertEqual(cm.msg, m) + self.assertEqual(cm.args, ()) + self.assertEqual(cm.kw, {}) + m.set_content('foo', bar=1, k=2) + self.assertEqual(cm.msg, m) + self.assertEqual(cm.args, ('foo',)) + self.assertEqual(cm.kw, dict(bar=1, k=2)) + + # outcome is whether xxx_method should raise ValueError error when called + # on multipart/subtype. Blank outcome means it depends on xxx (add + # succeeds, make raises). Note: 'none' means there are content-type + # headers but payload is None...this happening in practice would be very + # unusual, so treating it as if there were content seems reasonable. + # method subtype outcome + subtype_params = ( + ('related', 'no_content', 'succeeds'), + ('related', 'none', 'succeeds'), + ('related', 'plain', 'succeeds'), + ('related', 'related', ''), + ('related', 'alternative', 'raises'), + ('related', 'mixed', 'raises'), + ('alternative', 'no_content', 'succeeds'), + ('alternative', 'none', 'succeeds'), + ('alternative', 'plain', 'succeeds'), + ('alternative', 'related', 'succeeds'), + ('alternative', 'alternative', ''), + ('alternative', 'mixed', 'raises'), + ('mixed', 'no_content', 'succeeds'), + ('mixed', 'none', 'succeeds'), + ('mixed', 'plain', 'succeeds'), + ('mixed', 'related', 'succeeds'), + ('mixed', 'alternative', 'succeeds'), + ('mixed', 'mixed', ''), + ) + + def _make_subtype_test_message(self, subtype): + m = self.message() + payload = None + msg_headers = [ + ('To', 'foo@bar.com'), + ('From', 'bar@foo.com'), + ] + if subtype != 'no_content': + ('content-shadow', 'Logrus'), + msg_headers.append(('X-Random-Header', 'Corwin')) + if subtype == 'text': + payload = '' + msg_headers.append(('Content-Type', 'text/plain')) + m.set_payload('') + elif subtype != 'no_content': + payload = [] + msg_headers.append(('Content-Type', 'multipart/' + subtype)) + msg_headers.append(('X-Trump', 'Random')) + m.set_payload(payload) + for name, value in msg_headers: + m[name] = value + return m, msg_headers, payload + + def _check_disallowed_subtype_raises(self, m, method_name, subtype, method): + with self.assertRaises(ValueError) as ar: + getattr(m, method)() + exc_text = str(ar.exception) + self.assertIn(subtype, exc_text) + self.assertIn(method_name, exc_text) + + def _check_make_multipart(self, m, msg_headers, payload): + count = 0 + for name, value in msg_headers: + if not name.lower().startswith('content-'): + self.assertEqual(m[name], value) + count += 1 + self.assertEqual(len(m), count+1) # +1 for new Content-Type + part = next(m.iter_parts()) + count = 0 + for name, value in msg_headers: + if name.lower().startswith('content-'): + self.assertEqual(part[name], value) + count += 1 + self.assertEqual(len(part), count) + self.assertEqual(part.get_payload(), payload) + + def subtype_as_make(self, method, subtype, outcome): + m, msg_headers, payload = self._make_subtype_test_message(subtype) + make_method = 'make_' + method + if outcome in ('', 'raises'): + self._check_disallowed_subtype_raises(m, method, subtype, make_method) + return + getattr(m, make_method)() + self.assertEqual(m.get_content_maintype(), 'multipart') + self.assertEqual(m.get_content_subtype(), method) + if subtype == 'no_content': + self.assertEqual(len(m.get_payload()), 0) + self.assertEqual(m.items(), + msg_headers + [('Content-Type', + 'multipart/'+method)]) + else: + self.assertEqual(len(m.get_payload()), 1) + self._check_make_multipart(m, msg_headers, payload) + + def subtype_as_make_with_boundary(self, method, subtype, outcome): + # Doing all variation is a bit of overkill... + m = self.message() + if outcome in ('', 'raises'): + m['Content-Type'] = 'multipart/' + subtype + with self.assertRaises(ValueError) as cm: + getattr(m, 'make_' + method)() + return + if subtype == 'plain': + m['Content-Type'] = 'text/plain' + elif subtype != 'no_content': + m['Content-Type'] = 'multipart/' + subtype + getattr(m, 'make_' + method)(boundary="abc") + self.assertTrue(m.is_multipart()) + self.assertEqual(m.get_boundary(), 'abc') + + def test_policy_on_part_made_by_make_comes_from_message(self): + for method in ('make_related', 'make_alternative', 'make_mixed'): + m = self.message(policy=self.policy.clone(content_manager='foo')) + m['Content-Type'] = 'text/plain' + getattr(m, method)() + self.assertEqual(m.get_payload(0).policy.content_manager, 'foo') + + class _TestSetContentManager: + def set_content(self, msg, content, *args, **kw): + msg['Content-Type'] = 'text/plain' + msg.set_payload(content) + + def subtype_as_add(self, method, subtype, outcome): + m, msg_headers, payload = self._make_subtype_test_message(subtype) + cm = self._TestSetContentManager() + add_method = 'add_attachment' if method=='mixed' else 'add_' + method + if outcome == 'raises': + self._check_disallowed_subtype_raises(m, method, subtype, add_method) + return + getattr(m, add_method)('test', content_manager=cm) + self.assertEqual(m.get_content_maintype(), 'multipart') + self.assertEqual(m.get_content_subtype(), method) + if method == subtype or subtype == 'no_content': + self.assertEqual(len(m.get_payload()), 1) + for name, value in msg_headers: + self.assertEqual(m[name], value) + part = m.get_payload()[0] + else: + self.assertEqual(len(m.get_payload()), 2) + self._check_make_multipart(m, msg_headers, payload) + part = m.get_payload()[1] + self.assertEqual(part.get_content_type(), 'text/plain') + self.assertEqual(part.get_payload(), 'test') + if method=='mixed': + self.assertEqual(part['Content-Disposition'], 'attachment') + elif method=='related': + self.assertEqual(part['Content-Disposition'], 'inline') + else: + # Otherwise we don't guess. + self.assertIsNone(part['Content-Disposition']) + + class _TestSetRaisingContentManager: + def set_content(self, msg, content, *args, **kw): + raise Exception('test') + + def test_default_content_manager_for_add_comes_from_policy(self): + cm = self._TestSetRaisingContentManager() + m = self.message(policy=self.policy.clone(content_manager=cm)) + for method in ('add_related', 'add_alternative', 'add_attachment'): + with self.assertRaises(Exception) as ar: + getattr(m, method)('') + self.assertEqual(str(ar.exception), 'test') + + def message_as_clear(self, body_parts, attachments, parts, msg): + m = self._str_msg(msg) + m.clear() + self.assertEqual(len(m), 0) + self.assertEqual(list(m.items()), []) + self.assertIsNone(m.get_payload()) + self.assertEqual(list(m.iter_parts()), []) + + def message_as_clear_content(self, body_parts, attachments, parts, msg): + m = self._str_msg(msg) + expected_headers = [h for h in m.keys() + if not h.lower().startswith('content-')] + m.clear_content() + self.assertEqual(list(m.keys()), expected_headers) + self.assertIsNone(m.get_payload()) + self.assertEqual(list(m.iter_parts()), []) + + +class TestEmailMessage(TestEmailMessageBase, TestEmailBase): + message = EmailMessage + + def test_set_content_adds_MIME_Version(self): + m = self._str_msg('') + cm = self._TestContentManager() + self.assertNotIn('MIME-Version', m) + m.set_content(content_manager=cm) + self.assertEqual(m['MIME-Version'], '1.0') + + class _MIME_Version_adding_CM: + def set_content(self, msg, *args, **kw): + msg['MIME-Version'] = '1.0' + + def test_set_content_does_not_duplicate_MIME_Version(self): + m = self._str_msg('') + cm = self._MIME_Version_adding_CM() + self.assertNotIn('MIME-Version', m) + m.set_content(content_manager=cm) + self.assertEqual(m['MIME-Version'], '1.0') + + +class TestMIMEPart(TestEmailMessageBase, TestEmailBase): + # Doing the full test run here may seem a bit redundant, since the two + # classes are almost identical. But what if they drift apart? So we do + # the full tests so that any future drift doesn't introduce bugs. + message = MIMEPart + + def test_set_content_does_not_add_MIME_Version(self): + m = self._str_msg('') + cm = self._TestContentManager() + self.assertNotIn('MIME-Version', m) + m.set_content(content_manager=cm) + self.assertNotIn('MIME-Version', m) + + if __name__ == '__main__': unittest.main() diff --git a/Lib/test/test_email/test_policy.py b/Lib/test/test_email/test_policy.py index 983bd49..06ad5f2 100644 --- a/Lib/test/test_email/test_policy.py +++ b/Lib/test/test_email/test_policy.py @@ -30,6 +30,7 @@ class PolicyAPITests(unittest.TestCase): 'raise_on_defect': False, 'header_factory': email.policy.EmailPolicy.header_factory, 'refold_source': 'long', + 'content_manager': email.policy.EmailPolicy.content_manager, }) # For each policy under test, we give here what we expect the defaults to