diff --git a/Doc/library/email.headerregistry.rst b/Doc/library/email.headerregistry.rst --- a/Doc/library/email.headerregistry.rst +++ b/Doc/library/email.headerregistry.rst @@ -234,11 +234,80 @@ result in a :exc:`ValueError`. -Each of the above classes also has a ``Unique`` variant (for example, +Many of the above classes also have a ``Unique`` variant (for example, ``UniqueUnstructuredHeader``). The only difference is that in the ``Unique`` variant, :attr:`~.BaseHeader.max_count` is set to 1. +.. class:: MIMEVersionHeader + + There is really only one valid value for the :mailheader:`MIME-Version` + header, and that is ``1.0``. For future proofing, this header class + supports other valid version numbers. If a version number has a valid value + per :rfc:`2045`, then the header object will have non-``None`` values for + the following attributes: + + .. attribute:: version + + The version number as a string, with any whitespace and/or comments + removed. + + .. attribute:: major + + The major version number as an integer + + .. attribute:: minor + + The minor version number as an integer + + +.. class:: ParameterizedMIMEHeader + + MOME headers all start with the prefix 'Content-'. Each specific header has + a certain value, described under the class for that header. Some can + also take a list of supplemental parameters, which have a common format. + This class serves as a base for all the MIME headers that take parameters. + + .. attrbibute:: params + + A dictionary mapping parameter names to parameter values. + + +.. class:: ContentTypeHeader + + A :class:`ParameterizedMIMEHheader` class that handles the + :mailheader:`Content-Type` header. + + .. attribute:: content_type + + The content type string, in the form ``maintype/subtype``. + + .. attribute:: maintype + + .. attribute:: subtype + + +.. class:: ContentDispositionHeader + + A :class:`ParameterizedMIMEHheader` class that handles the + :mailheader:`Content-Disposition` header. + + .. attribute:: content-disposition + + ``inline`` and ``attachment`` are the only valid values in common use. + + +.. class:: ContentTransferEncoding + + Handles the :mailheader:`Content-Transfer-Encoding` header. + + .. attribute:: cte + + Valid values are ``7bit``, ``8bit``, ``base64``, and + ``quoted-printable``. See :rfc:`2045` for more information. + + + .. class:: HeaderRegistry(base_class=BaseHeader, \ default_class=UnstructuredHeader, \ use_default_map=True) diff --git a/Lib/email/_header_value_parser.py b/Lib/email/_header_value_parser.py --- a/Lib/email/_header_value_parser.py +++ b/Lib/email/_header_value_parser.py @@ -68,6 +68,8 @@ """ import re +import urllib # For urllib.parse.unquote +from collections import namedtuple, OrderedDict from email import _encoded_words as _ew from email import errors from email import utils @@ -83,6 +85,11 @@ DOT_ATOM_ENDS = ATOM_ENDS - set('.') # '.', '"', and '(' do not end phrases in order to support obs-phrase PHRASE_ENDS = SPECIALS - set('."(') +TSPECIALS = (SPECIALS | set('/?=')) - set('.') +TOKEN_ENDS = TSPECIALS | WSP +ASPECIALS = TSPECIALS | set("*'%") +ATTRIBUTE_ENDS = ASPECIALS | WSP +EXTENDED_ATTRIBUTE_ENDS = ATTRIBUTE_ENDS - set('%') def quote_string(value): return '"'+str(value).replace('\\', '\\\\').replace('"', r'\"')+'"' @@ -356,8 +363,12 @@ self.__class__.__name__, self.token_type) for token in self: - for line in token._pp(indent+' '): - yield line + if not hasattr(token, '_pp'): + yield (indent + ' !! invalid element in token ' + 'list: {!r}'.format(token)) + else: + for line in token._pp(indent+' '): + yield line if self.defects: extra = ' Defects: {}'.format(self.defects) else: @@ -567,6 +578,11 @@ token_type = 'atom' +class Token(TokenList): + + token_type = 'token' + + class EncodedWord(TokenList): token_type = 'encoded-word' @@ -602,13 +618,19 @@ res.append(x.value) return ''.join(res) + @property + def stripped_value(self): + for token in self: + if token.token_type == 'bare-quoted-string': + return token.value + class BareQuotedString(QuotedString): token_type = 'bare-quoted-string' def __str__(self): - return quote_string(''.join(self)) + return quote_string(''.join(str(x) for x in self)) @property def value(self): @@ -987,6 +1009,180 @@ return x.value +class MIMEVersion(TokenList): + + token_type = 'mime-version' + major = None + minor = None + + +class Parameter(TokenList): + + token_type = 'parameter' + sectioned = False + extended = False + charset = 'us-ascii' + + @property + def section_number(self): + # Because the first token, the attribute (name) eats CFWS, the second + # token is always the section if there is one. + return self[1].number if self.sectioned else 0 + + @property + def param_value(self): + # This is part of the "handle quoted extended parameters" hack. + for token in self: + if token.token_type == 'value': + return token.stripped_value + if token.token_type == 'quoted-string': + for token in token: + if token.token_type == 'bare-quoted-string': + for token in token: + if token.token_type == 'value': + return token.stripped_value + return '' + + +class InvalidParameter(Parameter): + + token_type = 'invalid-parameter' + + +class Attribute(TokenList): + + token_type = 'attribute' + + @property + def stripped_value(self): + for token in self: + if token.token_type.endswith('attrtext'): + return token.value + +class Section(TokenList): + + token_type = 'section' + number = None + + +class Value(TokenList): + + token_type = 'value' + + @property + def stripped_value(self): + token = self[0] + if token.token_type == 'cfws': + token = self[1] + if token.token_type.endswith( + ('quoted-string', 'attribute', 'extended-attribute')): + return token.stripped_value + return self.value + + +class MimeParameters(TokenList): + + token_type = 'mime-parameters' + + @property + def params(self): + # The RFC specifically states that the ordering of parameters is not + # guaranteed and may be reordered by the transport layer. So we have + # to assume the RFC 2231 pieces can come in any order. However, we + # output them in the order that we first see a given name, which gives + # us a stable __str__. + params = OrderedDict() + for token in self: + if not token.token_type.endswith('parameter'): + continue + if token[0].token_type != 'attribute': + continue + name = token[0].value.strip() + if name not in params: + params[name] = [] + params[name].append((token.section_number, token)) + for name, parts in params.items(): + parts = sorted(parts) + # XXX: there might be more recovery we could do here if, for + # example, this is really a case of a duplicate attribute name. + value_parts = [] + charset = parts[0][1].charset + for i, (section_number, param) in enumerate(parts): + if section_number != i: + param.defects.append(errors.InvalidHeaderDefect( + "inconsistent multipart parameter numbering")) + value = param.param_value + if param.extended: + try: + value = urllib.parse.unquote_to_bytes(value) + except UnicodeEncodeError: + # source had surrogate escaped bytes. What we do now + # is a bit of an open question. I'm not sure this is + # the best choice, but it is what the old algorithm did + value = urllib.parse.unquote(value, encoding='latin-1') + else: + try: + value = value.decode(charset, 'surrogateescape') + except LookupError: + # XXX: there should really be a custom defect for + # unknown character set to make it easy to find, + # because otherwise unknown charset is a silent + # failure. + value = value.decode('us-ascii', 'surrogateescape') + if utils._has_surrogates(value): + param.defects.append(errors.UndecodableBytesDefect()) + value_parts.append(value) + value = ''.join(value_parts) + yield name, value + + def __str__(self): + params = [] + for name, value in self.params: + if value: + params.append('{}={}'.format(name, quote_string(value))) + else: + params.append(name) + params = '; '.join(params) + return ' ' + params if params else '' + + +class ParameterizedHeaderValue(TokenList): + + @property + def params(self): + for token in reversed(self): + if token.token_type == 'mime-parameters': + return token.params + return {} + + @property + def parts(self): + if self and self[-1].token_type == 'mime-parameters': + # We don't want to start a new line if all of the params don't fit + # after the value, so unwrap the parameter list. + return TokenList(self[:-1] + self[-1]) + return TokenList(self).parts + + +class ContentType(ParameterizedHeaderValue): + + token_type = 'content-type' + maintype = 'text' + subtype = 'plain' + + +class ContentDisposition(ParameterizedHeaderValue): + + token_type = 'content-disposition' + content_disposition = None + + +class ContentTransferEncoding(TokenList): + + token_type = 'content-transfer-encoding' + cte = '7bit' + + class HeaderLabel(TokenList): token_type = 'header-label' @@ -1145,6 +1341,13 @@ _non_atom_end_matcher = re.compile(r"[^{}]+".format( ''.join(ATOM_ENDS).replace('\\','\\\\').replace(']','\]'))).match _non_printable_finder = re.compile(r"[\x00-\x20\x7F]").findall +_non_token_end_matcher = re.compile(r"[^{}]+".format( + ''.join(TOKEN_ENDS).replace('\\','\\\\').replace(']','\]'))).match +_non_attribute_end_matcher = re.compile(r"[^{}]+".format( + ''.join(ATTRIBUTE_ENDS).replace('\\','\\\\').replace(']','\]'))).match +_non_extended_attribute_end_matcher = re.compile(r"[^{}]+".format( + ''.join(EXTENDED_ATTRIBUTE_ENDS).replace( + '\\','\\\\').replace(']','\]'))).match def _validate_xtext(xtext): """If input token contains ASCII non-printables, register a defect.""" @@ -2153,3 +2356,598 @@ address_list.append(ValueTerminal(',', 'list-separator')) value = value[1:] return address_list, value + +# +# XXX: As I begin to add additional header parsers, I'm realizing we probably +# have two level of parser routines: the get_XXX methods that get a token in +# the grammar, and parse_XXX methods that parse an entire field value. So +# get_address_list above should really be a parse_ method, as probably should +# be get_unstructured. +# + +def parse_mime_version(value): + """ mime-version = [CFWS] 1*digit [CFWS] "." [CFWS] 1*digit [CFWS] + + """ + # The [CFWS] is implicit in the RFC 2045 BNF. + # XXX: This routine is a bit verbose, should factor out a get_int method. + mime_version = MIMEVersion() + if not value: + mime_version.defects.append(errors.HeaderMissingRequiredValue( + "Missing MIME version number (eg: 1.0)")) + return mime_version + if value[0] in CFWS_LEADER: + token, value = get_cfws(value) + mime_version.append(token) + if not value: + mime_version.defects.append(errors.HeaderMissingRequiredValue( + "Expected MIME version number but found only CFWS")) + digits = '' + while value and value[0] != '.' and value[0] not in CFWS_LEADER: + digits += value[0] + value = value[1:] + if not digits.isdigit(): + mime_version.defects.append(errors.InvalidHeaderDefect( + "Expected MIME major version number but found {!r}".format(digits))) + mime_version.append(ValueTerminal(digits, 'xtext')) + else: + mime_version.major = int(digits) + mime_version.append(ValueTerminal(digits, 'digits')) + if value and value[0] in CFWS_LEADER: + token, value = get_cfws(value) + mime_version.append(token) + if not value or value[0] != '.': + if mime_version.major is not None: + mime_version.defects.append(errors.InvalidHeaderDefect( + "Incomplete MIME version; found only major number")) + if value: + mime_version.append(ValueTerminal(value, 'xtext')) + return mime_version + mime_version.append(ValueTerminal('.', 'version-separator')) + value = value[1:] + if value and value[0] in CFWS_LEADER: + token, value = get_cfws(value) + mime_version.append(token) + if not value: + if mime_version.major is not None: + mime_version.defects.append(errors.InvalidHeaderDefect( + "Incomplete MIME version; found only major number")) + return mime_version + digits = '' + while value and value[0] not in CFWS_LEADER: + digits += value[0] + value = value[1:] + if not digits.isdigit(): + mime_version.defects.append(errors.InvalidHeaderDefect( + "Expected MIME minor version number but found {!r}".format(digits))) + mime_version.append(ValueTerminal(digits, 'xtext')) + else: + mime_version.minor = int(digits) + mime_version.append(ValueTerminal(digits, 'digits')) + if value and value[0] in CFWS_LEADER: + token, value = get_cfws(value) + mime_version.append(token) + if value: + mime_version.defects.append(errors.InvalidHeaderDefect( + "Excess non-CFWS text after MIME version")) + mime_version.append(ValueTerminal(value, 'xtext')) + return mime_version + +def get_invalid_parameter(value): + """ Read everything up to the next ';'. + + This is outside the formal grammar. The InvalidParameter TokenList that is + returned acts like a Parameter, but the data attributes are None. + + """ + invalid_parameter = InvalidParameter() + while value and value[0] != ';': + if value[0] in PHRASE_ENDS: + invalid_parameter.append(ValueTerminal(value[0], + 'misplaced-special')) + value = value[1:] + else: + token, value = get_phrase(value) + invalid_parameter.append(token) + return invalid_parameter, value + +def get_ttext(value): + """ttext = + + We allow any non-TOKEN_ENDS in ttext, but add defects to the token's + defects list if we find non-ttext characters. We also register defects for + *any* non-printables even though the RFC doesn't exclude all of them, + because we follow the spirit of RFC 5322. + + """ + m = _non_token_end_matcher(value) + if not m: + raise errors.HeaderParseError( + "expected ttext but found '{}'".format(value)) + ttext = m.group() + value = value[len(ttext):] + ttext = ValueTerminal(ttext, 'ttext') + _validate_xtext(ttext) + return ttext, value + +def get_token(value): + """token = [CFWS] 1*ttext [CFWS] + + The RFC equivalent of ttext is any US-ASCII chars except space, ctls, or + tspecials. We also exclude tabs even though the RFC doesn't. + + The RFC implies the CFWS but is not explicit about it in the BNF. + + """ + mtoken = Token() + if value and value[0] in CFWS_LEADER: + token, value = get_cfws(value) + mtoken.append(token) + if value and value[0] in TOKEN_ENDS: + raise errors.HeaderParseError( + "expected token but found '{}'".format(value)) + token, value = get_ttext(value) + mtoken.append(token) + if value and value[0] in CFWS_LEADER: + token, value = get_cfws(value) + mtoken.append(token) + return mtoken, value + +def get_attrtext(value): + """attrtext = 1*(any non-ATTRIBUTE_ENDS character) + + We allow any non-ATTRIBUTE_ENDS in attrtext, but add defects to the + token's defects list if we find non-attrtext characters. We also register + defects for *any* non-printables even though the RFC doesn't exclude all of + them, because we follow the spirit of RFC 5322. + + """ + m = _non_attribute_end_matcher(value) + if not m: + raise errors.HeaderParseError( + "expected attrtext but found {!r}".format(value)) + attrtext = m.group() + value = value[len(attrtext):] + attrtext = ValueTerminal(attrtext, 'attrtext') + _validate_xtext(attrtext) + return attrtext, value + +def get_attribute(value): + """ [CFWS] 1*attrtext [CFWS] + + This version of the BNF makes the CFWS explicit, and as usual we use a + value terminal for the actual run of characters. The RFC equivalent of + attrtext is the token characters, with the subtraction of '*', "'", and '%'. + We include tab in the excluded set just as we do for token. + + """ + attribute = Attribute() + if value and value[0] in CFWS_LEADER: + token, value = get_cfws(value) + attribute.append(token) + if value and value[0] in ATTRIBUTE_ENDS: + raise errors.HeaderParseError( + "expected token but found '{}'".format(value)) + token, value = get_attrtext(value) + attribute.append(token) + if value and value[0] in CFWS_LEADER: + token, value = get_cfws(value) + attribute.append(token) + return attribute, value + +def get_extended_attrtext(value): + """attrtext = 1*(any non-ATTRIBUTE_ENDS character plus '%') + + This is a special parsing routine so that we get a value that + includes % escapes as a single string (which we decode as a single + string later). + + """ + m = _non_extended_attribute_end_matcher(value) + if not m: + raise errors.HeaderParseError( + "expected extended attrtext but found {!r}".format(value)) + attrtext = m.group() + value = value[len(attrtext):] + attrtext = ValueTerminal(attrtext, 'extended-attrtext') + _validate_xtext(attrtext) + return attrtext, value + +def get_extended_attribute(value): + """ [CFWS] 1*extended_attrtext [CFWS] + + This is like the non-extended version except we allow % characters, so that + we can pick up an encoded value as a single string. + + """ + # XXX: should we have an ExtendedAttribute TokenList? + attribute = Attribute() + if value and value[0] in CFWS_LEADER: + token, value = get_cfws(value) + attribute.append(token) + if value and value[0] in EXTENDED_ATTRIBUTE_ENDS: + raise errors.HeaderParseError( + "expected token but found '{}'".format(value)) + token, value = get_extended_attrtext(value) + attribute.append(token) + if value and value[0] in CFWS_LEADER: + token, value = get_cfws(value) + attribute.append(token) + return attribute, value + +def get_section(value): + """ '*' digits + + The formal BNF is more complicated because leading 0s are not allowed. We + check for that and add a defect. We also assume no CFWS is allowed between + the '*' and the digits, though the RFC is not crystal clear on that. + The caller should already have dealt with leading CFWS. + + """ + section = Section() + if not value or value[0] != '*': + raise errors.HeaderParseError("Expected section but found {}".format( + value)) + section.append(ValueTerminal('*', 'section-marker')) + value = value[1:] + if not value or not value[0].isdigit(): + raise errors.HeaderParseError("Expected section number but " + "found {}".format(value)) + digits = '' + while value and value[0].isdigit(): + digits += value[0] + value = value[1:] + if digits[0] == '0' and digits != '0': + section.defects.append(errors.InvalidHeaderError("section number" + "has an invalid leading 0")) + section.number = int(digits) + section.append(ValueTerminal(digits, 'digits')) + return section, value + + +def get_value(value): + """ quoted-string / attribute + + """ + v = Value() + if not value: + raise errors.HeaderParseError("Expected value but found end of string") + leader = None + if value[0] in CFWS_LEADER: + leader, value = get_cfws(value) + if not value: + raise errors.HeaderParseError("Expected value but found " + "only {}".format(leader)) + if value[0] == '"': + token, value = get_quoted_string(value) + else: + token, value = get_extended_attribute(value) + if leader is not None: + token[:0] = [leader] + v.append(token) + return v, value + +def get_parameter(value): + """ attribute [section] ["*"] [CFWS] "=" value + + The CFWS is implied by the RFC but not made explicit in the BNF. This + simplified form of the BNF from the RFC is made to conform with the RFC BNF + through some extra checks. We do it this way because it makes both error + recovery and working with the resulting parse tree easier. + """ + # It is possible CFWS would also be implicitly allowed between the section + # and the 'extended-attribute' marker (the '*') , but we've never seen that + # in the wild and we will therefore ignore the possibility. + param = Parameter() + token, value = get_attribute(value) + param.append(token) + if not value or value[0] == ';': + param.defects.append(errors.InvalidHeaderDefect("Parameter contains " + "name ({}) but no value".format(token))) + return param, value + if value[0] == '*': + try: + token, value = get_section(value) + param.sectioned = True + param.append(token) + except errors.HeaderParseError: + pass + if not value: + raise errors.HeaderParseError("Incomplete parameter") + if value[0] == '*': + param.append(ValueTerminal('*', 'extended-parameter-marker')) + value = value[1:] + param.extended = True + if value[0] != '=': + raise errors.HeaderParseError("Parameter not followed by '='") + param.append(ValueTerminal('=', 'parameter-separator')) + value = value[1:] + leader = None + if value and value[0] in CFWS_LEADER: + token, value = get_cfws(value) + param.append(token) + remainder = None + appendto = param + if param.extended and value and value[0] == '"': + # Now for some serious hackery to handle the common invalid case of + # double quotes around an extended value. We also accept (with defect) + # a value marked as encoded that isn't really. + qstring, remainder = get_quoted_string(value) + inner_value = qstring.stripped_value + semi_valid = False + if param.section_number == 0: + if inner_value and inner_value[0] == "'": + semi_valid = True + else: + token, rest = get_attrtext(inner_value) + if rest and rest[0] == "'": + semi_valid = True + else: + try: + token, rest = get_extended_attrtext(inner_value) + except: + pass + else: + if not rest: + semi_valid = True + if semi_valid: + param.defects.append(errors.InvalidHeaderDefect( + "Quoted string value for extended parameter is invalid")) + param.append(qstring) + for t in qstring: + if t.token_type == 'bare-quoted-string': + t[:] = [] + appendto = t + break + value = inner_value + else: + remainder = None + param.defects.append(errors.InvalidHeaderDefect( + "Parameter marked as extended but appears to have a " + "quoted string value that is non-encoded")) + if value and value[0] == "'": + token = None + else: + token, value = get_value(value) + if not param.extended or param.section_number > 0: + if not value or value[0] != "'": + appendto.append(token) + if remainder is not None: + assert not value, value + value = remainder + return param, value + param.defects.append(errors.InvalidHeaderDefect( + "Apparent initial-extended-value but attribute " + "was not marked as extended or was not initial section")) + if not value: + # Assume the charset/lang is missing and the token is the value. + param.defects.append(errors.InvalidHeaderDefect( + "Missing required charset/lang delimiters")) + appendto.append(token) + if remainder is None: + return param, value + else: + if token is not None: + for t in token: + if t.token_type == 'extended-attrtext': + break + t.token_type == 'attrtext' + appendto.append(t) + param.charset = t.value + if value[0] != "'": + raise errors.HeaderParseError("Expected RFC2231 char/lang encoding " + "delimiter, but found {!r}".format(value)) + appendto.append(ValueTerminal("'", 'RFC2231 delimiter')) + value = value[1:] + if value and value[0] != "'": + token, value = get_attrtext(value) + appendto.append(token) + param.lang = token.value + if not value or value[0] != "'": + raise errors.HeaderParseError("Expected RFC2231 char/lang encoding " + "delimiter, but found {}".format(value)) + appendto.append(ValueTerminal("'", 'RFC2231 delimiter')) + value = value[1:] + if remainder is not None: + # Treat the rest of value as bare quoted string content. + v = Value() + while value: + if value[0] in WSP: + token, value = get_fws(value) + else: + token, value = get_qcontent(value) + v.append(token) + token = v + else: + token, value = get_value(value) + appendto.append(token) + if remainder is not None: + assert not value, value + value = remainder + return param, value + +def parse_mime_parameters(value): + """ parameter *( ";" parameter ) + + That BNF is meant to indicate this routine should only be called after + finding and handling the leading ';'. There is no corresponding rule in + the formal RFC grammar, but it is more convenient for us for the set of + parameters to be treated as its own TokenList. + + This is 'parse' routine because it consumes the reminaing value, but it + would never be called to parse a full header. Instead it is called to + parse everything after the non-parameter value of a specific MIME header. + + """ + mime_parameters = MimeParameters() + while value: + try: + token, value = get_parameter(value) + mime_parameters.append(token) + except errors.HeaderParseError as err: + leader = None + if value[0] in CFWS_LEADER: + leader, value = get_cfws(value) + if not value: + mime_parameters.append(leader) + return mime_parameters + if value[0] == ';': + if leader is not None: + mime_parameters.append(leader) + mime_parameters.defects.append(errors.InvalidHeaderDefect( + "parameter entry with no content")) + else: + token, value = get_invalid_parameter(value) + if leader: + token[:0] = [leader] + mime_parameters.append(token) + mime_parameters.defects.append(errors.InvalidHeaderDefect( + "invalid parameter {!r}".format(token))) + if value and value[0] != ';': + # Junk after the otherwise valid parameter. Mark it as + # invalid, but it will have a value. + param = mime_parameters[-1] + param.token_type = 'invalid-parameter' + token, value = get_invalid_parameter(value) + param.extend(token) + mime_parameters.defects.append(errors.InvalidHeaderDefect( + "parameter with invalid trailing text {!r}".format(token))) + if value: + # Must be a ';' at this point. + mime_parameters.append(ValueTerminal(';', 'parameter-separator')) + value = value[1:] + return mime_parameters + +def _find_mime_parameters(tokenlist, value): + """Do our best to find the parameters in an invalid MIME header + + """ + while value and value[0] != ';': + if value[0] in PHRASE_ENDS: + tokenlist.append(ValueTerminal(value[0], 'misplaced-special')) + value = value[1:] + else: + token, value = get_phrase(value) + tokenlist.append(token) + if not value: + return + tokenlist.append(ValueTerminal(';', 'parameter-separator')) + tokenlist.append(parse_mime_parameters(value[1:])) + +def parse_content_type_header(value): + """ maintype "/" subtype *( ";" parameter ) + + The maintype and substype are tokens. Theoretically they could + be checked against the official IANA list + x-token, but we + don't do that. + """ + ctype = ContentType() + recover = False + if not value: + ctype.defects.append(errors.HeaderMissingRequiredValue( + "Missing content type specification")) + return ctype + try: + token, value = get_token(value) + except errors.HeaderParseError: + ctype.defects.append(errors.InvalidHeaderDefect( + "Expected content maintype but found {!r}".format(value))) + _find_mime_parameters(ctype, value) + return ctype + ctype.append(token) + # XXX: If we really want to follow the formal grammer we should make + # mantype and subtype specialized TokenLists here. Probably not worth it. + if not value or value[0] != '/': + ctype.defects.append(errors.InvalidHeaderDefect( + "Invalid content type")) + if value: + _find_mime_parameters(ctype, value) + return ctype + ctype.maintype = token.value.strip().lower() + ctype.append(ValueTerminal('/', 'content-type-separator')) + value = value[1:] + try: + token, value = get_token(value) + except errors.HeaderParseError: + ctype.defects.append(errors.InvalidHeaderDefect( + "Expected content subtype but found {!r}".format(value))) + _find_mime_parameters(ctype, value) + return ctype + ctype.append(token) + ctype.subtype = token.value.strip().lower() + if not value: + return ctype + if value[0] != ';': + ctype.defects.append(errors.InvalidHeaderDefect( + "Only parameters are valid after content type, but " + "found {!r}".format(value))) + # The RFC requires that a syntactically invalid content-type be treated + # as text/plain. Perhaps we should postel this, but we should probably + # only do that if we were checking the subtype value against IANA. + del ctype.maintype, ctype.subtype + _find_mime_parameters(ctype, value) + return ctype + ctype.append(ValueTerminal(';', 'parameter-separator')) + ctype.append(parse_mime_parameters(value[1:])) + return ctype + +def parse_content_disposition_header(value): + """ disposition-type *( ";" parameter ) + + """ + disp_header = ContentDisposition() + if not value: + disp_header.defects.append(errors.HeaderMissingRequiredValue( + "Missing content disposition")) + return disp_header + try: + token, value = get_token(value) + except errors.HeaderParseError: + ctype.defects.append(errors.InvalidHeaderDefect( + "Expected content disposition but found {!r}".format(value))) + _find_mime_parameters(disp_header, value) + return disp_header + disp_header.append(token) + disp_header.content_disposition = token.value.strip().lower() + if not value: + return disp_header + if value[0] != ';': + disp_header.defects.append(errors.InvalidHeaderDefect( + "Only parameters are valid after content disposition, but " + "found {!r}".format(value))) + _find_mime_parameters(disp_header, value) + return disp_header + disp_header.append(ValueTerminal(';', 'parameter-separator')) + disp_header.append(parse_mime_parameters(value[1:])) + return disp_header + +def parse_content_transfer_encoding_header(value): + """ mechanism + + """ + # We should probably validate the values, since the list is fixed. + cte_header = ContentTransferEncoding() + if not value: + cte_header.defects.append(errors.HeaderMissingRequiredValue( + "Missing content transfer encoding")) + return cte_header + try: + token, value = get_token(value) + except errors.HeaderParseError: + ctype.defects.append(errors.InvalidHeaderDefect( + "Expected content trnasfer encoding but found {!r}".format(value))) + else: + cte_header.append(token) + cte_header.cte = token.value.strip().lower() + if not value: + return cte_header + while value: + cte_header.defects.append(errors.InvalidHeaderDefect( + "Extra text after content transfer encoding")) + if value[0] in PHRASE_ENDS: + cte_header.append(ValueTerminal(value[0], 'misplaced-special')) + value = value[1:] + else: + token, value = get_phrase(value) + cte_header.append(token) + return cte_header diff --git a/Lib/email/headerregistry.py b/Lib/email/headerregistry.py --- a/Lib/email/headerregistry.py +++ b/Lib/email/headerregistry.py @@ -391,24 +391,151 @@ max_count = 1 +class MIMEVersionHeader: + + max_count = 1 + + value_parser = staticmethod(parser.parse_mime_version) + + @classmethod + def parse(cls, value, kwds): + kwds['parse_tree'] = parse_tree = cls.value_parser(value) + kwds['decoded'] = str(parse_tree) + kwds['defects'].extend(parse_tree.all_defects) + kwds['major'] = None if parse_tree.minor is None else parse_tree.major + kwds['minor'] = parse_tree.minor + if parse_tree.minor is not None: + kwds['version'] = '{}.{}'.format(kwds['major'], kwds['minor']) + else: + kwds['version'] = None + + def init(self, *args, **kw): + self._version = kw.pop('version') + self._major = kw.pop('major') + self._minor = kw.pop('minor') + super().init(*args, **kw) + + @property + def major(self): + return self._major + + @property + def minor(self): + return self._minor + + @property + def version(self): + return self._version + + +class ParameterizedMIMEHeader: + + # Mixin that handles the params dict. Must be subclassed and + # a property value_parser for the specific header provided. + + max_count = 1 + + @classmethod + def parse(cls, value, kwds): + kwds['parse_tree'] = parse_tree = cls.value_parser(value) + kwds['decoded'] = str(parse_tree) + kwds['defects'].extend(parse_tree.all_defects) + if parse_tree.params is None: + kwds['params'] = {} + else: + # The MIME RFCs specify that parameter ordering is arbitrary. + kwds['params'] = {utils._sanitize(name).lower(): + utils._sanitize(value) + for name, value in parse_tree.params} + + def init(self, *args, **kw): + self._params = kw.pop('params') + super().init(*args, **kw) + + @property + def params(self): + return self._params.copy() + + +class ContentTypeHeader(ParameterizedMIMEHeader): + + value_parser = staticmethod(parser.parse_content_type_header) + + def init(self, *args, **kw): + super().init(*args, **kw) + self._maintype = utils._sanitize(self._parse_tree.maintype) + self._subtype = utils._sanitize(self._parse_tree.subtype) + + @property + def maintype(self): + return self._maintype + + @property + def subtype(self): + return self._subtype + + @property + def content_type(self): + return self.maintype + '/' + self.subtype + + +class ContentDispositionHeader(ParameterizedMIMEHeader): + + value_parser = staticmethod(parser.parse_content_disposition_header) + + def init(self, *args, **kw): + super().init(*args, **kw) + cd = self._parse_tree.content_disposition + self._content_disposition = cd if cd is None else utils._sanitize(cd) + + @property + def content_disposition(self): + return self._content_disposition + + +class ContentTransferEncodingHeader: + + max_count = 1 + + value_parser = staticmethod(parser.parse_content_transfer_encoding_header) + + @classmethod + def parse(cls, value, kwds): + kwds['parse_tree'] = parse_tree = cls.value_parser(value) + kwds['decoded'] = str(parse_tree) + kwds['defects'].extend(parse_tree.all_defects) + + def init(self, *args, **kw): + super().init(*args, **kw) + self._cte = utils._sanitize(self._parse_tree.cte) + + @property + def cte(self): + return self._cte + + # The header factory # _default_header_map = { - 'subject': UniqueUnstructuredHeader, - 'date': UniqueDateHeader, - 'resent-date': DateHeader, - 'orig-date': UniqueDateHeader, - 'sender': UniqueSingleAddressHeader, - 'resent-sender': SingleAddressHeader, - 'to': UniqueAddressHeader, - 'resent-to': AddressHeader, - 'cc': UniqueAddressHeader, - 'resent-cc': AddressHeader, - 'bcc': UniqueAddressHeader, - 'resent-bcc': AddressHeader, - 'from': UniqueAddressHeader, - 'resent-from': AddressHeader, - 'reply-to': UniqueAddressHeader, + 'subject': UniqueUnstructuredHeader, + 'date': UniqueDateHeader, + 'resent-date': DateHeader, + 'orig-date': UniqueDateHeader, + 'sender': UniqueSingleAddressHeader, + 'resent-sender': SingleAddressHeader, + 'to': UniqueAddressHeader, + 'resent-to': AddressHeader, + 'cc': UniqueAddressHeader, + 'resent-cc': AddressHeader, + 'bcc': UniqueAddressHeader, + 'resent-bcc': AddressHeader, + 'from': UniqueAddressHeader, + 'resent-from': AddressHeader, + 'reply-to': UniqueAddressHeader, + 'mime-version': MIMEVersionHeader, + 'content-type': ContentTypeHeader, + 'content-disposition': ContentDispositionHeader, + 'content-transfer-encoding': ContentTransferEncodingHeader, } class HeaderRegistry: diff --git a/Lib/test/test_email/test__header_value_parser.py b/Lib/test/test_email/test__header_value_parser.py --- a/Lib/test/test_email/test__header_value_parser.py +++ b/Lib/test/test_email/test__header_value_parser.py @@ -3,7 +3,7 @@ from email import _header_value_parser as parser from email import errors from email import policy -from test.test_email import TestEmailBase +from test.test_email import TestEmailBase, parameterize class TestTokens(TestEmailBase): @@ -28,7 +28,32 @@ self.assertDefectsEqual(parts[2].all_defects, [errors.UndecodableBytesDefect]) -class TestParser(TestEmailBase): +class TestParserMixin: + + def _assert_results(self, tl, rest, string, value, defects, remainder, + comments=None): + self.assertEqual(str(tl), string) + self.assertEqual(tl.value, value) + self.assertDefectsEqual(tl.all_defects, defects) + self.assertEqual(rest, remainder) + if comments is not None: + self.assertEqual(tl.comments, comments) + + def _test_get_x(self, method, source, string, value, defects, + remainder, comments=None): + tl, rest = method(source) + self._assert_results(tl, rest, string, value, defects, remainder, + comments=None) + return tl + + def _test_parse_x(self, method, input, string, value, defects, + comments=None): + tl = method(input) + self._assert_results(tl, '', string, value, defects, '', comments) + return tl + + +class TestParser(TestParserMixin, TestEmailBase): # _wsp_splitter @@ -49,19 +74,6 @@ ['foo', ' \t ', 'def jik']) - # test harness - - def _test_get_x(self, method, input, string, value, defects, - remainder, comments=None): - token, rest = method(input) - self.assertEqual(str(token), string) - self.assertEqual(token.value, value) - self.assertDefectsEqual(token.all_defects, defects) - self.assertEqual(rest, remainder) - if comments is not None: - self.assertEqual(token.comments, comments) - return token - # get_fws def test_get_fws_only(self): @@ -2390,6 +2402,67 @@ str(address_list.mailboxes[2])) +@parameterize +class Test_parse_mime_version(TestParserMixin, TestEmailBase): + + def mime_version_as_value(self, + value, + tl_str, + tl_value, + major, + minor, + defects): + mime_version = self._test_parse_x(parser.parse_mime_version, + value, tl_str, tl_value, defects) + self.assertEqual(mime_version.major, major) + self.assertEqual(mime_version.minor, minor) + + mime_version_params = { + + 'rfc_2045_1': ( + '1.0', + '1.0', + '1.0', + 1, + 0, + []), + + 'RFC_2045_2': ( + '1.0 (produced by MetaSend Vx.x)', + '1.0 (produced by MetaSend Vx.x)', + '1.0 ', + 1, + 0, + []), + + 'RFC_2045_3': ( + '(produced by MetaSend Vx.x) 1.0', + '(produced by MetaSend Vx.x) 1.0', + ' 1.0', + 1, + 0, + []), + + 'RFC_2045_4': ( + '1.(produced by MetaSend Vx.x)0', + '1.(produced by MetaSend Vx.x)0', + '1. 0', + 1, + 0, + []), + + 'empty': ( + '', + '', + '', + None, + None, + [errors.HeaderMissingRequiredValue]), + + } + + + class TestFolding(TestEmailBase): policy = policy.default diff --git a/Lib/test/test_email/test_email.py b/Lib/test/test_email/test_email.py --- a/Lib/test/test_email/test_email.py +++ b/Lib/test/test_email/test_email.py @@ -259,6 +259,7 @@ self.assertTrue(lines[0].startswith('From ')) eq(text, NL.join(lines[1:])) + # test_headerregistry.TestContentTypeHeader.bad_params def test_bad_param(self): msg = email.message_from_string("Content-Type: blarg; baz; boo\n") self.assertEqual(msg.get_param('baz'), '') @@ -292,6 +293,7 @@ eq(msg.get_params(header='x-header'), [('foo', ''), ('bar', 'one'), ('baz', 'two')]) + # test_headerregistry.TestContentTypeHeader.spaces_around_param_equals def test_get_param_liberal(self): msg = Message() msg['Content-Type'] = 'Content-Type: Multipart/mixed; boundary = "CPIMSSMTPC06p5f3tG"' @@ -314,10 +316,12 @@ # msg.get_param("weird") # yet. + # test_headerregistry.TestContentTypeHeader.spaces_around_semis def test_get_param_funky_continuation_lines(self): msg = self._msgobj('msg_22.txt') self.assertEqual(msg.get_payload(1).get_param('name'), 'wibble.JPG') + # test_headerregistry.TestContentTypeHeader.semis_inside_quotes def test_get_param_with_semis_in_quotes(self): msg = email.message_from_string( 'Content-Type: image/pjpeg; name="Jim&&Jill"\n') @@ -325,6 +329,7 @@ self.assertEqual(msg.get_param('name', unquote=False), '"Jim&&Jill"') + # test_headerregistry.TestContentTypeHeader.quotes_inside_rfc2231_value def test_get_param_with_quotes(self): msg = email.message_from_string( 'Content-Type: foo; bar*0="baz\\"foobar"; bar*1="\\"baz"') @@ -1885,6 +1890,7 @@ "\nContent-Transfer-Encoding: {}".format(cte))) self.assertEqual(len(msg.defects), 0) + # test_headerregistry.TestContentTyopeHeader invalid_1 and invalid_2. def test_invalid_content_type(self): eq = self.assertEqual neq = self.ndiffAssertEqual @@ -3437,6 +3443,7 @@ self.assertEqual(msg.get_content_maintype(), "text") self.assertEqual(msg.get_content_subtype(), "pl\uFFFDin") + # test_headerregistry.TestContentTypeHeader.non_ascii_in_params def test_get_params_with_8bit(self): msg = email.message_from_bytes( 'X-Header: foo=\xa7ne; b\xa7r=two; baz=three\n'.encode('latin-1')) @@ -3446,6 +3453,7 @@ # XXX: someday you might be able to get 'b\xa7r', for now you can't. self.assertEqual(msg.get_param('b\xa7r', header='x-header'), None) + # test_headerregistry.TestContentTypeHeader.non_ascii_in_rfc2231_value def test_get_rfc2231_params_with_8bit(self): msg = email.message_from_bytes(textwrap.dedent("""\ Content-Type: text/plain; charset=us-ascii; @@ -4491,6 +4499,9 @@ # Test RFC 2231 header parameters (en/de)coding class TestRFC2231(TestEmailBase): + + # test_headerregistry.TestContentTypeHeader.rfc2231_encoded_with_double_quotes + # test_headerregistry.TestContentTypeHeader.rfc2231_single_quote_inside_double_quotes def test_get_param(self): eq = self.assertEqual msg = self._msgobj('msg_29.txt') @@ -4576,11 +4587,15 @@ -Me """) + # test_headerregistry.TestContentTypeHeader.rfc2231_encoded_charset + # I changed the charset name, though, because the one in the file isn't + # a legal charset name. Should add a test for an illegal charset. def test_rfc2231_get_content_charset(self): eq = self.assertEqual msg = self._msgobj('msg_32.txt') eq(msg.get_content_charset(), 'us-ascii') + # test_headerregistry.TestContentTypeHeader.rfc2231_encoded_no_double_quotes def test_rfc2231_parse_rfc_quoting(self): m = textwrap.dedent('''\ Content-Disposition: inline; @@ -4594,6 +4609,7 @@ 'This is even more ***fun*** is it not.pdf') self.assertEqual(m, msg.as_string()) + # test_headerregistry.TestContentTypeHeader.rfc2231_encoded_with_double_quotes def test_rfc2231_parse_extra_quoting(self): m = textwrap.dedent('''\ Content-Disposition: inline; @@ -4607,6 +4623,9 @@ 'This is even more ***fun*** is it not.pdf') self.assertEqual(m, msg.as_string()) + # test_headerregistry.TestContentTypeHeader.rfc2231_no_language_or_charset + # but new test uses *0* because otherwise lang/charset is not valid. + # test_headerregistry.TestContentTypeHeader.rfc2231_segmented_normal_values def test_rfc2231_no_language_or_charset(self): m = '''\ Content-Transfer-Encoding: 8bit @@ -4621,6 +4640,7 @@ param, 'file____C__DOCUMENTS_20AND_20SETTINGS_FABIEN_LOCAL_20SETTINGS_TEMP_nsmail.htm') + # test_headerregistry.TestContentTypeHeader.rfc2231_encoded_no_charset def test_rfc2231_no_language_or_charset_in_filename(self): m = '''\ Content-Disposition: inline; @@ -4633,6 +4653,7 @@ self.assertEqual(msg.get_filename(), 'This is even more ***fun*** is it not.pdf') + # Duplicate of previous test? def test_rfc2231_no_language_or_charset_in_filename_encoded(self): m = '''\ Content-Disposition: inline; @@ -4645,6 +4666,8 @@ self.assertEqual(msg.get_filename(), 'This is even more ***fun*** is it not.pdf') + # test_headerregistry.TestContentTypeHeader.rfc2231_partly_encoded, + # but the test below is wrong (the first part should be decoded). def test_rfc2231_partly_encoded(self): m = '''\ Content-Disposition: inline; @@ -4696,6 +4719,7 @@ self.assertEqual(msg.get_content_charset(), 'this is even more ***fun*** is it not.pdf') + # test_headerregistry.TestContentTypeHeader.rfc2231_unknown_charset_treated_as_ascii def test_rfc2231_bad_encoding_in_filename(self): m = '''\ Content-Disposition: inline; @@ -4762,6 +4786,7 @@ eq(language, None) eq(s, "Frank's Document") + # test_headerregistry.TestContentTypeHeader.rfc2231_single_quote_inside_double_quotes def test_rfc2231_single_tick_in_filename(self): m = """\ Content-Type: application/x-foo; name*0=\"Frank's\"; name*1=\" Document\" @@ -4772,6 +4797,7 @@ self.assertFalse(isinstance(param, tuple)) self.assertEqual(param, "Frank's Document") + # test_headerregistry.TestContentTypeHeader.rfc2231_single_quote_in_value_with_charset_and_lang def test_rfc2231_tick_attack_extended(self): eq = self.assertEqual m = """\ @@ -4785,6 +4811,7 @@ eq(language, 'en-us') eq(s, "Frank's Document") + # test_headerregistry.TestContentTypeHeader.rfc2231_single_quote_in_non_encoded_value def test_rfc2231_tick_attack(self): m = """\ Content-Type: application/x-foo; @@ -4796,6 +4823,7 @@ self.assertFalse(isinstance(param, tuple)) self.assertEqual(param, "us-ascii'en-us'Frank's Document") + # test_headerregistry.TestContentTypeHeader.rfc2231_single_quotes_inside_quotes def test_rfc2231_no_extended_values(self): eq = self.assertEqual m = """\ @@ -4805,6 +4833,7 @@ msg = email.message_from_string(m) eq(msg.get_param('name'), "Frank's Document") + # test_headerregistry.TestContentTypeHeader.rfc2231_encoded_then_unencoded_segments def test_rfc2231_encoded_then_unencoded_segments(self): eq = self.assertEqual m = """\ @@ -4820,6 +4849,8 @@ eq(language, 'en-us') eq(s, 'My Document For You') + # test_headerregistry.TestContentTypeHeader.rfc2231_unencoded_then_encoded_segments + # test_headerregistry.TestContentTypeHeader.rfc2231_quoted_unencoded_then_encoded_segments def test_rfc2231_unencoded_then_encoded_segments(self): eq = self.assertEqual m = """\ diff --git a/Lib/test/test_email/test_headerregistry.py b/Lib/test/test_email/test_headerregistry.py --- a/Lib/test/test_email/test_headerregistry.py +++ b/Lib/test/test_email/test_headerregistry.py @@ -9,6 +9,9 @@ from email.headerregistry import Address, Group +DITTO = object() + + class TestHeaderRegistry(TestEmailBase): def test_arbitrary_name_unstructured(self): @@ -176,6 +179,789 @@ @parameterize +class TestContentTypeHeader(TestHeaderBase): + + def content_type_as_value(self, + source, + content_type, + maintype, + subtype, + *args): + l = len(args) + parmdict = args[0] if l>0 else {} + defects = args[1] if l>1 else [] + decoded = args[2] if l>2 and args[2] is not DITTO else source + header = 'Content-Type:' + ' ' if source else '' + folded = args[3] if l>3 else header + source + '\n' + h = self.make_header('Content-Type', source) + self.assertEqual(h.content_type, content_type) + self.assertEqual(h.maintype, maintype) + self.assertEqual(h.subtype, subtype) + self.assertEqual(h.params, parmdict) + self.assertDefectsEqual(h.defects, defects) + self.assertEqual(h, decoded) + self.assertEqual(h.fold(policy=policy.default), folded) + + content_type_params = { + + # Examples from RFC 2045. + + 'RFC_2045_1': ( + 'text/plain; charset=us-ascii (Plain text)', + 'text/plain', + 'text', + 'plain', + {'charset': 'us-ascii'}, + [], + 'text/plain; charset="us-ascii"'), + + 'RFC_2045_2': ( + 'text/plain; charset=us-ascii', + 'text/plain', + 'text', + 'plain', + {'charset': 'us-ascii'}, + [], + 'text/plain; charset="us-ascii"'), + + 'RFC_2045_3': ( + 'text/plain; charset="us-ascii"', + 'text/plain', + 'text', + 'plain', + {'charset': 'us-ascii'}), + + # RFC 2045 5.2 says syntactically invalid values are to be treated as + # text/plain. + + 'no_subtype_in_content_type': ( + 'text/', + 'text/plain', + 'text', + 'plain', + {}, + [errors.InvalidHeaderDefect]), + + 'no_slash_in_content_type': ( + 'foo', + 'text/plain', + 'text', + 'plain', + {}, + [errors.InvalidHeaderDefect]), + + 'junk_text_in_content_type': ( + '', + 'text/plain', + 'text', + 'plain', + {}, + [errors.InvalidHeaderDefect]), + + 'too_many_slashes_in_content_type': ( + 'image/jpeg/foo', + 'text/plain', + 'text', + 'plain', + {}, + [errors.InvalidHeaderDefect]), + + # But unknown names are OK. We could make non-IANA names a defect, but + # by not doing so we make ourselves future proof. The fact that they + # are unknown will be detectable by the fact that they don't appear in + # the mime_registry...and the application is free to extend that list + # to handle them even if the core library doesn't. + + 'unknown_content_type': ( + 'bad/names', + 'bad/names', + 'bad', + 'names'), + + # The content type is case insensitive, and CFWS is ignored. + + 'mixed_case_content_type': ( + 'ImAge/JPeg', + 'image/jpeg', + 'image', + 'jpeg'), + + 'spaces_in_content_type': ( + ' text / plain ', + 'text/plain', + 'text', + 'plain'), + + 'cfws_in_content_type': ( + '(foo) text (bar)/(baz)plain(stuff)', + 'text/plain', + 'text', + 'plain'), + + # test some parameters (more tests could be added for parameters + # associated with other content types, but since parameter parsing is + # generic they would be redundant for the current implementation). + + 'charset_param': ( + 'text/plain; charset="utf-8"', + 'text/plain', + 'text', + 'plain', + {'charset': 'utf-8'}), + + 'capitalized_charset': ( + 'text/plain; charset="US-ASCII"', + 'text/plain', + 'text', + 'plain', + {'charset': 'US-ASCII'}), + + 'unknown_charset': ( + 'text/plain; charset="fOo"', + 'text/plain', + 'text', + 'plain', + {'charset': 'fOo'}), + + 'capitalized_charset_param_name_and_comment': ( + 'text/plain; (interjection) Charset="utf-8"', + 'text/plain', + 'text', + 'plain', + {'charset': 'utf-8'}, + [], + # Should the parameter name be lowercased here? + 'text/plain; Charset="utf-8"'), + + # Since this is pretty much the ur-mimeheader, we'll put all the tests + # that exercise the parameter parsing and formatting here. + # + # XXX: question: is minimal quoting preferred? + + 'unquoted_param_value': ( + 'text/plain; title=foo', + 'text/plain', + 'text', + 'plain', + {'title': 'foo'}, + [], + 'text/plain; title="foo"'), + + 'param_value_with_tspecials': ( + 'text/plain; title="(bar)foo blue"', + 'text/plain', + 'text', + 'plain', + {'title': '(bar)foo blue'}), + + 'param_with_extra_quoted_whitespace': ( + 'text/plain; title=" a loong way \t home "', + 'text/plain', + 'text', + 'plain', + {'title': ' a loong way \t home '}), + + 'bad_params': ( + 'blarg; baz; boo', + 'text/plain', + 'text', + 'plain', + {'baz': '', 'boo': ''}, + [errors.InvalidHeaderDefect]*3), + + 'spaces_around_param_equals': ( + 'Multipart/mixed; boundary = "CPIMSSMTPC06p5f3tG"', + 'multipart/mixed', + 'multipart', + 'mixed', + {'boundary': 'CPIMSSMTPC06p5f3tG'}, + [], + 'Multipart/mixed; boundary="CPIMSSMTPC06p5f3tG"'), + + 'spaces_around_semis': ( + ('image/jpeg; name="wibble.JPG" ; x-mac-type="4A504547" ; ' + 'x-mac-creator="474B4F4E"'), + 'image/jpeg', + 'image', + 'jpeg', + {'name': 'wibble.JPG', + 'x-mac-type': '4A504547', + 'x-mac-creator': '474B4F4E'}, + [], + ('image/jpeg; name="wibble.JPG"; x-mac-type="4A504547"; ' + 'x-mac-creator="474B4F4E"'), + # XXX: it could be that we will eventually prefer to fold starting + # from the decoded value, in which case these spaces and similar + # spaces in other tests will be wrong. + ('Content-Type: image/jpeg; name="wibble.JPG" ; ' + 'x-mac-type="4A504547" ;\n' + ' x-mac-creator="474B4F4E"\n'), + ), + + 'semis_inside_quotes': ( + 'image/jpeg; name="Jim&&Jill"', + 'image/jpeg', + 'image', + 'jpeg', + {'name': 'Jim&&Jill'}), + + 'single_quotes_inside_quotes': ( + 'image/jpeg; name="Jim \'Bob\' Jill"', + 'image/jpeg', + 'image', + 'jpeg', + {'name': "Jim 'Bob' Jill"}), + + 'double_quotes_inside_quotes': ( + r'image/jpeg; name="Jim \"Bob\" Jill"', + 'image/jpeg', + 'image', + 'jpeg', + {'name': 'Jim "Bob" Jill'}, + [], + r'image/jpeg; name="Jim \"Bob\" Jill"'), + + # XXX: This test works except for the refolding of the header. I'll + # deal with that bug when I deal with the other folding bugs. + #'non_ascii_in_params': ( + # ('foo\xa7/bar; b\xa7r=two; ' + # 'baz=thr\xa7e'.encode('latin-1').decode('us-ascii', + # 'surrogateescape')), + # 'foo\uFFFD/bar', + # 'foo\uFFFD', + # 'bar', + # {'b\uFFFDr': 'two', 'baz': 'thr\uFFFDe'}, + # [errors.UndecodableBytesDefect]*3, + # 'foo�/bar; b�r="two"; baz="thr�e"', + # ), + + # RFC 2231 parameter tests. + + 'rfc2231_segmented_normal_values': ( + 'image/jpeg; name*0="abc"; name*1=".html"', + 'image/jpeg', + 'image', + 'jpeg', + {'name': "abc.html"}, + [], + 'image/jpeg; name="abc.html"'), + + 'quotes_inside_rfc2231_value': ( + r'image/jpeg; bar*0="baz\"foobar"; bar*1="\"baz"', + 'image/jpeg', + 'image', + 'jpeg', + {'bar': 'baz"foobar"baz'}, + [], + r'image/jpeg; bar="baz\"foobar\"baz"'), + + # XXX: This test works except for the refolding of the header. I'll + # deal with that bug when I deal with the other folding bugs. + #'non_ascii_rfc2231_value': ( + # ('text/plain; charset=us-ascii; ' + # "title*=us-ascii'en'This%20is%20" + # 'not%20f\xa7n').encode('latin-1').decode('us-ascii', + # 'surrogateescape'), + # 'text/plain', + # 'text', + # 'plain', + # {'charset': 'us-ascii', 'title': 'This is not f\uFFFDn'}, + # [errors.UndecodableBytesDefect], + # 'text/plain; charset="us-ascii"; title="This is not f�n"'), + + 'rfc2231_encoded_charset': ( + 'text/plain; charset*=ansi-x3.4-1968\'\'us-ascii', + 'text/plain', + 'text', + 'plain', + {'charset': 'us-ascii'}, + [], + 'text/plain; charset="us-ascii"'), + + # This follows the RFC: no double quotes around encoded values. + 'rfc2231_encoded_no_double_quotes': ( + ("text/plain;" + "\tname*0*=''This%20is%20;" + "\tname*1*=%2A%2A%2Afun%2A%2A%2A%20;" + '\tname*2="is it not.pdf"'), + 'text/plain', + 'text', + 'plain', + {'name': 'This is ***fun*** is it not.pdf'}, + [], + 'text/plain; name="This is ***fun*** is it not.pdf"', + ('Content-Type: text/plain;\tname*0*=\'\'This%20is%20;\n' + '\tname*1*=%2A%2A%2Afun%2A%2A%2A%20;\tname*2="is it not.pdf"\n'), + ), + + # Make sure we also handle it if there are spurrious double qoutes. + 'rfc2231_encoded_with_double_quotes': ( + ("text/plain;" + '\tname*0*="us-ascii\'\'This%20is%20even%20more%20";' + '\tname*1*="%2A%2A%2Afun%2A%2A%2A%20";' + '\tname*2="is it not.pdf"'), + 'text/plain', + 'text', + 'plain', + {'name': 'This is even more ***fun*** is it not.pdf'}, + [errors.InvalidHeaderDefect]*2, + 'text/plain; name="This is even more ***fun*** is it not.pdf"', + ('Content-Type: text/plain;\t' + 'name*0*="us-ascii\'\'This%20is%20even%20more%20";\n' + '\tname*1*="%2A%2A%2Afun%2A%2A%2A%20";\tname*2="is it not.pdf"\n'), + ), + + 'rfc2231_single_quote_inside_double_quotes': ( + ('text/plain; charset=us-ascii;' + '\ttitle*0*="us-ascii\'en\'This%20is%20really%20";' + '\ttitle*1*="%2A%2A%2Afun%2A%2A%2A%20";' + '\ttitle*2="isn\'t it!"'), + 'text/plain', + 'text', + 'plain', + {'charset': 'us-ascii', 'title': "This is really ***fun*** isn't it!"}, + [errors.InvalidHeaderDefect]*2, + ('text/plain; charset="us-ascii"; ' + 'title="This is really ***fun*** isn\'t it!"'), + ('Content-Type: text/plain; charset=us-ascii;\n' + '\ttitle*0*="us-ascii\'en\'This%20is%20really%20";\n' + '\ttitle*1*="%2A%2A%2Afun%2A%2A%2A%20";\ttitle*2="isn\'t it!"\n'), + ), + + 'rfc2231_single_quote_in_value_with_charset_and_lang': ( + ('application/x-foo;' + "\tname*0*=\"us-ascii'en-us'Frank's\"; name*1*=\" Document\""), + 'application/x-foo', + 'application', + 'x-foo', + {'name': "Frank's Document"}, + [errors.InvalidHeaderDefect]*2, + 'application/x-foo; name="Frank\'s Document"', + ('Content-Type: application/x-foo;\t' + 'name*0*="us-ascii\'en-us\'Frank\'s";\n' + ' name*1*=" Document"\n'), + ), + + 'rfc2231_single_quote_in_non_encoded_value': ( + ('application/x-foo;' + "\tname*0=\"us-ascii'en-us'Frank's\"; name*1=\" Document\""), + 'application/x-foo', + 'application', + 'x-foo', + {'name': "us-ascii'en-us'Frank's Document"}, + [], + 'application/x-foo; name="us-ascii\'en-us\'Frank\'s Document"', + ('Content-Type: application/x-foo;\t' + 'name*0="us-ascii\'en-us\'Frank\'s";\n' + ' name*1=" Document"\n'), + ), + + 'rfc2231_no_language_or_charset': ( + 'text/plain; NAME*0*=english_is_the_default.html', + 'text/plain', + 'text', + 'plain', + {'name': 'english_is_the_default.html'}, + [errors.InvalidHeaderDefect], + 'text/plain; NAME="english_is_the_default.html"'), + + 'rfc2231_encoded_no_charset': ( + ("text/plain;" + '\tname*0*="\'\'This%20is%20even%20more%20";' + '\tname*1*="%2A%2A%2Afun%2A%2A%2A%20";' + '\tname*2="is it.pdf"'), + 'text/plain', + 'text', + 'plain', + {'name': 'This is even more ***fun*** is it.pdf'}, + [errors.InvalidHeaderDefect]*2, + 'text/plain; name="This is even more ***fun*** is it.pdf"', + ('Content-Type: text/plain;\t' + 'name*0*="\'\'This%20is%20even%20more%20";\n' + '\tname*1*="%2A%2A%2Afun%2A%2A%2A%20";\tname*2="is it.pdf"\n'), + ), + + # XXX: see below...the first name line here should be *0 not *0*. + 'rfc2231_partly_encoded': ( + ("text/plain;" + '\tname*0*="\'\'This%20is%20even%20more%20";' + '\tname*1*="%2A%2A%2Afun%2A%2A%2A%20";' + '\tname*2="is it.pdf"'), + 'text/plain', + 'text', + 'plain', + {'name': 'This is even more ***fun*** is it.pdf'}, + [errors.InvalidHeaderDefect]*2, + 'text/plain; name="This is even more ***fun*** is it.pdf"', + ('Content-Type: text/plain;\t' + 'name*0*="\'\'This%20is%20even%20more%20";\n' + '\tname*1*="%2A%2A%2Afun%2A%2A%2A%20";\tname*2="is it.pdf"\n'), + ), + + 'rfc2231_partly_encoded_2': ( + ("text/plain;" + '\tname*0*="\'\'This%20is%20even%20more%20";' + '\tname*1="%2A%2A%2Afun%2A%2A%2A%20";' + '\tname*2="is it.pdf"'), + 'text/plain', + 'text', + 'plain', + {'name': 'This is even more %2A%2A%2Afun%2A%2A%2A%20is it.pdf'}, + [errors.InvalidHeaderDefect], + 'text/plain; name="This is even more %2A%2A%2Afun%2A%2A%2A%20is it.pdf"', + ('Content-Type: text/plain;\t' + 'name*0*="\'\'This%20is%20even%20more%20";\n' + '\tname*1="%2A%2A%2Afun%2A%2A%2A%20";\tname*2="is it.pdf"\n'), + ), + + 'rfc2231_unknown_charset_treated_as_ascii': ( + "text/plain; name*0*=bogus'xx'ascii_is_the_default", + 'text/plain', + 'text', + 'plain', + {'name': 'ascii_is_the_default'}, + [], + 'text/plain; name="ascii_is_the_default"'), + + 'rfc2231_bad_character_in_charset_parameter_value': ( + "text/plain; charset*=ascii''utf-8%E2%80%9D", + 'text/plain', + 'text', + 'plain', + {'charset': 'utf-8\uFFFD\uFFFD\uFFFD'}, + [errors.UndecodableBytesDefect], + 'text/plain; charset="utf-8\uFFFD\uFFFD\uFFFD"'), + + 'rfc2231_encoded_then_unencoded_segments': ( + ('application/x-foo;' + '\tname*0*="us-ascii\'en-us\'My";' + '\tname*1=" Document";' + '\tname*2=" For You"'), + 'application/x-foo', + 'application', + 'x-foo', + {'name': 'My Document For You'}, + [errors.InvalidHeaderDefect], + 'application/x-foo; name="My Document For You"', + ('Content-Type: application/x-foo;\t' + 'name*0*="us-ascii\'en-us\'My";\n' + '\tname*1=" Document";\tname*2=" For You"\n'), + ), + + # My reading of the RFC is that this is an invalid header. The RFC + # says that if charset and language information is given, the first + # segment *must* be encoded. + 'rfc2231_unencoded_then_encoded_segments': ( + ('application/x-foo;' + '\tname*0=us-ascii\'en-us\'My;' + '\tname*1*=" Document";' + '\tname*2*=" For You"'), + 'application/x-foo', + 'application', + 'x-foo', + {'name': 'My Document For You'}, + [errors.InvalidHeaderDefect]*3, + 'application/x-foo; name="My Document For You"', + ("Content-Type: application/x-foo;\tname*0=us-ascii'en-us'My;\t" + # XXX: the newline is in the wrong place, come back and fix + # this when the rest of tests pass. + 'name*1*=" Document"\n;' + '\tname*2*=" For You"\n'), + ), + + # XXX: I would say this one should default to ascii/en for the + # "encoded" segment, since the the first segment is not encoded and is + # in double quotes, making the value a valid non-encoded string. The + # old parser decodes this just like the previous case, which may be the + # better Postel rule, but could equally result in borking headers that + # intentially have quoted quotes in them. We could get this 98% right + # if we treat it as a quoted string *unless* it matches the + # charset'lang'value pattern exactly *and* there is at least one + # encoded segment. Implementing that algorithm will require some + # refactoring, so I haven't done it (yet). + + 'rfc2231_qouted_unencoded_then_encoded_segments': ( + ('application/x-foo;' + '\tname*0="us-ascii\'en-us\'My";' + '\tname*1*=" Document";' + '\tname*2*=" For You"'), + 'application/x-foo', + 'application', + 'x-foo', + {'name': "us-ascii'en-us'My Document For You"}, + [errors.InvalidHeaderDefect]*2, + 'application/x-foo; name="us-ascii\'en-us\'My Document For You"', + ('Content-Type: application/x-foo;\t' + 'name*0="us-ascii\'en-us\'My";\n' + '\tname*1*=" Document";\tname*2*=" For You"\n'), + ), + + } + + +@parameterize +class TestContentTransferEncoding(TestHeaderBase): + + def cte_as_value(self, + source, + cte, + *args): + l = len(args) + defects = args[0] if l>0 else [] + decoded = args[1] if l>1 and args[1] is not DITTO else source + header = 'Content-Transfer-Encoding:' + ' ' if source else '' + folded = args[2] if l>2 else header + source + '\n' + h = self.make_header('Content-Transfer-Encoding', source) + self.assertEqual(h.cte, cte) + self.assertDefectsEqual(h.defects, defects) + self.assertEqual(h, decoded) + self.assertEqual(h.fold(policy=policy.default), folded) + + cte_params = { + + 'RFC_2183_1': ( + 'base64', + 'base64',), + + 'no_value': ( + '', + '7bit', + [errors.HeaderMissingRequiredValue], + '', + 'Content-Transfer-Encoding:\n', + ), + + 'junk_after_cte': ( + '7bit and a bunch more', + '7bit', + [errors.InvalidHeaderDefect]), + + } + + +@parameterize +class TestContentDisposition(TestHeaderBase): + + def content_disp_as_value(self, + source, + content_disposition, + *args): + l = len(args) + parmdict = args[0] if l>0 else {} + defects = args[1] if l>1 else [] + decoded = args[2] if l>2 and args[2] is not DITTO else source + header = 'Content-Disposition:' + ' ' if source else '' + folded = args[3] if l>3 else header + source + '\n' + h = self.make_header('Content-Disposition', source) + self.assertEqual(h.content_disposition, content_disposition) + self.assertEqual(h.params, parmdict) + self.assertDefectsEqual(h.defects, defects) + self.assertEqual(h, decoded) + self.assertEqual(h.fold(policy=policy.default), folded) + + content_disp_params = { + + # Examples from RFC 2183. + + 'RFC_2183_1': ( + 'inline', + 'inline',), + + 'RFC_2183_2': ( + ('attachment; filename=genome.jpeg;' + ' modification-date="Wed, 12 Feb 1997 16:29:51 -0500";'), + 'attachment', + {'filename': 'genome.jpeg', + 'modification-date': 'Wed, 12 Feb 1997 16:29:51 -0500'}, + [], + ('attachment; filename="genome.jpeg"; ' + 'modification-date="Wed, 12 Feb 1997 16:29:51 -0500"'), + ('Content-Disposition: attachment; filename=genome.jpeg;\n' + ' modification-date="Wed, 12 Feb 1997 16:29:51 -0500";\n'), + ), + + 'no_value': ( + '', + None, + {}, + [errors.HeaderMissingRequiredValue], + '', + 'Content-Disposition:\n'), + + 'invalid_value': ( + 'ab./k', + 'ab.', + {}, + [errors.InvalidHeaderDefect]), + + 'invalid_value_with_params': ( + 'ab./k; filename="foo"', + 'ab.', + {'filename': 'foo'}, + [errors.InvalidHeaderDefect]), + + } + + +@parameterize +class TestMIMEVersionHeader(TestHeaderBase): + + def version_string_as_MIME_Version(self, + source, + decoded, + version, + major, + minor, + defects): + h = self.make_header('MIME-Version', source) + self.assertEqual(h, decoded) + self.assertEqual(h.version, version) + self.assertEqual(h.major, major) + self.assertEqual(h.minor, minor) + self.assertDefectsEqual(h.defects, defects) + if source: + source = ' ' + source + self.assertEqual(h.fold(policy=policy.default), + 'MIME-Version:' + source + '\n') + + version_string_params = { + + # Examples from the RFC. + + 'RFC_2045_1': ( + '1.0', + '1.0', + '1.0', + 1, + 0, + []), + + 'RFC_2045_2': ( + '1.0 (produced by MetaSend Vx.x)', + '1.0 (produced by MetaSend Vx.x)', + '1.0', + 1, + 0, + []), + + 'RFC_2045_3': ( + '(produced by MetaSend Vx.x) 1.0', + '(produced by MetaSend Vx.x) 1.0', + '1.0', + 1, + 0, + []), + + 'RFC_2045_4': ( + '1.(produced by MetaSend Vx.x)0', + '1.(produced by MetaSend Vx.x)0', + '1.0', + 1, + 0, + []), + + # Other valid values. + + '1_1': ( + '1.1', + '1.1', + '1.1', + 1, + 1, + []), + + '2_1': ( + '2.1', + '2.1', + '2.1', + 2, + 1, + []), + + 'whitespace': ( + '1 .0', + '1 .0', + '1.0', + 1, + 0, + []), + + 'leading_trailing_whitespace_ignored': ( + ' 1.0 ', + ' 1.0 ', + '1.0', + 1, + 0, + []), + + # Recoverable invalid values. We can recover here only because we + # already have a valid value by the time we encounter the garbage. + # Anywhere else, and we don't know where the garbage ends. + + 'non_comment_garbage_after': ( + '1.0 ', + '1.0 ', + '1.0', + 1, + 0, + [errors.InvalidHeaderDefect]), + + # Unrecoverable invalid values. We *could* apply more heuristics to + # get someing out of the first two, but doing so is not worth the + # effort. + + 'non_comment_garbage_before': ( + ' 1.0', + ' 1.0', + None, + None, + None, + [errors.InvalidHeaderDefect]), + + 'non_comment_garbage_inside': ( + '1.0', + '1.0', + None, + None, + None, + [errors.InvalidHeaderDefect]), + + 'two_periods': ( + '1..0', + '1..0', + None, + None, + None, + [errors.InvalidHeaderDefect]), + + '2_x': ( + '2.x', + '2.x', + None, # This could be 2, but it seems safer to make it None. + None, + None, + [errors.InvalidHeaderDefect]), + + 'foo': ( + 'foo', + 'foo', + None, + None, + None, + [errors.InvalidHeaderDefect]), + + 'missing': ( + '', + '', + None, + None, + None, + [errors.HeaderMissingRequiredValue]), + + } + + +@parameterize class TestAddressHeader(TestHeaderBase): example_params = {