diff -r ba699cf9bdbb Lib/test/test_urllib2_localnet.py --- a/Lib/test/test_urllib2_localnet.py Mon Apr 11 03:45:25 2011 +0300 +++ b/Lib/test/test_urllib2_localnet.py Mon Apr 11 11:29:02 2011 -0700 @@ -75,6 +75,7 @@ self._users = {} self._realm_name = "Test Realm" self._qop = "auth" + self._auth_header = "Proxy-Authorization" def set_qop(self, qop): self._qop = qop @@ -154,11 +155,11 @@ if len(self._users) == 0: return True - if 'Proxy-Authorization' not in request_handler.headers: + if self._auth_header not in request_handler.headers: return self._return_auth_challenge(request_handler) else: auth_dict = self._create_auth_dict( - request_handler.headers['Proxy-Authorization'] + request_handler.headers[self._auth_header] ) if auth_dict["username"] in self._users: password = self._users[ auth_dict["username"] ] @@ -186,6 +187,49 @@ return self._return_auth_challenge(request_handler) return True +class DigestClientMD5sessAuthHandler(DigestAuthHandler): + """Handler for performing digest authentication.""" + + def __init__(self, algorithm='MD5-sess'): + DigestAuthHandler.__init__(self) + self._algorithm = algorithm + self._auth_header = "Authorization" + + def _validate_auth(self, auth_dict, password, method, uri): + final_dict = {} + final_dict.update(auth_dict) + final_dict["password"] = password + final_dict["method"] = method + final_dict["uri"] = uri + pre_HA1_str = "%(username)s:%(realm)s:%(password)s" % final_dict + pre_HA1 = hashlib.md5(pre_HA1_str).hexdigest() + HA1_str = "%s:%s:%s" % (pre_HA1, final_dict["nonce"], final_dict["cnonce"]) + HA1 = hashlib.md5(HA1_str).hexdigest() + HA2_str = "%(method)s:%(uri)s" % final_dict + HA2 = hashlib.md5(HA2_str).hexdigest() + final_dict["HA1"] = HA1 + final_dict["HA2"] = HA2 + response_str = "%(HA1)s:%(nonce)s:%(nc)s:" \ + "%(cnonce)s:%(qop)s:%(HA2)s" % final_dict + response = hashlib.md5(response_str).hexdigest() + + return response == auth_dict["response"] + + def _return_auth_challenge(self, request_handler): + request_handler.send_response(401, "Authentication Required") + request_handler.send_header("Content-Type", "text/html") + request_handler.send_header( + 'WWW-Authenticate', 'Digest realm="%s", ' + 'qop="%s", algorithm=%s, ' + 'nonce="%s", ' % \ + (self._realm_name, self._qop, self._algorithm, self._generate_nonce())) + # XXX: Not sure if we're supposed to add this next header or + # not. + #request_handler.send_header('Connection', 'close') + request_handler.end_headers() + request_handler.wfile.write("Authentication Required.") + return False + # Proxy test infrastructure class FakeProxyHandler(BaseHTTPServer.BaseHTTPRequestHandler): @@ -294,6 +338,72 @@ pass result.close() +class DigestMD5sessClientAuthTests(BaseTestCase): + URL = "http://localhost" + + USER = "tester" + PASSWD = "test123" + REALM = "TestRealm" + + def setUp(self): + self.server_digest_auth_handler = DigestClientMD5sessAuthHandler() + self.server_digest_auth_handler.set_users({self.USER: self.PASSWD}) + self.server_digest_auth_handler.set_realm(self.REALM) + def create_fake_proxy_handler(*args, **kwargs): + return FakeProxyHandler(self.server_digest_auth_handler, *args, **kwargs) + + self.server = LoopbackHttpServerThread(create_fake_proxy_handler) + self.server.start() + self.server.ready.wait() + proxy_url = "http://127.0.0.1:%d" % self.server.port + handler = urllib2.ProxyHandler({"http" : proxy_url}) + self.client_digest_handler = urllib2.HTTPDigestAuthHandler() + self.opener = urllib2.build_opener(handler, self.client_digest_handler) + + def tearDown(self): + self.server.stop() + + def test_client_md5_sess_qop_auth_works(self): + self.client_digest_handler.add_password(self.REALM, self.URL, + self.USER, self.PASSWD) + self.server_digest_auth_handler.set_qop("auth") + result = self.opener.open(self.URL) + while result.read(): + pass + result.close() + +class UnknownClientAuthTests(BaseTestCase): + URL = "http://localhost" + + USER = "tester" + PASSWD = "test123" + REALM = "TestRealm" + + def setUp(self): + self.server_digest_auth_handler = \ + DigestClientMD5sessAuthHandler(algorithm='Unknown') + self.server_digest_auth_handler.set_users({self.USER: self.PASSWD}) + self.server_digest_auth_handler.set_realm(self.REALM) + def create_fake_proxy_handler(*args, **kwargs): + return FakeProxyHandler(self.server_digest_auth_handler, *args, **kwargs) + + self.server = LoopbackHttpServerThread(create_fake_proxy_handler) + self.server.start() + self.server.ready.wait() + proxy_url = "http://127.0.0.1:%d" % self.server.port + handler = urllib2.ProxyHandler({"http" : proxy_url}) + self.client_digest_handler = urllib2.HTTPDigestAuthHandler() + self.opener = urllib2.build_opener(handler, self.client_digest_handler) + + def tearDown(self): + self.server.stop() + + def test_client_unknown_auth(self): + self.client_digest_handler.add_password(self.REALM, self.URL, + self.USER, self.PASSWD) + self.server_digest_auth_handler.set_qop("auth") + with self.assertRaises(ValueError): + result = self.opener.open(self.URL) def GetRequestHandler(responses): @@ -533,7 +643,10 @@ # the next line. #test_support.requires("network") - test_support.run_unittest(ProxyAuthTests, TestUrlopen) + test_support.run_unittest(ProxyAuthTests, + DigestMD5sessClientAuthTests, + UnknownClientAuthTests, + TestUrlopen) if __name__ == "__main__": test_main() diff -r ba699cf9bdbb Lib/urllib2.py --- a/Lib/urllib2.py Mon Apr 11 03:45:25 2011 +0300 +++ b/Lib/urllib2.py Mon Apr 11 11:29:02 2011 -0700 @@ -932,6 +932,7 @@ self.retried = 0 self.nonce_count = 0 self.last_nonce = None + self.last_cnonce = None # only used when algorithm="MD5-sess" def reset_retry_count(self): self.retried = 0 @@ -1001,10 +1002,27 @@ else: entdig = None - A1 = "%s:%s:%s" % (user, realm, pw) - A2 = "%s:%s" % (req.get_method(), - # XXX selector: what about proxies and full urls - req.get_selector()) + split_result = urlparse.urlsplit(req.get_full_url()) + if algorithm == "MD5-sess" or not split_result.query: + # XXX IIS6 expects a URL without query string + uri = split_result.path + else: + uri = "%s?%s" % (split_result.path, split_result.query) + + cnonce = None + if algorithm == "MD5-sess": + # The cnonce is needed to calculate A1, and we need to reuse + # last cnonce (according to RFC2617) + if nonce != self.last_nonce: + # calculate a new one + self.last_cnonce = self.get_cnonce(nonce) + cnonce = self.last_cnonce + + A1 = "%s:%s:%s" % (H("%s:%s:%s" % (user, realm, pw)), nonce, cnonce) + else: + A1 = "%s:%s:%s" % (user, realm, pw) + + A2 = "%s:%s" % (req.get_method(), uri) if qop == 'auth': if nonce == self.last_nonce: self.nonce_count += 1 @@ -1013,7 +1031,8 @@ self.last_nonce = nonce ncvalue = '%08x' % self.nonce_count - cnonce = self.get_cnonce(nonce) + if not cnonce: + cnonce = self.get_cnonce(nonce) noncebit = "%s:%s:%s:%s:%s" % (nonce, ncvalue, cnonce, qop, H(A2)) respdig = KD(H(A1), noncebit) elif qop is None: @@ -1025,8 +1044,7 @@ # XXX should the partial digests be encoded too? base = 'username="%s", realm="%s", nonce="%s", uri="%s", ' \ - 'response="%s"' % (user, realm, nonce, req.get_selector(), - respdig) + 'response="%s"' % (user, realm, nonce, uri, respdig) if opaque: base += ', opaque="%s"' % opaque if entdig: @@ -1040,10 +1058,13 @@ # algorithm should be case-insensitive according to RFC2617 algorithm = algorithm.upper() # lambdas assume digest modules are imported at the top level - if algorithm == 'MD5': + if algorithm in ('MD5', 'MD5-SESS'): H = lambda x: hashlib.md5(x).hexdigest() elif algorithm == 'SHA': H = lambda x: hashlib.sha1(x).hexdigest() + else: + raise ValueError, "Unknown digest authentication algorithm '%s'" % \ + algorithm # XXX MD5-sess KD = lambda s, d: H("%s:%s" % (s, d)) return H, KD