Index: Lib/urllib/parse.py
===================================================================
--- Lib/urllib/parse.py	(revision 85055)
+++ Lib/urllib/parse.py	(working copy)
@@ -54,11 +54,18 @@
                  'nntp', 'wais', 'https', 'shttp', 'snews',
                  'file', 'prospero', '']
 
+_groups = (uses_relative, uses_netloc, non_hierarchical,
+           uses_params, uses_query, uses_fragment)
+for _group in _groups:
+    for _scheme in _group[:]:
+        _group.append(_scheme.encode('ascii'))
+
 # Characters valid in scheme names
 scheme_chars = ('abcdefghijklmnopqrstuvwxyz'
                 'ABCDEFGHIJKLMNOPQRSTUVWXYZ'
                 '0123456789'
                 '+-.')
+scheme_charsb = scheme_chars.encode('ascii')
 
 MAX_CACHE_SIZE = 20
 _parse_cache = {}
@@ -73,44 +80,57 @@
     """Shared methods for the parsed result objects."""
 
     @property
-    def username(self):
+    def _userinfo(self):
         netloc = self.netloc
-        if "@" in netloc:
-            userinfo = netloc.rsplit("@", 1)[0]
-            if ":" in userinfo:
-                userinfo = userinfo.split(":", 1)[0]
-            return userinfo
-        return None
+        at_symbol, colon = '@:' if isinstance(netloc, str) else (b'@', b':')
+        userinfo, have_info, hostinfo = netloc.rpartition(at_symbol)
+        if have_info:
+            username, have_password, password = userinfo.partition(colon)
+            if not have_password:
+                password = None
+        else:
+            username = password = None
+        return username, password
 
     @property
-    def password(self):
+    def _hostinfo(self):
         netloc = self.netloc
-        if "@" in netloc:
-            userinfo = netloc.rsplit("@", 1)[0]
-            if ":" in userinfo:
-                return userinfo.split(":", 1)[1]
-        return None
+        at_symbol, colon, open_br, close_br = (
+            '@:[]' if isinstance(netloc, str) else (b'@', b':', b'[', b']'))
+        _, _, hostinfo = netloc.rpartition(at_symbol)
+        _, have_open_br, bracketed = hostinfo.partition(open_br)
+        if have_open_br:
+            hostname, _, port = bracketed.partition(close_br)
+            _, have_port, port = port.partition(colon)
+        else:
+            hostname, have_port, port = hostinfo.partition(colon)
+        if not have_port:
+            port = None
+        return hostname, port
 
     @property
+    def username(self):
+        return self._userinfo[0]
+
+    @property
+    def password(self):
+        return self._userinfo[1]
+
+    @property
     def hostname(self):
-        netloc = self.netloc.split('@')[-1]
-        if '[' in netloc and ']' in netloc:
-            return netloc.split(']')[0][1:].lower()
-        elif ':' in netloc:
-            return netloc.split(':')[0].lower()
-        elif netloc == '':
-            return None
-        else:
-            return netloc.lower()
+        hostname = self._hostinfo[0]
+        if not hostname:
+            hostname = None
+        elif hostname is not None:
+            hostname = hostname.lower()
+        return hostname
 
     @property
     def port(self):
-        netloc = self.netloc.split('@')[-1].split(']')[-1]
-        if ':' in netloc:
-            port = netloc.split(':')[1]
-            return int(port, 10)
-        else:
-            return None
+        port = self._hostinfo[1]
+        if port is not None:
+            port = int(port, 10)
+        return port
 
 from collections import namedtuple
 
@@ -138,24 +158,27 @@
     (e.g. netloc is a single string) and we don't expand % escapes."""
     tuple = urlsplit(url, scheme, allow_fragments)
     scheme, netloc, url, query, fragment = tuple
-    if scheme in uses_params and ';' in url:
+    semicolon = ';' if isinstance(url, str) else b';'
+    if scheme in uses_params and semicolon in url:
         url, params = _splitparams(url)
     else:
         params = ''
     return ParseResult(scheme, netloc, url, params, query, fragment)
 
 def _splitparams(url):
-    if '/' in url:
-        i = url.find(';', url.rfind('/'))
+    slash, semicolon = '/;' if isinstance(url, str) else (b'/', b';')
+    if slash in url:
+        i = url.find(semicolon, url.rfind(slash))
         if i < 0:
-            return url, ''
+            return url, type(url)()
     else:
-        i = url.find(';')
+        i = url.find(semicolon)
     return url[:i], url[i+1:]
 
 def _splitnetloc(url, start=0):
     delim = len(url)   # position of end of domain part of url, default is end
-    for c in '/?#':    # look for delimiters; the order is NOT important
+    delims = '/?#' if isinstance(url, str) else (b'/', b'?', b'#')
+    for c in delims:   # look for delimiters; the order is NOT important
         wdelim = url.find(c, start)        # find first of this delim
         if wdelim >= 0:                    # if found
             delim = min(delim, wdelim)     # use earliest delim position
@@ -174,39 +197,50 @@
         return cached
     if len(_parse_cache) >= MAX_CACHE_SIZE: # avoid runaway growth
         clear_cache()
-    netloc = query = fragment = ''
-    i = url.find(':')
+    # is called pound rather than hash to avoid builtin name clash
+    if isinstance(url, str):
+        netloc = query = fragment = ''
+        colon, open_br, close_br, slash, pound, question_mark = ':[]/#?'
+        http_scheme = 'http'
+        _scheme_chars = scheme_chars
+    else:
+        netloc = query = fragment = b''
+        colon, open_br, close_br, slash, pound, question_mark = (b':', b'[', b']', b'/', b'#', b'?')
+        http_scheme = b'http'
+        _scheme_chars = scheme_charsb
+    double_slash = slash*2
+    i = url.find(colon)
     if i > 0:
-        if url[:i] == 'http': # optimize the common case
+        if url[:i] == http_scheme: # optimize the common case
             scheme = url[:i].lower()
             url = url[i+1:]
-            if url[:2] == '//':
+            if url[:2] == double_slash:
                 netloc, url = _splitnetloc(url, 2)
-                if (('[' in netloc and ']' not in netloc) or
-                    (']' in netloc and '[' not in netloc)):
+                if ((open_br in netloc and close_br not in netloc) or
+                    (close_br in netloc and open_br not in netloc)):
                     raise ValueError("Invalid IPv6 URL")
-            if allow_fragments and '#' in url:
-                url, fragment = url.split('#', 1)
-            if '?' in url:
-                url, query = url.split('?', 1)
+            if allow_fragments and pound in url:
+                url, fragment = url.split(pound, 1)
+            if question_mark in url:
+                url, query = url.split(question_mark, 1)
             v = SplitResult(scheme, netloc, url, query, fragment)
             _parse_cache[key] = v
             return v
-        if url.endswith(':') or not url[i+1].isdigit():
+        if url.endswith(colon) or not url[i+1:i+2].isdigit():
             for c in url[:i]:
-                if c not in scheme_chars:
+                if c not in _scheme_chars:
                     break
             else:
                 scheme, url = url[:i].lower(), url[i+1:]
 
-    if url[:2] == '//':
+    if url[:2] == double_slash:
         netloc, url = _splitnetloc(url, 2)
-        if (('[' in netloc and ']' not in netloc) or
-            (']' in netloc and '[' not in netloc)):
+        if ((open_br in netloc and close_br not in netloc) or
+            (close_br in netloc and open_br not in netloc)):
             raise ValueError("Invalid IPv6 URL")
-    if allow_fragments and scheme in uses_fragment and '#' in url:
-        url, fragment = url.split('#', 1)
-    if scheme in uses_query and '?' in url:
-        url, query = url.split('?', 1)
+    if allow_fragments and scheme in uses_fragment and pound in url:
+        url, fragment = url.split(pound, 1)
+    if scheme in uses_query and question_mark in url:
+        url, query = url.split(question_mark, 1)
     v = SplitResult(scheme, netloc, url, query, fragment)
     _parse_cache[key] = v
     return v
@@ -218,7 +252,8 @@
     (the draft states that these are equivalent)."""
     scheme, netloc, url, params, query, fragment = components
     if params:
-        url = "%s;%s" % (url, params)
+        semicolon = ';' if isinstance(url, str) else b';'
+        url += semicolon + params
     return urlunsplit((scheme, netloc, url, query, fragment))
 
 def urlunsplit(components):
@@ -228,15 +263,19 @@
     was parsed originally had unnecessary delimiters (for example, a ? with
     an empty query; the RFC states that these are equivalent)."""
     scheme, netloc, url, query, fragment = components
-    if netloc or (scheme and scheme in uses_netloc and url[:2] != '//'):
-        if url and url[:1] != '/': url = '/' + url
-        url = '//' + (netloc or '') + url
+    # is called pound rather than hash to avoid builtin name clash
+    colon, slash, pound, question_mark = (
+        ':/#?' if isinstance(url, str) else (b':', b'/', b'#', b'?'))
+    double_slash = slash*2
+    if netloc or (scheme and scheme in uses_netloc and url[:2] != double_slash):
+        if url and url[:1] != slash: url = slash + url
+        url = double_slash + (netloc or type(url)()) + url
     if scheme:
-        url = scheme + ':' + url
+        url = scheme + colon + url
     if query:
-        url = url + '?' + query
+        url = url + question_mark + query
     if fragment:
-        url = url + '#' + fragment
+        url = url + pound + fragment
     return url
 
 def urljoin(base, url, allow_fragments=True):
@@ -257,7 +296,9 @@
             return urlunparse((scheme, netloc, path,
                                params, query, fragment))
         netloc = bnetloc
-    if path[:1] == '/':
+    slash, period = '/.' if isinstance(url, str) else (b'/', b'.')
+    double_period = period*2
+    if path[:1] == slash:
         return urlunparse((scheme, netloc, path,
                            params, query, fragment))
     if not path:
@@ -272,28 +313,29 @@
             query = bquery
         return urlunparse((scheme, netloc, path,
                            params, query, fragment))
-    segments = bpath.split('/')[:-1] + path.split('/')
+    segments = bpath.split(slash)[:-1] + path.split(slash)
     # XXX The stuff below is bogus in various ways...
-    if segments[-1] == '.':
-        segments[-1] = ''
-    while '.' in segments:
-        segments.remove('.')
+    empty = type(url)()
+    if segments[-1] == period:
+        segments[-1] = empty
+    while period in segments:
+        segments.remove(period)
     while 1:
         i = 1
         n = len(segments) - 1
         while i < n:
-            if (segments[i] == '..'
-                and segments[i-1] not in ('', '..')):
+            if (segments[i] == double_period
+                and segments[i-1] not in (empty, double_period)):
                 del segments[i-1:i+1]
                 break
            i = i+1
        else:
            break
-    if segments == ['', '..']:
-        segments[-1] = ''
-    elif len(segments) >= 2 and segments[-1] == '..':
-        segments[-2:] = ['']
-    return urlunparse((scheme, netloc, '/'.join(segments),
+    if segments == [empty, double_period]:
+        segments[-1] = empty
+    elif len(segments) >= 2 and segments[-1] == double_period:
+        segments[-2:] = [empty]
+    return urlunparse((scheme, netloc, slash.join(segments),
                        params, query, fragment))
 
 def urldefrag(url):
@@ -303,12 +345,14 @@
     the URL contained no fragments, the second element is the
     empty string.
     """
-    if '#' in url:
+    pound = '#' if isinstance(url, str) else b'#'
+    empty = type(url)()
+    if pound in url:
         s, n, p, a, q, frag = urlparse(url)
-        defrag = urlunparse((s, n, p, a, q, ''))
+        defrag = urlunparse((s, n, p, a, q, empty))
         return defrag, frag
     else:
-        return url, ''
+        return url, empty
 
 def unquote_to_bytes(string):
     """unquote_to_bytes('abc%20def') -> b'abc def'."""
@@ -420,23 +464,35 @@
 
     Returns a list, as G-d intended.
     """
-    pairs = [s2 for s1 in qs.split('&') for s2 in s1.split(';')]
+    # Different unquote implementation for different input types for now
+    # This means this function currently only works correctly for utf-8
+    # encoded bytes, so something needs to be done to fix that
+    if isinstance(qs, str):
+        empty = ''
+        semicolon, ampersand, equals, plus, space = ';&=+ '
+        _unquote = unquote
+    else:
+        empty = b''
+        semicolon, ampersand, equals, plus, space = (
+            b';', b'&', b'=', b'+', b' ')
+        _unquote = unquote_to_bytes
+    pairs = [s2 for s1 in qs.split(ampersand) for s2 in s1.split(semicolon)]
     r = []
     for name_value in pairs:
         if not name_value and not strict_parsing:
             continue
-        nv = name_value.split('=', 1)
+        nv = name_value.split(equals, 1)
         if len(nv) != 2:
             if strict_parsing:
                 raise ValueError("bad query field: %r" % (name_value,))
             # Handle case of a control-name with no equal sign
             if keep_blank_values:
-                nv.append('')
+                nv.append(empty)
             else:
                 continue
         if len(nv[1]) or keep_blank_values:
-            name = unquote(nv[0].replace('+', ' '))
-            value = unquote(nv[1].replace('+', ' '))
+            name = _unquote(nv[0].replace(plus, space))
+            value = _unquote(nv[1].replace(plus, space))
             r.append((name, value))
     return r
 
Index: Lib/test/test_urlparse.py
===================================================================
--- Lib/test/test_urlparse.py	(revision 85055)
+++ Lib/test/test_urlparse.py	(working copy)
@@ -24,6 +24,17 @@
     ("&a=b", [('a', 'b')]),
     ("a=a+b&b=b+c", [('a', 'a b'), ('b', 'b c')]),
     ("a=1&a=2", [('a', '1'), ('a', '2')]),
+    (b"", []),
+    (b"&", []),
+    (b"&&", []),
+    (b"=", [(b'', b'')]),
+    (b"=a", [(b'', b'a')]),
+    (b"a", [(b'a', b'')]),
+    (b"a=", [(b'a', b'')]),
+    (b"a=", [(b'a', b'')]),
+    (b"&a=b", [(b'a', b'b')]),
+    (b"a=a+b&b=b+c", [(b'a', b'a b'), (b'b', b'b c')]),
+    (b"a=1&a=2", [(b'a', b'1'), (b'a', b'2')]),
 ]
 
 class UrlParseTestCase(unittest.TestCase):
@@ -110,7 +121,7 @@
              ('git+ssh', 'git@github.com','/user/project.git',
               '','',''),
              ('git+ssh', 'git@github.com','/user/project.git',
-              '', ''))
+              '', '')),
             ]
         for url, parsed, split in testcases:
             self.checkRoundtrips(url, parsed, split)
@@ -144,11 +155,16 @@
             self.checkRoundtrips(url, parsed, split)
 
     def checkJoin(self, base, relurl, expected):
-        self.assertEqual(urllib.parse.urljoin(base, relurl), expected,
-                         (base, relurl, expected))
+        str_components = (base, relurl, expected)
+        self.assertEqual(urllib.parse.urljoin(base, relurl), expected)
+        bytes_components = baseb, relurlb, expectedb = [
+                            x.encode('ascii') for x in str_components]
+        self.assertEqual(urllib.parse.urljoin(baseb, relurlb), expectedb)
 
     def test_unparse_parse(self):
-        for u in ['Python', './Python','x-newscheme://foo.com/stuff','x://y','x:/y','x:/','/',]:
+        str_cases = ['Python', './Python','x-newscheme://foo.com/stuff','x://y','x:/y','x:/','/',]
+        bytes_cases = [x.encode('ascii') for x in str_cases]
+        for u in str_cases+bytes_cases:
             self.assertEqual(urllib.parse.urlunsplit(urllib.parse.urlsplit(u)), u)
             self.assertEqual(urllib.parse.urlunparse(urllib.parse.urlparse(u)), u)
 
@@ -328,7 +344,7 @@
         self.checkJoin(SIMPLE_BASE, 'http:g?y/./x','http://a/b/c/g?y/./x')
 
     def test_RFC2732(self):
-        for url, hostname, port in [
+        str_cases = [
             ('http://Test.python.org:5432/foo/', 'test.python.org', 5432),
             ('http://12.34.56.78:5432/foo/', '12.34.56.78', 5432),
             ('http://[::1]:5432/foo/', '::1', 5432),
@@ -349,20 +365,26 @@
             ('http://[::12.34.56.78]/foo/', '::12.34.56.78', None),
             ('http://[::ffff:12.34.56.78]/foo/',
              '::ffff:12.34.56.78', None),
-            ]:
+            ]
+        def _encode(t):
+            return t[0].encode('ascii'), t[1].encode('ascii'), t[2]
+        bytes_cases = [_encode(x) for x in str_cases]
+        for url, hostname, port in str_cases + bytes_cases:
             urlparsed = urllib.parse.urlparse(url)
             self.assertEqual((urlparsed.hostname, urlparsed.port) , (hostname, port))
 
-        for invalid_url in [
+        str_cases = [
                 'http://::12.34.56.78]/',
                 'http://[::1/foo/',
                 'ftp://[::1/foo/bad]/bad',
                 'http://[::1/foo/bad]/bad',
-                'http://[::ffff:12.34.56.78']:
+                'http://[::ffff:12.34.56.78']
+        bytes_cases = [x.encode('ascii') for x in str_cases]
+        for invalid_url in str_cases + bytes_cases:
             self.assertRaises(ValueError, urllib.parse.urlparse, invalid_url)
 
     def test_urldefrag(self):
-        for url, defrag, frag in [
+        str_cases = [
             ('http://python.org#frag', 'http://python.org', 'frag'),
             ('http://python.org', 'http://python.org', ''),
             ('http://python.org/#frag', 'http://python.org/', 'frag'),
@@ -373,7 +395,11 @@
             ('http://python.org/p?q', 'http://python.org/p?q', ''),
             (RFC1808_BASE, 'http://a/b/c/d;p?q', 'f'),
             (RFC2396_BASE, 'http://a/b/c/d;p?q', ''),
-            ]:
+            ]
+        def _encode(t):
+            return type(t)(x.encode('ascii') for x in t)
+        bytes_cases = [_encode(x) for x in str_cases]
+        for url, defrag, frag in str_cases + bytes_cases:
             self.assertEqual(urllib.parse.urldefrag(url), (defrag, frag))
 
     def test_urlsplit_attributes(self):
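A similarly illustrative check of what the new bytes test cases exercise, again assuming the patch is applied; each assertion mirrors an existing str case from test_urlparse.py rather than introducing new expected values.

    from urllib.parse import urlparse, urldefrag, parse_qsl

    # Bytes counterparts of str cases already present in the test file.
    assert urldefrag(b"http://python.org/#frag") == (b"http://python.org/", b"frag")
    assert urlparse(b"http://[::1]:5432/foo/").hostname == b"::1"
    assert urlparse(b"http://[::1]:5432/foo/").port == 5432
    assert parse_qsl(b"a=a+b&b=b+c") == [(b"a", b"a b"), (b"b", b"b c")]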