Index: Doc/lib/liburllib2.tex =================================================================== --- Doc/lib/liburllib2.tex (revision 45407) +++ Doc/lib/liburllib2.tex (working copy) @@ -621,14 +621,20 @@ \subsection{AbstractBasicAuthHandler Objects \label{abstract-basic-auth-handler}} -\begin{methoddesc}[AbstractBasicAuthHandler]{handle_authentication_request} +\begin{methoddesc}[AbstractBasicAuthHandler]{http_error_auth_reqed} {authreq, host, req, headers} Handle an authentication request by getting a user/password pair, and re-trying the request. \var{authreq} should be the name of the header where the information about the realm is included in the request, -\var{host} is the host to authenticate to, \var{req} should be the -(failed) \class{Request} object, and \var{headers} should be the error -headers. +\var{host} specifies the URL and path to authenticate for, \var{req} +should be the (failed) \class{Request} object, and \var{headers} +should be the error headers. + +\var{host} is either an authority (e.g. \code{"python.org"}) or a URL +containing an authority component (e.g. \code{"http://python.org/"}). +In either case, the authority must not contain a userinfo component +(so, \code{"python.org"} and \code{"python.org:80"} are fine, +\code{"joe:password@python.org"} is not). \end{methoddesc} @@ -653,7 +659,7 @@ \subsection{AbstractDigestAuthHandler Objects \label{abstract-digest-auth-handler}} -\begin{methoddesc}[AbstractDigestAuthHandler]{handle_authentication_request} +\begin{methoddesc}[AbstractDigestAuthHandler]{http_error_auth_reqed} {authreq, host, req, headers} \var{authreq} should be the name of the header where the information about the realm is included in the request, \var{host} should be the host to Index: Lib/urllib2.py =================================================================== --- Lib/urllib2.py (revision 45407) +++ Lib/urllib2.py (working copy) @@ -612,7 +612,6 @@ ('http', 'joe', 'password', 'proxy.example.com') """ - from urlparse import _splitnetloc scheme, r_scheme = splittype(proxy) if not r_scheme.startswith("/"): # authority @@ -673,6 +672,7 @@ return self.parent.open(req) class HTTPPasswordMgr: + def __init__(self): self.passwd = {} @@ -696,10 +696,15 @@ def reduce_uri(self, uri): """Accept netloc or URI and extract only the netloc and path""" - parts = urlparse.urlparse(uri) + parts = urlparse.urlsplit(uri) if parts[1]: + # URI return parts[1], parts[2] or '/' + elif parts[0]: + # host:port + return uri, '/' else: + # host return parts[2], '/' def is_suburi(self, base, test): @@ -742,6 +747,8 @@ self.add_password = self.passwd.add_password def http_error_auth_reqed(self, authreq, host, req, headers): + # host may be an authority (without userinfo) or a URL with an + # authority # XXX could be multiple headers authreq = headers.get(authreq, None) if authreq: @@ -752,10 +759,7 @@ return self.retry_http_basic_auth(host, req, realm) def retry_http_basic_auth(self, host, req, realm): - # TODO(jhylton): Remove the host argument? It depends on whether - # retry_http_basic_auth() is consider part of the public API. - # It probably is. - user, pw = self.passwd.find_user_password(realm, req.get_full_url()) + user, pw = self.passwd.find_user_password(realm, host) if pw is not None: raw = "%s:%s" % (user, pw) auth = 'Basic %s' % base64.encodestring(raw).strip() @@ -766,14 +770,15 @@ else: return None + class HTTPBasicAuthHandler(AbstractBasicAuthHandler, BaseHandler): auth_header = 'Authorization' def http_error_401(self, req, fp, code, msg, headers): - host = urlparse.urlparse(req.get_full_url())[1] + url = req.get_full_url() return self.http_error_auth_reqed('www-authenticate', - host, req, headers) + url, req, headers) class ProxyBasicAuthHandler(AbstractBasicAuthHandler, BaseHandler): @@ -781,9 +786,13 @@ auth_header = 'Proxy-authorization' def http_error_407(self, req, fp, code, msg, headers): - host = req.get_host() + # http_error_auth_reqed requires that there is no userinfo component in + # authority. Assume there isn't one, since urllib2 does not (and + # should not, RFC 3986 s. 3.2.1) support requests for URLs containing + # userinfo. + authority = req.get_host() return self.http_error_auth_reqed('proxy-authenticate', - host, req, headers) + authority, req, headers) def randombytes(n): Index: Lib/test/test_urllib2.py =================================================================== --- Lib/test/test_urllib2.py (revision 45407) +++ Lib/test/test_urllib2.py (working copy) @@ -10,10 +10,7 @@ # XXX # Request # CacheFTPHandler (hard to write) -# parse_keqv_list, parse_http_list (I'm leaving this for Anthony Baxter -# and Greg Stein, since they're doing Digest Authentication) -# Authentication stuff (ditto) -# CustomProxy, CustomProxyHandler +# parse_keqv_list, parse_http_list, HTTPDigestAuthHandler class TrivialTests(unittest.TestCase): def test_trivial(self): @@ -49,6 +46,70 @@ self.assertEquals(urllib2.parse_http_list(string), list) +def test_password_manager(self): + """ + >>> mgr = urllib2.HTTPPasswordMgr() + >>> add = mgr.add_password + >>> add("Some Realm", "http://example.com/", "joe", "password") + >>> add("Some Realm", "http://example.com/ni", "ni", "ni") + >>> add("c", "http://example.com/foo", "foo", "ni") + >>> add("c", "http://example.com/bar", "bar", "nini") + >>> add("b", "http://example.com/", "first", "blah") + >>> add("b", "http://example.com/", "second", "spam") + >>> add("a", "http://example.com", "1", "a") + >>> add("Some Realm", "http://c.example.com:3128", "3", "c") + >>> add("Some Realm", "d.example.com", "4", "d") + >>> add("Some Realm", "e.example.com:3128", "5", "e") + + >>> mgr.find_user_password("Some Realm", "example.com") + ('joe', 'password') + >>> mgr.find_user_password("Some Realm", "http://example.com") + ('joe', 'password') + >>> mgr.find_user_password("Some Realm", "http://example.com/") + ('joe', 'password') + >>> mgr.find_user_password("Some Realm", "http://example.com/spam") + ('joe', 'password') + >>> mgr.find_user_password("Some Realm", "http://example.com/spam/spam") + ('joe', 'password') + >>> mgr.find_user_password("c", "http://example.com/foo") + ('foo', 'ni') + >>> mgr.find_user_password("c", "http://example.com/bar") + ('bar', 'nini') + + Currently, we use the highest-level path where more than one match: + + >>> mgr.find_user_password("Some Realm", "http://example.com/ni") + ('joe', 'password') + + Use latest add_password() in case of conflict: + + >>> mgr.find_user_password("b", "http://example.com/") + ('second', 'spam') + + No special relationship between a.example.com and example.com: + + >>> mgr.find_user_password("a", "http://example.com/") + ('1', 'a') + >>> mgr.find_user_password("a", "http://a.example.com/") + (None, None) + + Ports: + + >>> mgr.find_user_password("Some Realm", "c.example.com") + (None, None) + >>> mgr.find_user_password("Some Realm", "c.example.com:3128") + ('3', 'c') + >>> mgr.find_user_password("Some Realm", "http://c.example.com:3128") + ('3', 'c') + >>> mgr.find_user_password("Some Realm", "d.example.com") + ('4', 'd') + >>> mgr.find_user_password("Some Realm", "e.example.com:3128") + ('5', 'e') + + """ + pass + + class MockOpener: addheaders = [] def open(self, req, data=None): @@ -89,6 +150,8 @@ return self.handle(self.meth_name, self.action, *args) class MockHandler: + # useful for testing handler machinery + # see add_ordered_mock_handlers() docstring handler_order = 500 def __init__(self, methods): self._define_methods(methods) @@ -161,6 +224,50 @@ opener.add_handler(h) return handlers +def build_test_opener(*handler_instances): + opener = OpenerDirector() + for h in handler_instances: + opener.add_handler(h) + return opener + +class MockHTTPHandler(urllib2.BaseHandler): + # useful for testing redirections and auth + # sends supplied headers and code as first response + # sends 200 OK as second response + def __init__(self, code, headers): + self.code = code + self.headers = headers + self.reset() + def reset(self): + self._count = 0 + self.requests = [] + def http_open(self, req): + import mimetools, httplib, copy + from StringIO import StringIO + self.requests.append(copy.deepcopy(req)) + if self._count == 0: + self._count = self._count + 1 + name = httplib.responses[self.code] + msg = mimetools.Message(StringIO(self.headers)) + return self.parent.error( + "http", req, MockFile(), self.code, name, msg) + else: + self.req = req + msg = mimetools.Message(StringIO("\r\n\r\n")) + return MockResponse(200, "OK", msg, "", req.get_full_url()) + +class MockPasswordManager: + def add_password(self, realm, uri, user, password): + self.realm = realm + self.url = uri + self.user = user + self.password = password + def find_user_password(self, realm, authuri): + self.target_realm = realm + self.target_url = authuri + return self.user, self.password + + class OpenerDirectorTests(unittest.TestCase): def test_handled(self): @@ -612,33 +719,18 @@ urllib2.HTTPRedirectHandler.max_redirections) def test_cookie_redirect(self): - class MockHTTPHandler(urllib2.HTTPHandler): - def __init__(self): self._count = 0 - def http_open(self, req): - import mimetools - from StringIO import StringIO - if self._count == 0: - self._count = self._count + 1 - msg = mimetools.Message( - StringIO("Location: http://www.cracker.com/\r\n\r\n")) - return self.parent.error( - "http", req, MockFile(), 302, "Found", msg) - else: - self.req = req - msg = mimetools.Message(StringIO("\r\n\r\n")) - return MockResponse(200, "OK", msg, "", req.get_full_url()) # cookies shouldn't leak into redirected requests from cookielib import CookieJar - from urllib2 import build_opener, HTTPHandler, HTTPError, \ - HTTPCookieProcessor from test.test_cookielib import interact_netscape cj = CookieJar() interact_netscape(cj, "http://www.example.com/", "spam=eggs") - hh = MockHTTPHandler() - cp = HTTPCookieProcessor(cj) - o = build_opener(hh, cp) + hh = MockHTTPHandler(302, "Location: http://www.cracker.com/\r\n\r\n") + hdeh = urllib2.HTTPDefaultErrorHandler() + hrh = urllib2.HTTPRedirectHandler() + cp = urllib2.HTTPCookieProcessor(cj) + o = build_test_opener(hh, hdeh, hrh, cp) o.open("http://www.example.com/") self.assert_(not hh.req.has_header("Cookie")) @@ -659,7 +751,72 @@ self.assertEqual([(handlers[0], "http_open")], [tup[0:2] for tup in o.calls]) + def test_basic_auth(self): + opener = OpenerDirector() + password_manager = MockPasswordManager() + auth_handler = urllib2.HTTPBasicAuthHandler(password_manager) + realm = "ACME Widget Store" + http_handler = MockHTTPHandler( + 401, 'WWW-Authenticate: Basic realm="%s"\r\n\r\n' % realm) + self._test_basic_auth(opener, auth_handler, "Authorization", + realm, http_handler, password_manager, + "http://acme.example.com/protected", + "http://acme.example.com/protected", + ) + def test_proxy_basic_auth(self): + opener = OpenerDirector() + ph = urllib2.ProxyHandler(dict(http="proxy.example.com:3128")) + opener.add_handler(ph) + password_manager = MockPasswordManager() + auth_handler = urllib2.ProxyBasicAuthHandler(password_manager) + realm = "ACME Networks" + http_handler = MockHTTPHandler( + 407, 'Proxy-Authenticate: Basic realm="%s"\r\n\r\n' % realm) + self._test_basic_auth(opener, auth_handler, "Proxy-authorization", + realm, http_handler, password_manager, + "http://acme.example.com:3128/protected", + "proxy.example.com:3128", + ) + + def _test_basic_auth(self, opener, auth_handler, auth_header, + realm, http_handler, password_manager, + request_url, protected_url): + import base64, httplib + user, password = "wile", "coyote" + opener.add_handler(auth_handler) + opener.add_handler(http_handler) + + # .add_password() fed through to password manager + auth_handler.add_password(realm, request_url, user, password) + self.assertEqual(realm, password_manager.realm) + self.assertEqual(request_url, password_manager.url) + self.assertEqual(user, password_manager.user) + self.assertEqual(password, password_manager.password) + + r = opener.open(request_url) + + # should have asked the password manager for the username/password + self.assertEqual(password_manager.target_realm, realm) + self.assertEqual(password_manager.target_url, protected_url) + + # expect one request without authorization, then one with + self.assertEqual(len(http_handler.requests), 2) + self.assertFalse(http_handler.requests[0].has_header(auth_header)) + userpass = '%s:%s' % (user, password) + auth_hdr_value = 'Basic '+base64.encodestring(userpass).strip() + self.assertEqual(http_handler.requests[1].get_header(auth_header), + auth_hdr_value) + + # if the password manager can't find a password, the handler won't + # handle the HTTP auth error + password_manager.user = password_manager.password = None + http_handler.reset() + r = opener.open(request_url) + self.assertEqual(len(http_handler.requests), 1) + self.assertFalse(http_handler.requests[0].has_header(auth_header)) + + class MiscTests(unittest.TestCase): def test_build_opener(self): @@ -830,20 +987,12 @@ cfh.setTimeout(1) handlers.append(cfh) -## # XXX try out some custom proxy objects too! -## def at_cnri(req): -## host = req.get_host() -## debug(host) -## if host[-18:] == '.cnri.reston.va.us': -## return True -## p = CustomProxy('http', at_cnri, 'proxy.cnri.reston.va.us') -## ph = CustomProxyHandler(p) -## handlers.append(ph) - return handlers def test_main(verbose=None): + from test import test_urllib2 + test_support.run_doctest(test_urllib2, verbose) test_support.run_doctest(urllib2, verbose) tests = (TrivialTests, OpenerDirectorTests, Index: Lib/test/test_urllib2net.py =================================================================== --- Lib/test/test_urllib2net.py (revision 45407) +++ Lib/test/test_urllib2net.py (working copy) @@ -23,6 +23,42 @@ f = urllib2.urlopen("http://www.python.org/") x = f.read() + +class AuthTests(unittest.TestCase): + + def test_basic_auth(self): + import httplib + + test_url = "http://www.python.org/test/test_urllib2/basic_auth" + test_hostport = "www.python.org" + test_realm = 'Test Realm' + test_user = 'test.test_urllib2net' + test_password = 'blah' + + # failure + try: + urllib2.urlopen(test_url) + except urllib2.HTTPError, exc: + self.assertEqual(exc.code, 401) + else: + self.fail("urlopen() should have failed with 401") + + # success + auth_handler = urllib2.HTTPBasicAuthHandler() + auth_handler.add_password(test_realm, test_hostport, + test_user, test_password) + opener = urllib2.build_opener(auth_handler) + f = opener.open('http://localhost/') + response = urllib2.urlopen("http://www.python.org/") + + # The 'userinfo' URL component is deprecated by RFC 3986 for security + # reasons, let's not implement it! (it's already implemented for proxy + # specification strings (that is, URLs or authorities specifying a + # proxy), so we must keep that) + self.assertRaises(httplib.InvalidURL, + urllib2.urlopen, "http://evil:thing@example.com") + + class urlopenNetworkTests(unittest.TestCase): """Tests urllib2.urlopen using the network. @@ -86,7 +122,8 @@ def test_main(): test_support.requires("network") - test_support.run_unittest(URLTimeoutTest, urlopenNetworkTests) + test_support.run_unittest(URLTimeoutTest, urlopenNetworkTests, + AuthTests) if __name__ == "__main__": test_main()