Title: "email.message.Message.as_bytes": fails to correctly handle "charset"
Type: behavior Stage:
Components: email Versions: Python 3.9
Status: open Resolution:
Dependencies: Superseder:
Assigned To: Nosy List: barry, dmaurer, r.david.murray
Priority: normal Keywords:

Created on 2020-07-15 19:18 by dmaurer, last changed 2020-07-15 20:40 by dmaurer.

Messages (2)
msg373711 - Author: Dieter Maurer (dmaurer) Date: 2020-07-15 19:18
In the transcript below, "ms" and "mb" should be equivalent:

>>> from email import message_from_string, message_from_bytes
>>> mt = """\
... Mime-Version: 1.0
... Content-Type: text/plain; charset=UTF-8
... Content-Transfer-Encoding: 8bit
... ä
... """
>>> ms = message_from_string(mt)
>>> mb = message_from_bytes(mt.encode("UTF-8"))

But "mb.as_bytes" succeeds while "ms.as_bytes" raises a "UnicodeEncodeError":

>>> mb.as_bytes()
b'Mime-Version: 1.0\nContent-Type: text/plain; charset=UTF-8\nContent-Transfer-Encoding: 8bit\n\n\xc3\xa4\n'
>>> ms.as_bytes()
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/email/", line 155, in _write_lines
  File "/usr/local/lib/python3.9/email/", line 406, in write
    self._fp.write(s.encode('ascii', 'surrogateescape'))
UnicodeEncodeError: 'ascii' codec can't encode character '\xe4' in position 0: ordinal not in range(128)

Apparently, "as_bytes" ignores the "charset" parameter of the "Content-Type" header: it should encode the body with "utf-8", not "ascii".
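
To illustrate where the difference probably comes from, here is a minimal sketch that inspects how the two parsers store the body. It reads the private "_payload" attribute, which is an implementation detail and is used here only for diagnosis; the names "raw_from_str", "raw_from_bytes" and "restored" are made up for this example, and the message is written with an explicit blank separator line.

```python
from email import message_from_bytes, message_from_string

# Same message as in the transcript; the body character is written as an escape.
mt = (
    "Mime-Version: 1.0\n"
    "Content-Type: text/plain; charset=UTF-8\n"
    "Content-Transfer-Encoding: 8bit\n"
    "\n"
    "\u00e4\n"
)
ms = message_from_string(mt)
mb = message_from_bytes(mt.encode("UTF-8"))

# _payload is private; it is read here only to show the raw storage.
raw_from_str = ms._payload    # real text: U+00E4 followed by a newline
raw_from_bytes = mb._payload  # the UTF-8 bytes, carried as lone surrogates

print(ascii(raw_from_str))
print(ascii(raw_from_bytes))

# Only the surrogate-escaped form survives the ASCII encode step
# shown in the traceback; the plain text form does not.
restored = raw_from_bytes.encode("ascii", "surrogateescape")
print(restored)
```

The message parsed from bytes keeps its original bytes as surrogate escapes, so the generator can write them back unchanged, while the message parsed from a string holds real text that the ASCII encode step rejects.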
msg373724 - Author: Dieter Maurer (dmaurer) Date: 2020-07-15 20:40
The following fixes the example ("mt" and "message_from_string" are reused from the transcript above):
from copy import copy
from io import BytesIO
from email.message import Message
from email.generator import BytesGenerator, _has_surrogates
from email._policybase import Compat32

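class FixedBytesGenerator(BytesGenerator):
    def _handle_text(self, msg):
        payload = msg._payload
        if payload is None:
            return
        charset = msg.get_param("charset")
        # A str payload without surrogates came from text, not bytes:
        # encode it with the declared charset and surrogate-escape the
        # result so that the base class writes those bytes out unchanged.
        if (charset is not None
                and self.policy.cte_type != '7bit'
                and not _has_surrogates(payload)):
            msg = copy(msg)
            msg._payload = payload.encode(charset).decode(
                "ascii", "surrogateescape")
        super()._handle_text(msg)

    _writeBody = _handle_text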

class FixedMessage(Message):
    def as_bytes(self, unixfrom=False, policy=None):
        policy = self.policy if policy is None else policy
        fp = BytesIO()
        g = FixedBytesGenerator(fp, mangle_from_=False, policy=policy)
        g.flatten(self, unixfrom=unixfrom)
        return fp.getvalue()

fixed_policy = Compat32(message_factory=FixedMessage)

ms = message_from_string(mt, policy=fixed_policy)
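
For comparison, a simpler workaround that avoids subclassing might look like the sketch below. It is not part of the fix above: "as_bytes_via_string" is a hypothetical helper, and it assumes a single-part message whose headers are pure ASCII and whose "charset" parameter is a plain string, so it will not cover multipart messages with mixed charsets.

```python
from email import message_from_bytes, message_from_string

# Same message as in the transcript; the body character is written as an escape.
mt = (
    "Mime-Version: 1.0\n"
    "Content-Type: text/plain; charset=UTF-8\n"
    "Content-Transfer-Encoding: 8bit\n"
    "\n"
    "\u00e4\n"
)
ms = message_from_string(mt)
mb = message_from_bytes(mt.encode("UTF-8"))


def as_bytes_via_string(msg):
    """Hypothetical helper: flatten to text, then encode with the declared charset."""
    # Fall back to ASCII when the Content-Type header declares no charset.
    charset = msg.get_param("charset", "ascii")
    return msg.as_string().encode(charset)


via_string = as_bytes_via_string(ms)

# For this message the result matches what the bytes-parsed message produces.
print(via_string == mb.as_bytes())
```

This sidesteps the ASCII encode step for the simple case, but unlike the generator subclass it applies one charset to the whole flattened message.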
Date                 User     Action  Args
2020-07-15 20:40:58  dmaurer  set     messages: + msg373724
2020-07-15 19:18:03  dmaurer  create