"""Reproduce cgi.FieldStorage handling of an uploaded multipart/related file.

A browser form upload (multipart/form-data) carries an MHT file saved by
Internet Explorer 9.  The part holding the file is labelled
``Content-Type: multipart/related`` without a boundary parameter; the real
boundary only appears in the MIME headers stored inside the file itself.

Running the script parses the request six times and prints either the parsed
result or the traceback:

* the stock parser on the request as sent (triggers the exception),
* the stock parser with the part relabelled application/octet-stream,
* the same two requests with PatchedFieldStorage,
* the same two requests with ExperimentalFieldStorage (not working).

Requires the legacy ``cgi`` module from the standard library.
"""

import cgi
import email.parser
import importlib
import io
import os
import traceback
import urllib.parse

# Reload so that edits made to cgi.py while experimenting are picked up when
# the script is re-run inside a live interpreter.
importlib.reload(cgi)

# NOTE(review): the line structure of the payloads below was reconstructed
# from MIME syntax (one header per line, an empty line before each body).
# The HTML part appears to have lost its markup before this file was saved
# -- confirm against the original capture.
# The backslashes in the first Content-Location are doubled on purpose: in a
# non-raw bytes literal "\a" and "\t" would turn into BEL and TAB.
mht_file = b"""From: "Opgeslagen met Windows Internet Explorer 9"
Subject: Window title
Date: Sun, 12 Aug 2012 12:09:06 +0200
MIME-Version: 1.0
Content-Type: multipart/related; type="text/html"; boundary="----=_NextPart_000_0000_01CD7883.457A3C20"
X-MimeOLE: Produced By Microsoft MimeOLE V6.0.6002.18463

This is a multi-part message in MIME format.

------=_NextPart_000_0000_01CD7883.457A3C20
Content-Type: text/html; charset="Windows-1252"
Content-Transfer-Encoding: quoted-printable
Content-Location: file://C:\\Users\\admin\\Documents\\testmht.html

Window title

Title

Some text

=20
------=_NextPart_000_0000_01CD7883.457A3C20
Content-Type: text/css; charset="iso-8859-1"
Content-Transfer-Encoding: 7bit
Content-Location: file:///C:/Users/admin/Documents/csspatrick.css

H1 { COLOR: green }
------=_NextPart_000_0000_01CD7883.457A3C20
Content-Type: application/octet-stream
Content-Transfer-Encoding: 7bit
Content-Location: file:///C:/Users/admin/Documents/libpatrick.js

function shout(s) { alert(s); }
------=_NextPart_000_0000_01CD7883.457A3C20--
"""

# Request headers as sent by the browser.
# NOTE(review): 'content-length' is the value recorded with the request; it
# is not recomputed from `body` below -- confirm it still matches.
headers = {
    'origin': 'http://localhost:10081',
    'referer': 'http://localhost:10081/scenario3',
    'content-length': '1844',
    'accept-charset': 'ISO-8859-1,utf-8;q=0.7,*;q=0.3',
    'cache-control': 'max-age=0',
    'connection': 'keep-alive',
    'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
    'user-agent': 'Mozilla/5.0 (Windows NT 6.0) AppleWebKit/537.1 (KHTML, like Gecko) Chrome/21.0.1180.75 Safari/537.1',
    'host': 'localhost:10081',
    'accept-encoding': 'gzip,deflate,sdch',
    'accept-language': 'nl-NL,nl;q=0.8,en-US;q=0.6,en;q=0.4,en-GB;q=0.2',
    'content-type': 'multipart/form-data; boundary=----WebKitFormBoundary8ppHA8NvREcSWWW4',
}

# The form has two fields: the uploaded file ("data") and a text field.
body = b"""------WebKitFormBoundary8ppHA8NvREcSWWW4
Content-Disposition: form-data; name="data"; filename="ie9_saved.mht"
Content-Type: multipart/related

""" + mht_file + b"""
------WebKitFormBoundary8ppHA8NvREcSWWW4
Content-Disposition: form-data; name="fieldname"

abc123
------WebKitFormBoundary8ppHA8NvREcSWWW4--
"""


class FieldStorage(cgi.FieldStorage):
    """cgi.FieldStorage with a multi-line repr and shared parsing helpers.

    The private helpers hold the steps that both read_multi() overrides
    below have in common.  They refer to the cgi / email / urllib names
    explicitly because this module, unlike cgi.py, does not define them.
    """

    def __repr__(self):
        lines = [
            "%s(" % self.__class__.__name__,
            "name = %r," % self.name,
            "filename = %r," % self.filename,
        ]
        value = self.value
        if isinstance(value, list):
            lines.append("value = [")
            lines.extend("%r," % item for item in value)
            lines.append("]")
        else:
            lines.append("value = %r" % value)
        lines.append(")")
        return "\n ".join(lines)

    def _add_query_string_fields(self):
        """Append the fields of a query string sent with the POST, if any."""
        if not self.qs_on_post:
            return
        query = urllib.parse.parse_qsl(
            self.qs_on_post, self.keep_blank_values, self.strict_parsing,
            encoding=self.encoding, errors=self.errors)
        self.list.extend(
            cgi.MiniFieldStorage(key, value) for key, value in query)

    def _skip_boundary_line(self, boundary):
        """Validate *boundary* and consume the first line of the input.

        The line is expected to hold the boundary; its content is not
        checked.

        Raises:
            ValueError: if the boundary is invalid or the input stream does
                not return bytes.
        """
        if not cgi.valid_boundary(boundary):
            raise ValueError(
                'Invalid boundary in multipart form: %r' % (boundary,))
        line = self.fp.readline()
        if not isinstance(line, bytes):
            raise ValueError("%s should return bytes, got %s"
                             % (self.fp, type(line).__name__))
        self.bytes_read += len(line)

    def _read_part_headers(self):
        """Read one header block from self.fp and return it parsed.

        Lines are consumed up to and including the first blank line (or
        end of input).  Returns None when nothing at all could be read.
        """
        chunks = []
        while True:
            data = self.fp.readline()
            chunks.append(data)
            if not data.strip():
                break
        hdr_text = b"".join(chunks)
        if not hdr_text:
            return None
        self.bytes_read += len(hdr_text)
        # FeedParser takes strings, not bytes.
        # NOTE(pv): consider switching to BytesParser.
        parser = email.parser.FeedParser()
        parser.feed(hdr_text.decode(self.encoding, self.errors))
        return parser.close()


class PatchedFieldStorage(FieldStorage):
    """FieldStorage that copes with a multipart part lacking a boundary."""

    def read_multi(self, environ, keep_blank_values, strict_parsing):
        """Internal: read a part that is itself multipart.

        With an inner boundary the parts are read from self.fp one by one.
        Without one, the part is read as a single value and then parsed
        again using the boundary named in its own embedded MIME headers.
        """
        self.list = []
        self._add_query_string_fields()
        klass = self.FieldStorageClass or self.__class__
        ib = self.innerboundary
        if not ib:
            self._read_embedded_multipart(environ, klass)
            return

        self._skip_boundary_line(ib)
        while True:
            part_headers = self._read_part_headers()
            if part_headers is None:
                break
            part = klass(self.fp, part_headers, ib, environ,
                         keep_blank_values, strict_parsing,
                         self.limit - self.bytes_read,
                         self.encoding, self.errors)
            self.bytes_read += part.bytes_read
            self.list.append(part)
            if self.bytes_read >= self.limit:  # pv: was self.length
                break
        self.skip_lines()

    def _read_embedded_multipart(self, environ, klass):
        """Parse a part whose boundary is only named inside its own data.

        Replaces self.list with a single nested *klass* instance and drops
        self.file.

        NOTE(review): the data is compared against bytes, so this presumably
        relies on read_single() storing the part in a binary file -- confirm
        for parts without a filename.

        Raises:
            ValueError: if the embedded headers name no boundary, or the
                boundary line does not occur in the data.
        """
        self.read_single()
        embedded = self.file

        # Header block of the embedded message, up to the first blank line.
        header_lines = []
        for line in embedded:
            if not line.strip():
                break
            header_lines.append(line)
        msg = email.parser.BytesParser().parsebytes(
            b"".join(header_lines), headersonly=True)
        _ctype, pdict = cgi.parse_header(msg.get('Content-Type', ''))
        if 'boundary' not in pdict:
            raise ValueError(
                'No boundary parameter in the embedded multipart headers')
        ib = pdict['boundary'].encode(self.encoding)

        # Skip the prologue; accept both LF and CRLF line endings.
        delimiter = b"--" + ib
        for line in embedded:
            if line.rstrip(b"\r\n") == delimiter:
                break
        else:
            raise ValueError(
                'Boundary %r not found in the embedded multipart data'
                % (ib,))

        # The nested parser expects to start on the boundary line itself.
        bodybytes = line + embedded.read()
        # Only the request method is handed to the nested parser.
        new_env = dict(REQUEST_METHOD=environ['REQUEST_METHOD'])
        part = klass(fp=io.BytesIO(bodybytes),
                     headers=msg,
                     limit=len(bodybytes),
                     environ=new_env,
                     outerboundary=self.outerboundary,
                     keep_blank_values=0,
                     strict_parsing=0,
                     encoding='utf-8',
                     errors='replace')
        self.list = [part]
        self.file = None


class ExperimentalFieldStorage(FieldStorage):
    """Experimental variant that keeps reading from the outer stream.

    Marked as not working by its author; it prints the stream position and
    the next few input lines before each nested part is created.
    """

    def read_multi(self, environ, keep_blank_values, strict_parsing):
        """Internal: read a part that is itself multipart."""
        self.list = []
        self._add_query_string_fields()
        klass = self.FieldStorageClass or self.__class__
        ib = self.innerboundary
        if ib:
            self._skip_boundary_line(ib)
        else:
            # No boundary of our own: fall back to the enclosing one.
            ib = self.outerboundary
        assert self.limit >= 0

        # The next line holds the first header.
        while True:
            part_headers = self._read_part_headers()
            if part_headers is None:
                break
            self._print_upcoming_lines()
            part = klass(fp=self.fp,
                         headers=part_headers,
                         outerboundary=ib,
                         environ=environ,
                         keep_blank_values=keep_blank_values,
                         strict_parsing=strict_parsing,
                         limit=self.limit - self.bytes_read,
                         encoding=self.encoding,
                         errors=self.errors)
            self.bytes_read += part.bytes_read
            self.list.append(part)
            if self.bytes_read >= self.limit:  # pv: was self.length
                break
        self.skip_lines()

    def _print_upcoming_lines(self):
        """Debug aid: print position, byte count and up to six input lines.

        The stream position is restored afterwards.
        """
        position = self.fp.tell()
        print(80 * "-")
        print(position, self.bytes_read)
        for _ in range(6):
            line = self.fp.readline()
            if not line:
                break
            print(repr(line))
        self.fp.seek(position)


# FieldStorage is constructed without an explicit environ below, so the
# request method is set in the process environment.
environ = os.environ
environ.update(REQUEST_METHOD="POST")


def test(body, cls=FieldStorage):
    """Parse *body* with *cls* and print the result or the traceback."""
    print()
    print()
    try:
        fs = cls(io.BytesIO(body), headers)
        print(fs)
    except Exception:
        # Deliberately broad: the script exists to show whatever goes wrong.
        traceback.print_exc()


# this triggers the exception:
test(body)
# runs correct, the file is opaque:
test(body.replace(b"multipart/related", b"application/octet-stream", 1))

# this is patched:
test(body, PatchedFieldStorage)
test(body.replace(b"multipart/related", b"application/octet-stream", 1),
     PatchedFieldStorage)

# this is experimental and not working:
test(body, ExperimentalFieldStorage)
test(body.replace(b"multipart/related", b"application/octet-stream", 1),
     ExperimentalFieldStorage)