diff --git a/Doc/library/email.errors.rst b/Doc/library/email.errors.rst --- a/Doc/library/email.errors.rst +++ b/Doc/library/email.errors.rst @@ -79,9 +79,18 @@ * :class:`MisplacedEnvelopeHeaderDefect` - A "Unix From" header was found in the middle of a header block. +* :class:`MissingHeaderBodySeparatorDefect` - A line was found while parsing + headers that had no leading white space but contained no ':'. Parsing + continues assuming that the line represents the first line of the body. + + .. versionadded: 3.3 + * :class:`MalformedHeaderDefect` -- A header was found that was missing a colon, or was otherwise malformed. + .. deprecated:: 3.3 + This defect has not been used for several Python versions. + * :class:`MultipartInvariantViolationDefect` -- A message claimed to be a :mimetype:`multipart`, but no subparts were found. Note that when a message has this defect, its :meth:`is_multipart` method may return false even though its diff --git a/Lib/email/errors.py b/Lib/email/errors.py --- a/Lib/email/errors.py +++ b/Lib/email/errors.py @@ -48,8 +48,10 @@ class MisplacedEnvelopeHeaderDefect(MessageDefect): """A 'Unix-from' header was found in the middle of a header block.""" -class MalformedHeaderDefect(MessageDefect): - """Found a header that was missing a colon, or was otherwise malformed.""" +class MissingHeaderBodySeparatorDefect(MessageDefect): + """Found line with no leading whitespace and no colon before blank line.""" +# XXX: backward compatibility, just in case (it was never emitted). +MalformedHeaderDefect = MissingHeaderBodySeparatorDefect class MultipartInvariantViolationDefect(MessageDefect): """A message claimed to be a multipart but no subparts were found.""" diff --git a/Lib/email/feedparser.py b/Lib/email/feedparser.py --- a/Lib/email/feedparser.py +++ b/Lib/email/feedparser.py @@ -219,6 +219,8 @@ # (i.e. newline), just throw it away. Otherwise the line is # part of the body so push it back. if not NLCRE.match(line): + defect = errors.MissingHeaderBodySeparatorDefect() + self.policy.handle_defect(self._cur, defect) self._input.unreadline(line) break headers.append(line) @@ -488,12 +490,10 @@ self._cur.defects.append(defect) continue # Split the line on the colon separating field name from value. + # There will always be a colon, because if there wasn't the part of + # the parser that calls us would have started parsing the body. i = line.find(':') - if i < 0: - defect = errors.MalformedHeaderDefect(line) - # XXX: fixme (defect not going through policy) - self._cur.defects.append(defect) - continue + assert i>0, "_parse_headers fed line with no : and no leading WS" lastheader = line[:i] lastvalue = [line] # Done with all the lines, so handle the last header. diff --git a/Lib/test/test_email/test_email.py b/Lib/test/test_email/test_email.py --- a/Lib/test/test_email/test_email.py +++ b/Lib/test/test_email/test_email.py @@ -1960,15 +1960,27 @@ # test_parser.TestMessageDefectDetectionBase def test_first_line_is_continuation_header(self): eq = self.assertEqual - m = ' Line 1\nLine 2\nLine 3' + m = ' Line 1\nSubject: test\n\nbody' msg = email.message_from_string(m) - eq(msg.keys(), []) - eq(msg.get_payload(), 'Line 2\nLine 3') + eq(msg.keys(), ['Subject']) + eq(msg.get_payload(), 'body') eq(len(msg.defects), 1) - self.assertTrue(isinstance(msg.defects[0], - errors.FirstHeaderLineIsContinuationDefect)) + self.assertDefectsEqual(msg.defects, + [errors.FirstHeaderLineIsContinuationDefect]) eq(msg.defects[0].line, ' Line 1\n') + # test_parser.TestMessageDefectDetectionBase + def test_missing_header_body_separator(self): + # Our heuristic if we see a line that doesn't look like a header (no + # leading whitespace but no ':') is to assume that the blank line that + # separates the header from the body is missing, and to stop parsing + # headers and start parsing the body. + msg = self._str_msg('Subject: test\nnot a header\nTo: abc\n\nb\n') + self.assertEqual(msg.keys(), ['Subject']) + self.assertEqual(msg.get_payload(), 'not a header\nTo: abc\n\nb\n') + self.assertDefectsEqual(msg.defects, + [errors.MissingHeaderBodySeparatorDefect]) + # Test RFC 2047 header encoding and decoding class TestRFC2047(TestEmailBase): diff --git a/Lib/test/test_email/test_parser.py b/Lib/test/test_email/test_parser.py --- a/Lib/test/test_email/test_parser.py +++ b/Lib/test/test_email/test_parser.py @@ -237,17 +237,33 @@ policy=self.policy.clone(raise_on_defect=True)) def test_first_line_is_continuation_header(self): - msg = self._str_msg(' Line 1\nLine 2\nLine 3') - self.assertEqual(msg.keys(), []) - self.assertEqual(msg.get_payload(), 'Line 2\nLine 3') + msg = self._str_msg(' Line 1\nSubject: test\n\nbody') + self.assertEqual(msg.keys(), ['Subject']) + self.assertEqual(msg.get_payload(), 'body') self.assertEqual(len(self.get_defects(msg)), 1) - self.assertTrue(isinstance(self.get_defects(msg)[0], - errors.FirstHeaderLineIsContinuationDefect)) + self.assertDefectsEqual(self.get_defects(msg), + [errors.FirstHeaderLineIsContinuationDefect]) self.assertEqual(self.get_defects(msg)[0].line, ' Line 1\n') def test_first_line_is_continuation_header_raise_on_defect(self): with self.assertRaises(errors.FirstHeaderLineIsContinuationDefect): - self._str_msg(' Line 1\nLine 2\nLine 3', + self._str_msg(' Line 1\nSubject: test\n\nbody\n', + policy=self.policy.clone(raise_on_defect=True)) + + def test_missing_header_body_separator(self): + # Our heuristic if we see a line that doesn't look like a header (no + # leading whitespace but no ':') is to assume that the blank line that + # separates the header from the body is missing, and to stop parsing + # headers and start parsing the body. + msg = self._str_msg('Subject: test\nnot a header\nTo: abc\n\nb\n') + self.assertEqual(msg.keys(), ['Subject']) + self.assertEqual(msg.get_payload(), 'not a header\nTo: abc\n\nb\n') + self.assertDefectsEqual(self.get_defects(msg), + [errors.MissingHeaderBodySeparatorDefect]) + + def test_missing_header_body_separator_raise_on_defect(self): + with self.assertRaises(errors.MissingHeaderBodySeparatorDefect): + self._str_msg('Subject: test\nnot a header\nTo: abc\n\nb\n', policy=self.policy.clone(raise_on_defect=True))