"""Round-trip an mmap-backed POST through urllib's HTTP basic-auth handler.

A throwaway HTTP server is started on an ephemeral loopback port.  It
challenges any POST that carries no ``Authorization`` header with a 401 and
otherwise reports how many body bytes it received.  The client side then
POSTs an anonymous ``mmap`` through an opener that has basic-auth
credentials registered for the server's URL, and prints the reply.

The syntax is deliberately conservative (no ``with``, no ``except ... as``,
no bytes literals) because the script branches on interpreter versions
older than 2.6.
"""

import sys
from threading import Thread
from mmap import mmap

if sys.version_info >= (3,):
    import urllib.request as urllib2
    import http.server as BaseHTTPServer
    import http.client as httplib
else:
    import urllib2
    import BaseHTTPServer
    import httplib


class Handler(BaseHTTPServer.BaseHTTPRequestHandler):
    """POST handler that demands basic auth and echoes the body size.

    Responses:
      * 401 with a ``WWW-Authenticate`` challenge when no ``Authorization``
        header is present.
      * 200 ``"<n> bytes"`` for a non-empty body.
      * 400 ``"0 bytes"`` for an empty body.
      * 400 / 411 / 501 for a request whose body framing is malformed,
        missing, or unsupported.
    """

    def do_POST(self):
        """Handle one POST request (see the class docstring for replies)."""
        print(self.headers)

        # Consume the body before answering anything, including the 401.
        # Closing a connection that still has unread request bytes can make
        # the TCP stack reset it, and the client may then never see the
        # challenge it is supposed to retry on.
        length, failure = self._consume_body()

        if "Authorization" not in self.headers:
            self.send_response(401, "Need authorization")
            self.send_header("WWW-Authenticate", 'Basic realm=""')
            self.end_headers()
            return

        if failure is not None:
            status, reason = failure
            self._reply(status, reason, reason)
            return

        if length > 0:
            self._reply(200, "Okay", "%d bytes" % length)
        else:
            self._reply(400, "Empty request data", "%d bytes" % length)

    def _reply(self, status, reason, text):
        """Send a header-less response with *text* as the ASCII body."""
        self.send_response(status, reason)
        self.end_headers()
        self.wfile.write(text.encode("ascii"))

    def _consume_body(self):
        """Read the whole request body.

        Returns ``(length, None)`` on success, or ``(None, (status, reason))``
        when the body cannot be read.  Errors are returned rather than raised
        so that no ``except ... as`` clause is needed anywhere.
        """
        encoding = self.headers.get("Transfer-Encoding")
        if encoding is None:
            return self._consume_fixed()
        if encoding.strip().lower() != "chunked":
            return None, (501, "Unsupported transfer encoding")
        return self._consume_chunked()

    def _consume_fixed(self):
        """Read a body whose size is declared by ``Content-Length``."""
        declared = self.headers.get("Content-Length")
        if declared is None:
            return None, (411, "Length required")
        try:
            length = int(declared)
        except ValueError:
            return None, (400, "Bad Content-Length")
        if length < 0:
            # read(-1) would mean "until EOF" and block on a live socket.
            return None, (400, "Bad Content-Length")
        if len(self.rfile.read(length)) != length:
            return None, (400, "Truncated request data")
        return length, None

    def _consume_chunked(self):
        """Read a chunked body, including the terminating chunk and trailers."""
        # Built with encode() so the separator has the same type as the
        # lines read from rfile on both Python 2 and Python 3.
        separator = ";".encode("ascii")
        total = 0
        while True:
            size_line = self.rfile.readline()
            try:
                # Anything after ';' on the size line is a chunk extension.
                size = int(size_line.split(separator, 1)[0], 16)
            except ValueError:
                return None, (400, "Bad chunk size")
            if size < 0:
                return None, (400, "Bad chunk size")
            if size == 0:
                break
            if len(self.rfile.read(size)) != size:
                return None, (400, "Truncated request data")
            self.rfile.readline()  # line terminator that follows chunk data
            total += size
        # Skip optional trailer fields up to the blank line (or EOF).
        while self.rfile.readline().strip():
            pass
        return total, None


# Port 0 lets the OS choose a free port; the real one is read back below.
server = BaseHTTPServer.HTTPServer(("127.0.0.1", 0), Handler)
thread = Thread(target=server.serve_forever)
thread.start()
try:
    url = "http://%s:%d/" % server.server_address
    mgr = urllib2.HTTPPasswordMgrWithDefaultRealm()
    mgr.add_password(None, url, "user", "password")
    opener = urllib2.build_opener(urllib2.HTTPBasicAuthHandler(mgr))

    data = "field=value".encode("ascii")
    mem = mmap(-1, len(data))
    try:
        mem[:] = data

        # The timeout is only passed on interpreters at or above 2.6.
        if sys.version_info >= (2, 6):
            timeout = (3,)
        else:
            timeout = ()

        # NOTE(review): the mmap is not rewound between the unauthenticated
        # attempt and the authenticated retry.  Presumably exercising that
        # is the point of this script, so it is left as is -- confirm.
        response = opener.open(url, mem, *timeout)
        try:
            print(response.read().decode("ascii"))
        finally:
            response.close()
    finally:
        mem.close()
finally:
    server.shutdown()
    thread.join()
    # shutdown() only stops serve_forever(); the listening socket has to be
    # released separately.
    server.server_close()