"""Demonstrate, and work around, lossy folding of quoted display names.

When an address header whose display name must be quoted (here it contains
an ``@``) is too long for one line, the stock ``_refold_parse_tree`` in
``email._header_value_parser`` unpacks the bare-quoted-string and emits its
contents without the surrounding double quotes.  Parsing the folded text
back then yields a different mailbox.

This script:

1. shows the problem on a serialize/parse round trip and on a direct
   ``fold()`` call,
2. installs a replacement ``_refold_parse_tree`` that re-emits the quotes,
3. verifies that folding and the round trip are now faithful.

Run it as a script.  Importing the module only defines the names below; it
neither runs the demonstration nor patches the standard library.
"""
import email._header_value_parser
import email.policy
from email import errors
from email._header_value_parser import (
    BareQuotedString,
    Terminal,
    ValueTerminal,
    _fold_as_ew,
    _fold_mime_parameters,
    _steal_trailing_WSP_if_exists,
    quote_string,
)
from email.headerregistry import Address, BaseHeader, UniqueAddressHeader
from email.message import EmailMessage
from email.parser import BytesParser

#######################################################################
# Setup: a policy that triggers folding of long headers
#######################################################################

MAX_LINE_LEN = 72
GOOD_SMTP_POLICY = email.policy.default.clone(linesep='\r\n',
                                              max_line_length=MAX_LINE_LEN)


#########################################################################
# Fix the problem: respect use of quotes in _refold_parse_tree
#########################################################################

def _refold_parse_tree(parse_tree, *, policy):
    """Return string of contents of parse_tree folded according to RFC rules.

    This is the standard-library folding loop plus one extra branch: a
    defect-free ``BareQuotedString`` that has to be unpacked is replaced by
    an opening ``"`` terminal, its sub-tokens (value terminals re-escaped
    with ``quote_string``), and a closing ``"`` terminal, so the quotes
    survive the fold.

    parse_tree -- iterable of parsed header tokens; each token is either a
        terminal (a ``str`` subclass) or a token list that can be unpacked.
    policy -- keyword only; supplies ``max_line_length`` (0/None means no
        limit), ``utf8`` and ``linesep``.

    The returned string always ends with ``policy.linesep``.
    """
    # max_line_length 0/None means no limit, ie: infinitely long.
    maxlen = policy.max_line_length or float("+inf")
    encoding = 'utf-8' if policy.utf8 else 'us-ascii'
    lines = ['']
    last_ew = None
    # Count of enclosing token lists that forbid encoded words; each one
    # queues ``end_ew_not_allowed`` after its children so the counter is
    # decremented again once they have all been processed.
    wrap_as_ew_blocked = 0
    want_encoding = False
    end_ew_not_allowed = Terminal('', 'wrap_as_ew_blocked')
    parts = list(parse_tree)
    while parts:
        part = parts.pop(0)
        if part is end_ew_not_allowed:
            wrap_as_ew_blocked -= 1
            continue
        tstr = str(part)
        try:
            tstr.encode(encoding)
            charset = encoding
        except UnicodeEncodeError:
            if any(isinstance(x, errors.UndecodableBytesDefect)
                   for x in part.all_defects):
                charset = 'unknown-8bit'
            else:
                # If policy.utf8 is false this should really be taken from a
                # 'charset' property on the policy.
                charset = 'utf-8'
            want_encoding = True
        if part.token_type == 'mime-parameters':
            # Mime parameter folding (using RFC2231) is extra special.
            _fold_mime_parameters(part, lines, maxlen, encoding)
            continue
        if want_encoding and not wrap_as_ew_blocked:
            if not part.as_ew_allowed:
                want_encoding = False
                last_ew = None
                if part.syntactic_break:
                    encoded_part = part.fold(policy=policy)[:-1]  # strip nl
                    if policy.linesep not in encoded_part:
                        # It fits on a single line
                        if len(encoded_part) > maxlen - len(lines[-1]):
                            # But not on this one, so start a new one.
                            newline = _steal_trailing_WSP_if_exists(lines)
                            # XXX what if encoded_part has no leading FWS?
                            lines.append(newline)
                        lines[-1] += encoded_part
                        continue
                # Either this is not a major syntactic break, so we don't
                # want it on a line by itself even if it fits, or it
                # doesn't fit on a line by itself.  Either way, fall through
                # to unpacking the subparts and wrapping them.
            if not hasattr(part, 'encode'):
                # It's not a Terminal, do each piece individually.
                parts = list(part) + parts
            else:
                # It's a terminal, wrap it as an encoded word, possibly
                # combining it with previously encoded words if allowed.
                last_ew = _fold_as_ew(tstr, lines, maxlen, last_ew,
                                      part.ew_combine_allowed, charset)
            want_encoding = False
            continue
        if len(tstr) <= maxlen - len(lines[-1]):
            lines[-1] += tstr
            continue
        # This part is too long to fit.  The RFC wants us to break at
        # "major syntactic breaks", so unless we don't consider this
        # to be one, check if it will fit on the next line by itself.
        if (part.syntactic_break and
                len(tstr) + 1 <= maxlen):
            newline = _steal_trailing_WSP_if_exists(lines)
            if newline or part.startswith_fws():
                lines.append(newline + tstr)
                continue
        # Begin modification
        # get_bare_quoted_string produces ValueTerminal for qcontent and
        # WhiteSpace, or there were defects.
        if isinstance(part, BareQuotedString) and not part.defects:
            # str(part) carries the quotes but the sub-tokens do not, so
            # put explicit quote terminals around the unpacked pieces.
            dquote = ValueTerminal('"', 'ptext')
            quoted_subparts = [dquote]
            for subpart in part:
                if isinstance(subpart, ValueTerminal):
                    # quote_string() escapes and wraps in quotes; keep only
                    # the escaped content.
                    escaped = quote_string(subpart)[1:-1]
                    quoted_subparts.append(ValueTerminal(escaped, 'ptext'))
                else:
                    # Should be whitespace...
                    quoted_subparts.append(subpart)
            if not part.as_ew_allowed:
                wrap_as_ew_blocked += 1
                quoted_subparts.append(end_ew_not_allowed)
            quoted_subparts.append(dquote)
            parts = quoted_subparts + parts
            continue
        # End modification
        if not hasattr(part, 'encode'):
            # It's not a terminal, try folding the subparts.
            newparts = list(part)
            if not part.as_ew_allowed:
                wrap_as_ew_blocked += 1
                newparts.append(end_ew_not_allowed)
            parts = newparts + parts
            continue
        if part.as_ew_allowed and not wrap_as_ew_blocked:
            # It doesn't need CTE encoding, but encode it anyway so we can
            # wrap it.
            parts.insert(0, part)
            want_encoding = True
            continue
        # We can't figure out how to wrap, it, so give up.
        newline = _steal_trailing_WSP_if_exists(lines)
        if newline or part.startswith_fws():
            lines.append(newline + tstr)
        else:
            # We can't fold it onto the next line either...
            lines[-1] += tstr
    return policy.linesep.join(lines) + policy.linesep


def _make_from_header(address):
    """Return a ``From`` header object built directly from *address*."""
    cls = UniqueAddressHeader
    header_cls = type('_' + cls.__name__, (cls, BaseHeader), {})
    return header_cls('From', address)


def _roundtrip_from_header(address):
    """Serialize a message whose ``From`` is *address*, parse it back, and
    return the parsed ``From`` header."""
    message = EmailMessage(policy=GOOD_SMTP_POLICY)
    message['From'] = address
    # Trigger folding (via as_string()), then parse it back in.
    msg_string = message.as_string()
    msg_bytes = msg_string.encode('utf-8')
    msg_deserialized = BytesParser(policy=GOOD_SMTP_POLICY).parsebytes(msg_bytes)
    return msg_deserialized['From']


def main():
    """Show the folding bug, install the fixed folder, and verify the fix.

    The assertions in the two "illustrate" sections expect the unpatched
    folding behaviour; they fail if the interpreter's own folding already
    keeps the quotes.
    """
    display_name = r'anything@anything.com ' + 'a' * MAX_LINE_LEN
    addr_spec = 'dev@local.startmail.org'
    address = Address(display_name=display_name, addr_spec=addr_spec)

    ###################################################################
    # Illustrate the problem at a high level: serialization of address
    # header and subsequent parsing do not achieve the same semantics.
    ###################################################################
    from_hdr = _roundtrip_from_header(address)

    # Verify badness.
    assert from_hdr != str(address)
    assert len(from_hdr.addresses) == 1
    assert from_hdr.addresses[0].display_name != display_name
    assert from_hdr.addresses[0].addr_spec != addr_spec
    assert from_hdr.addresses[0].addr_spec == 'anything@anything.com'  # Definitely wrong.

    ###################################################################
    # Illustrate the problem at a low level: folding of address produces
    # an unstructured header folding which does not respect mailbox
    # structure.
    ###################################################################
    hdr = _make_from_header(address)
    assert len(hdr.addresses) == 1
    assert hdr.addresses[0].display_name == display_name
    assert hdr.addresses[0].addr_spec == addr_spec

    bad_folded = hdr.fold(policy=GOOD_SMTP_POLICY)
    assert bad_folded == ('From: anything@anything.com\r\n'
                          ' ' + 'a' * MAX_LINE_LEN + '\r\n'
                          ' <dev@local.startmail.org>\r\n')

    # Monkey patch.
    email._header_value_parser._refold_parse_tree = _refold_parse_tree

    ###################################################################
    # Verify fix at low level: folding doesn't produce bad_folded.
    ###################################################################
    hdr = _make_from_header(address)
    assert len(hdr.addresses) == 1
    assert hdr.addresses[0].display_name == display_name
    assert hdr.addresses[0].addr_spec == addr_spec

    good_folded = hdr.fold(policy=GOOD_SMTP_POLICY)
    assert good_folded != bad_folded
    assert good_folded == ('From: "anything@anything.com\r\n'
                           ' ' + 'a' * MAX_LINE_LEN + '"\r\n'
                           ' <dev@local.startmail.org>\r\n')

    ###################################################################
    # Verify fix at high level: serialize, deserialize, assert equal.
    ###################################################################
    # Fold with new refold, read it in again.
    from_hdr = _roundtrip_from_header(address)
    assert from_hdr == str(address)
    assert len(from_hdr.addresses) == 1
    assert from_hdr.addresses[0].display_name == display_name
    assert from_hdr.addresses[0].addr_spec == addr_spec


if __name__ == "__main__":
    main()