diff --git a/Doc/library/email.header.rst b/Doc/library/email.header.rst --- a/Doc/library/email.header.rst +++ b/Doc/library/email.header.rst @@ -109,9 +109,17 @@ Encode a message header into an RFC-compliant format, possibly wrapping long lines and encapsulating non-ASCII parts in base64 or quoted-printable - encodings. Optional *splitchars* is a string containing characters to - split long ASCII lines on, in rough support of :rfc:`2822`'s *highest - level syntactic breaks*. This doesn't affect :rfc:`2047` encoded lines. + encodings. + + Optional *splitchars* is a string containing characters which should be + given extra weight by the splitting algorithm during normal header + wrapping. This is in very rough support of :RFC:`2822`\'s 'higher level + syntactic breaks': split points preceded by a splitchar are preferred + during line splitting, with the characters preferred in the order in + which they appear in the string. Space and tab may be included in the + string to indicate whether preference should be given to one over the + other as a split point when other split chars do not appear in the line + being split. Splitchars does not affect RFC 2047 encoded lines. *maxlinelen*, if given, overrides the instance's value for the maximum line length. diff --git a/Lib/email/header.py b/Lib/email/header.py --- a/Lib/email/header.py +++ b/Lib/email/header.py @@ -26,6 +26,7 @@ SPACE8 = ' ' * 8 EMPTYSTRING = '' MAXLINELEN = 78 +FWS = ' \t' USASCII = Charset('us-ascii') UTF8 = Charset('utf-8') @@ -299,9 +300,15 @@ If the given charset is not known or an error occurs during conversion, this function will return the header untouched. - Optional splitchars is a string containing characters to split long - ASCII lines on, in rough support of RFC 2822's `highest level - syntactic breaks'. This doesn't affect RFC 2047 encoded lines. + Optional splitchars is a string containing characters which should be + given extra weight by the splitting algorithm during normal header + wrapping. This is in very rough support of RFC 2822's `higher level + syntactic breaks': split points preceded by a splitchar are preferred + during line splitting, with the characters preferred in the order in + which they appear in the string. Space and tab may be included in the + string to indicate whether preference should be given to one over the + other as a split point when other split chars do not appear in the line + being split. Splitchars does not affect RFC 2047 encoded lines. Optional linesep is a string to be used to separate the lines of the value. The default value is the most useful for typical @@ -320,13 +327,19 @@ self._continuation_ws, splitchars) for string, charset in self._chunks: lines = string.splitlines() - formatter.feed(lines[0] if lines else '', charset) + if lines: + formatter.feed('', lines[0], charset) + else: + formatter.feed('', '', charset) for line in lines[1:]: formatter.newline() if charset.header_encoding is not None: - formatter.feed(self._continuation_ws, USASCII) - line = ' ' + line.lstrip() - formatter.feed(line, charset) + formatter.feed(self._continuation_ws, ' ' + line.lstrip(), + charset) + else: + sline = line.lstrip() + fws = line[:len(line)-len(sline)] + formatter.feed(fws, sline, charset) if len(lines) > 1: formatter.newline() formatter.add_transition() @@ -360,7 +373,7 @@ def __init__(self, headerlen, maxlen, continuation_ws, splitchars): self._maxlen = maxlen self._continuation_ws = continuation_ws - self._continuation_ws_len = len(continuation_ws.replace('\t', SPACE8)) + self._continuation_ws_len = len(continuation_ws) self._splitchars = splitchars self._lines = [] self._current_line = _Accumulator(headerlen) @@ -374,43 +387,26 @@ def newline(self): end_of_line = self._current_line.pop() - if end_of_line is not None: - self._current_line.push(end_of_line) + if end_of_line != (' ', ''): + self._current_line.push(*end_of_line) if len(self._current_line) > 0: - self._lines.append(str(self._current_line)) + if self._current_line.is_onlyws(): + self._lines[-1] += str(self._current_line) + else: + self._lines.append(str(self._current_line)) self._current_line.reset() def add_transition(self): - self._current_line.push(None) + self._current_line.push(' ', '') - def feed(self, string, charset): - # If the string itself fits on the current line in its encoded format, - # then add it now and be done with it. - encoded_string = charset.header_encode(string) - if len(encoded_string) + len(self._current_line) <= self._maxlen: - self._current_line.push(encoded_string) - return + def feed(self, fws, string, charset): # If the charset has no header encoding (i.e. it is an ASCII encoding) # then we must split the header at the "highest level syntactic break" # possible. Note that we don't have a lot of smarts about field # syntax; we just try to break on semi-colons, then commas, then # whitespace. Eventually, this should be pluggable. if charset.header_encoding is None: - for ch in self._splitchars: - if ch in string: - break - else: - ch = None - # If there's no available split character then regardless of - # whether the string fits on the line, we have to put it on a line - # by itself. - if ch is None: - if not self._current_line.is_onlyws(): - self._lines.append(str(self._current_line)) - self._current_line.reset(self._continuation_ws) - self._current_line.push(encoded_string) - else: - self._ascii_split(string, ch) + self._ascii_split(fws, string, self._splitchars) return # Otherwise, we're doing either a Base64 or a quoted-printable # encoding which means we don't need to split the line on syntactic @@ -428,15 +424,14 @@ # There are no encoded lines, so we're done. return if first_line is not None: - self._current_line.push(first_line) - self._lines.append(str(self._current_line)) - self._current_line.reset(self._continuation_ws) + self._append_chunk(fws, first_line) try: last_line = encoded_lines.pop() except IndexError: # There was only one line. return - self._current_line.push(last_line) + self.newline() + self._current_line.push(self._continuation_ws, last_line) # Everything else are full lines in themselves. for line in encoded_lines: self._lines.append(self._continuation_ws + line) @@ -447,162 +442,96 @@ while True: yield self._maxlen - self._continuation_ws_len - def _ascii_split(self, string, ch): - holding = _Accumulator() - # Split the line on the split character, preserving it. If the split - # character is whitespace RFC 2822 $2.2.3 requires us to fold on the - # whitespace, so that the line leads with the original whitespace we - # split on. However, if a higher syntactic break is used instead - # (e.g. comma or semicolon), the folding should happen after the split - # character. But then in that case, we need to add our own - # continuation whitespace -- although won't that break unfolding? - for part, splitpart, nextpart in _spliterator(ch, string): - if not splitpart: - # No splitpart means this is the last chunk. Put this part - # either on the current line or the next line depending on - # whether it fits. - holding.push(part) - if len(holding) + len(self._current_line) <= self._maxlen: - # It fits, but we're done. - self._current_line.push(str(holding)) + def _ascii_split(self, fws, string, splitchars): + # The RFC 2822 header folding algorithm is simple in principle but + # complex in practice. Lines may be folded any place where "folding + # white space" appears by inserting a linesep character in front of the + # FWS. The complication is that not all spaces or tabs qualify as FWS, + # and we are also supposed to prefer to break at "higher level + # syntactic breaks". We can't do either of these without intimate + # knowledge of the structure of structured headers, which we don't have + # here. So the best we can do here is prefer to break at the specified + # splitchars, and hope that we don't choose any spaces or tabs that + # aren't legal FWS. (This is at least better than the old algorithm, + # where we would sometimes *introduce* FWS after a splitchar, or the + # algorithm before that, where we would turn all white space runs into + # single spaces or tabs.) + parts = re.split("(["+FWS+"]+)", fws+string) + if parts[0]: + parts[:0] = [''] + else: + parts.pop(0) + for fws, part in zip(*[iter(parts)]*2): + self._append_chunk(fws, part) + + def _append_chunk(self, fws, string): + self._current_line.push(fws, string) + if len(self._current_line) > self._maxlen: + # Find the best split point, working backward from the end. + # There might be none, on a long first line. + for ch in self._splitchars: + for i in range(self._current_line.part_count()-1, 0, -1): + if ch.isspace(): + fws = self._current_line[i][0] + if fws and fws[0]==ch: + break + prevpart = self._current_line[i-1][1] + if prevpart and prevpart[-1]==ch: + break else: - # It doesn't fit, but we're done. Before pushing a new - # line, watch out for the current line containing only - # whitespace. - holding.pop() - if self._current_line.is_onlyws() and holding.is_onlyws(): - # Don't start a new line. - holding.push(part) - part = None - self._current_line.push(str(holding)) - self._lines.append(str(self._current_line)) - if part is None: - self._current_line.reset() - else: - holding.reset(part) - self._current_line.reset(str(holding)) + continue + break + else: + fws, part = self._current_line.pop() + if self._current_line._initial_size > 0: + # There will be a header, so leave it on a line by itself. + self.newline() + if not fws: + # We don't use continuation_ws here because the whitespace + # after a header should always be a space. + fws = ' ' + self._current_line.push(fws, part) return - elif not nextpart: - # There must be some trailing or duplicated split characters - # because we - # found a split character but no next part. In this case we - # must treat the thing to fit as the part + splitpart because - # if splitpart is whitespace it's not allowed to be the only - # thing on the line, and if it's not whitespace we must split - # after the syntactic break. - holding_prelen = len(holding) - holding.push(part + splitpart) - if len(holding) + len(self._current_line) <= self._maxlen: - self._current_line.push(str(holding)) - elif holding_prelen == 0: - # This is the only chunk left so it has to go on the - # current line. - self._current_line.push(str(holding)) - else: - save_part = holding.pop() - self._current_line.push(str(holding)) - self._lines.append(str(self._current_line)) - holding.reset(save_part) - self._current_line.reset(str(holding)) - holding.reset() - elif not part: - # We're leading with a split character. See if the splitpart - # and nextpart fits on the current line. - holding.push(splitpart + nextpart) - holding_len = len(holding) - # We know we're not leaving the nextpart on the stack. - holding.pop() - if holding_len + len(self._current_line) <= self._maxlen: - holding.push(splitpart) - else: - # It doesn't fit. Since there's no current part really - # the best we can do is start a new line and push the - # split part onto it. - self._current_line.push(str(holding)) - holding.reset() - if len(self._current_line) > 0 and self._lines: - self._lines.append(str(self._current_line)) - self._current_line.reset() - holding.push(splitpart) - else: - # All three parts are present. First let's see if all three - # parts will fit on the current line. If so, we don't need to - # split it. - holding.push(part + splitpart + nextpart) - holding_len = len(holding) - # Pop the part because we'll push nextpart on the next - # iteration through the loop. - holding.pop() - if holding_len + len(self._current_line) <= self._maxlen: - holding.push(part + splitpart) - else: - # The entire thing doesn't fit. See if we need to split - # before or after the split characters. - if splitpart.isspace(): - # Split before whitespace. Remember that the - # whitespace becomes the continuation whitespace of - # the next line so it goes to current_line not holding. - holding.push(part) - self._current_line.push(str(holding)) - holding.reset() - self._lines.append(str(self._current_line)) - self._current_line.reset(splitpart) - else: - # Split after non-whitespace. The continuation - # whitespace comes from the instance variable. - holding.push(part + splitpart) - self._current_line.push(str(holding)) - holding.reset() - self._lines.append(str(self._current_line)) - if nextpart[0].isspace(): - self._current_line.reset() - else: - self._current_line.reset(self._continuation_ws) - # Get the last of the holding part - self._current_line.push(str(holding)) + remainder = self._current_line.pop_from(i) + self._lines.append(str(self._current_line)) + self._current_line.reset(remainder) - -def _spliterator(character, string): - parts = list(reversed(re.split('(%s)' % character, string))) - while parts: - part = parts.pop() - splitparts = (parts.pop() if parts else None) - nextpart = (parts.pop() if parts else None) - yield (part, splitparts, nextpart) - if nextpart is not None: - parts.append(nextpart) +class _Accumulator(list): - -class _Accumulator: def __init__(self, initial_size=0): self._initial_size = initial_size - self._current = [] + super().__init__() - def push(self, string): - self._current.append(string) + def push(self, fws, string): + self.append((fws, string)) + + def pop_from(self, i=0): + popped = self[i:] + self[i:] = [] + return popped def pop(self): - if not self._current: - return None - return self._current.pop() + if self.part_count()==0: + return ('', '') + return super().pop() def __len__(self): - return sum(((1 if string is None else len(string)) - for string in self._current), + return sum((len(fws)+len(part) for fws, part in self), self._initial_size) def __str__(self): - if self._current and self._current[-1] is None: - self._current.pop() - return EMPTYSTRING.join((' ' if string is None else string) - for string in self._current) + return EMPTYSTRING.join((EMPTYSTRING.join((fws, part)) + for fws, part in self)) - def reset(self, string=None): - self._current = [] + def reset(self, startval=None): + if startval is None: + startval = [] + self[:] = startval self._initial_size = 0 - if string is not None: - self.push(string) def is_onlyws(self): - return len(self) == 0 or str(self).isspace() + return self._initial_size==0 and (not self or str(self).isspace()) + + def part_count(self): + return super().__len__() diff --git a/Lib/test/test_email/test_email.py b/Lib/test/test_email/test_email.py --- a/Lib/test/test_email/test_email.py +++ b/Lib/test/test_email/test_email.py @@ -588,6 +588,9 @@ # Test long header wrapping class TestLongHeaders(TestEmailBase): + + maxDiff = None + def test_split_long_continuation(self): eq = self.ndiffAssertEqual msg = email.message_from_string("""\ @@ -796,14 +799,12 @@ eq = self.ndiffAssertEqual h = Header('; ' 'this_part_does_not_fit_within_maxlinelen_and_thus_should_' - 'be_on_a_line_all_by_itself;') + 'be_on_a_line_all_by_itself; ') eq(h.encode(), """\ ; - this_part_does_not_fit_within_maxlinelen_and_thus_should_be_on_a_line_all_by_itself;""") + this_part_does_not_fit_within_maxlinelen_and_thus_should_be_on_a_line_all_by_itself; """) def test_long_header_with_multiple_sequential_split_chars(self): - # Issue 11492 - eq = self.ndiffAssertEqual h = Header('This is a long line that has two whitespaces in a row. ' 'This used to cause truncation of the header when folded') @@ -811,6 +812,105 @@ This is a long line that has two whitespaces in a row. This used to cause truncation of the header when folded""") + def test_splitter_split_on_punctuation_only_if_fws(self): + eq = self.ndiffAssertEqual + h = Header('thisverylongheaderhas;semicolons;and,commas,but' + 'they;arenotlegal;fold,points') + eq(h.encode(), "thisverylongheaderhas;semicolons;and,commas,butthey;" + "arenotlegal;fold,points") + + def test_leading_splittable_in_the_middle_just_before_overlong_last_part(self): + eq = self.ndiffAssertEqual + h = Header('this is a test where we need to have more than one line ' + 'before; our final line that is just too big to fit;; ' + 'this_part_does_not_fit_within_maxlinelen_and_thus_should_' + 'be_on_a_line_all_by_itself;') + eq(h.encode(), """\ +this is a test where we need to have more than one line before; + our final line that is just too big to fit;; + this_part_does_not_fit_within_maxlinelen_and_thus_should_be_on_a_line_all_by_itself;""") + + def test_overlong_last_part_followed_by_split_point(self): + eq = self.ndiffAssertEqual + h = Header('this_part_does_not_fit_within_maxlinelen_and_thus_should_' + 'be_on_a_line_all_by_itself ') + eq(h.encode(), "this_part_does_not_fit_within_maxlinelen_and_thus_" + "should_be_on_a_line_all_by_itself ") + + def test_multiline_with_overlong_parts_separated_by_two_split_points(self): + eq = self.ndiffAssertEqual + h = Header('this_is_a__test_where_we_need_to_have_more_than_one_line_' + 'before_our_final_line_; ; ' + 'this_part_does_not_fit_within_maxlinelen_and_thus_should_' + 'be_on_a_line_all_by_itself; ') + eq(h.encode(), """\ +this_is_a__test_where_we_need_to_have_more_than_one_line_before_our_final_line_; + ; + this_part_does_not_fit_within_maxlinelen_and_thus_should_be_on_a_line_all_by_itself; """) + + def test_multiline_with_overlong_last_part_followed_by_split_point(self): + eq = self.ndiffAssertEqual + h = Header('this is a test where we need to have more than one line ' + 'before our final line; ; ' + 'this_part_does_not_fit_within_maxlinelen_and_thus_should_' + 'be_on_a_line_all_by_itself; ') + eq(h.encode(), """\ +this is a test where we need to have more than one line before our final line; + ; + this_part_does_not_fit_within_maxlinelen_and_thus_should_be_on_a_line_all_by_itself; """) + + def test_long_header_with_whitespace_runs(self): + eq = self.ndiffAssertEqual + msg = Message() + msg['From'] = 'test@dom.ain' + msg['References'] = SPACE.join([' '] * 10) + msg.set_payload('Test') + sfp = StringIO() + g = Generator(sfp) + g.flatten(msg) + eq(sfp.getvalue(), """\ +From: test@dom.ain +References: + + \x20\x20 + +Test""") + + def test_long_run_with_semi_header_splitter(self): + eq = self.ndiffAssertEqual + msg = Message() + msg['From'] = 'test@dom.ain' + msg['References'] = SPACE.join([''] * 10) + '; abc' + msg.set_payload('Test') + sfp = StringIO() + g = Generator(sfp) + g.flatten(msg) + eq(sfp.getvalue(), """\ +From: test@dom.ain +References: + + ; abc + +Test""") + + def test_splitter_split_on_punctuation_only_if_fws(self): + eq = self.ndiffAssertEqual + msg = Message() + msg['From'] = 'test@dom.ain' + msg['References'] = ('thisverylongheaderhas;semicolons;and,commas,but' + 'they;arenotlegal;fold,points') + msg.set_payload('Test') + sfp = StringIO() + g = Generator(sfp) + g.flatten(msg) + # XXX the space after the header should not be there. + eq(sfp.getvalue(), """\ +From: test@dom.ain +References:\x20 + thisverylongheaderhas;semicolons;and,commas,butthey;arenotlegal;fold,points + +Test""") + def test_no_split_long_header(self): eq = self.ndiffAssertEqual hstr = 'References: ' + 'x' * 80 @@ -901,7 +1001,7 @@ def test_long_to_header(self): eq = self.ndiffAssertEqual to = ('"Someone Test #A" ,' - ',' + ', ' '"Someone Test #B" , ' '"Someone Test #C" , ' '"Someone Test #D" ') @@ -956,9 +1056,11 @@ msg['Received-2'] = h # This should be splitting on spaces not semicolons. self.ndiffAssertEqual(msg.as_string(maxheaderlen=78), """\ -Received-1: from FOO.TLD (vizworld.acl.foo.tld [123.452.678.9]) by hrothgar.la.mastaler.com (tmda-ofmipd) with ESMTP; +Received-1: from FOO.TLD (vizworld.acl.foo.tld [123.452.678.9]) by + hrothgar.la.mastaler.com (tmda-ofmipd) with ESMTP; Wed, 05 Mar 2003 18:10:18 -0700 -Received-2: from FOO.TLD (vizworld.acl.foo.tld [123.452.678.9]) by hrothgar.la.mastaler.com (tmda-ofmipd) with ESMTP; +Received-2: from FOO.TLD (vizworld.acl.foo.tld [123.452.678.9]) by + hrothgar.la.mastaler.com (tmda-ofmipd) with ESMTP; Wed, 05 Mar 2003 18:10:18 -0700 """) @@ -971,12 +1073,14 @@ msg['Received-1'] = Header(h, header_name='Received-1', continuation_ws='\t') msg['Received-2'] = h - # XXX This should be splitting on spaces not commas. + # XXX The space after the ':' should not be there. self.ndiffAssertEqual(msg.as_string(maxheaderlen=78), """\ -Received-1: <15975.17901.207240.414604@sgigritzmann1.mathematik.tu-muenchen.de> (David Bremner's message of \"Thu, - 6 Mar 2003 13:58:21 +0100\") -Received-2: <15975.17901.207240.414604@sgigritzmann1.mathematik.tu-muenchen.de> (David Bremner's message of \"Thu, - 6 Mar 2003 13:58:21 +0100\") +Received-1:\x20 + <15975.17901.207240.414604@sgigritzmann1.mathematik.tu-muenchen.de> (David + Bremner's message of \"Thu, 6 Mar 2003 13:58:21 +0100\") +Received-2:\x20 + <15975.17901.207240.414604@sgigritzmann1.mathematik.tu-muenchen.de> (David + Bremner's message of \"Thu, 6 Mar 2003 13:58:21 +0100\") """) @@ -988,8 +1092,9 @@ locQDQ4zJykFBAXJfWDjAAACYUlEQVR4nF2TQY/jIAyFc6lydlG5x8Nyp1Y69wj1PN2I5gzp""" msg['Face-1'] = t msg['Face-2'] = Header(t, header_name='Face-2') + msg['Face-3'] = ' ' + t # XXX This splitting is all wrong. It the first value line should be - # snug against the field name. + # snug against the field name or the space after the header not there. eq(msg.as_string(maxheaderlen=78), """\ Face-1:\x20 iVBORw0KGgoAAAANSUhEUgAAADAAAAAwBAMAAAClLOS0AAAAGFBMVEUAAAAkHiJeRUIcGBi9 @@ -997,6 +1102,9 @@ Face-2:\x20 iVBORw0KGgoAAAANSUhEUgAAADAAAAAwBAMAAAClLOS0AAAAGFBMVEUAAAAkHiJeRUIcGBi9 locQDQ4zJykFBAXJfWDjAAACYUlEQVR4nF2TQY/jIAyFc6lydlG5x8Nyp1Y69wj1PN2I5gzp +Face-3:\x20 + iVBORw0KGgoAAAANSUhEUgAAADAAAAAwBAMAAAClLOS0AAAAGFBMVEUAAAAkHiJeRUIcGBi9 + locQDQ4zJykFBAXJfWDjAAACYUlEQVR4nF2TQY/jIAyFc6lydlG5x8Nyp1Y69wj1PN2I5gzp """) @@ -1008,8 +1116,8 @@ 'Wed, 16 Oct 2002 07:41:11 -0700') msg = email.message_from_string(m) eq(msg.as_string(maxheaderlen=78), '''\ -Received: from siimage.com ([172.25.1.3]) by zima.siliconimage.com with Microsoft SMTPSVC(5.0.2195.4905); - Wed, 16 Oct 2002 07:41:11 -0700 +Received: from siimage.com ([172.25.1.3]) by zima.siliconimage.com with + Microsoft SMTPSVC(5.0.2195.4905); Wed, 16 Oct 2002 07:41:11 -0700 ''') @@ -1023,9 +1131,11 @@ msg['List'] = h msg['List'] = Header(h, header_name='List') eq(msg.as_string(maxheaderlen=78), """\ -List: List-Unsubscribe: , +List: List-Unsubscribe: + , -List: List-Unsubscribe: , +List: List-Unsubscribe: + , """) @@ -4077,6 +4187,11 @@ msg = email.message_from_string("EmptyHeader:") self.assertEqual(str(msg), "EmptyHeader: \n\n") + def test_encode_preserves_leading_ws_on_value(self): + msg = Message() + msg['SomeHeader'] = ' value with leading ws' + self.assertEqual(str(msg), "SomeHeader: value with leading ws\n\n") + # Test RFC 2231 header parameters (en/de)coding