'''
uriparse

A uri parsing library that strives to be STD66 (aka RFC3986) compliant.

http://gbiv.com/protocols/uri/rfc/ has a brief history of URI standards,
which helps to clarify why it took so long to get to where this module is.

Features:
 * Extensible URI-handling framework that includes default URI parsers
   for many common URI schemes
 * convenience methods for splitting up/rejoining uris
 * convenience methods for splitting up/rejoining authority strings,
   also known as netloc strings
 * urljoin, which produces an absolute uri given a base and a relative
   path to apply

Comments:
 * The code looks simple, and you may wonder at the lack of handling
   %-encoding, but STD66 section 2.4 says that %-encodings can't be
   delimiters, so it's okay to be naive.

TODO:
 * Test against all examples in the RFC
'''

import re

# Regex straight from STD 66 appendix B.  Every group is optional, so the
# pattern matches any input string.
_URI_RE = re.compile(
    r'^(([^:/?#]+):)?(//([^/?#]*))?([^?#]*)(\?([^#]*))?(#(.*))?')


class UnknownSchemeError(KeyError):
    '''
    Raised by URIParser when no parser is registered for a scheme.

    Subclasses KeyError so that code written against the old behaviour
    (a bare dictionary lookup failure) keeps working.
    '''


def urisplit(uri):
    '''
    Basic URI Parser according to STD66 aka RFC3986

    Returns a (scheme, authority, path, query, fragment) tuple.  Components
    that are absent from the uri are None; an empty path is also reported
    as None.

    >>> urisplit("scheme://authority/path?query#fragment")
    ('scheme', 'authority', '/path', 'query', 'fragment')
    '''
    p = _URI_RE.match(uri).groups()
    scheme, authority, path, query, fragment = p[1], p[3], p[4], p[6], p[8]
    if not path:
        path = None
    return (scheme, authority, path, query, fragment)


def uriunsplit(parts):
    '''
    Reverse of urisplit()

    parts is a (scheme, authority, path, query, fragment) sequence.  Empty
    or None components are left out of the result.

    >>> uriunsplit(('scheme', 'authority', '/path', 'query', 'fragment'))
    'scheme://authority/path?query#fragment'
    '''
    # Unpacked here rather than in the signature: tuple parameters were
    # removed from the language in Python 3.
    scheme, authority, path, query, fragment = parts
    result = ''
    if scheme:
        result += scheme + ':'
    if authority:
        result += '//' + authority
    if path:
        result += path
    if query:
        result += '?' + query
    if fragment:
        result += '#' + fragment
    return result


def split_authority(authority):
    '''
    Basic authority parser that splits authority into component parts

    Returns a (user, password, host, port) tuple; missing parts are None.
    A None or empty authority yields four Nones.

    >>> split_authority("user:password@host:port")
    ('user', 'password', 'host', 'port')
    >>> split_authority("[::1]:8080")
    (None, None, '[::1]', '8080')
    '''
    if not authority:
        # urisplit() reports None for uris without "//" (e.g. "file:/x").
        return (None, None, None, None)

    if '@' in authority:
        userinfo, hostport = authority.split('@', 1)
    else:
        userinfo, hostport = None, authority

    if userinfo and ':' in userinfo:
        user, passwd = userinfo.split(':', 1)
    else:
        user, passwd = userinfo, None

    closing = hostport.find(']') if hostport.startswith('[') else -1
    if closing != -1:
        # Bracketed IP literal: the colons inside the brackets belong to
        # the host, only a colon right after "]" introduces the port.
        host, rest = hostport[:closing + 1], hostport[closing + 1:]
        port = rest[1:] if rest.startswith(':') else None
    elif ':' in hostport:
        host, port = hostport.split(':', 1)
    else:
        host, port = hostport, None

    if not host:
        host = None

    return (user, passwd, host, port)


def join_authority(parts):
    '''
    Reverse of split_authority()

    parts is a (user, password, host, port) sequence.  Missing (None or
    empty) parts are skipped; the password is only emitted together with a
    user.  The port may be a string or a number.

    >>> join_authority(('user', 'password', 'host', 'port'))
    'user:password@host:port'
    '''
    user, passwd, host, port = parts
    result = ''
    if user:
        result += user
        if passwd:
            result += ':' + passwd
        result += '@'
    if host:
        result += host
    if port:
        result += ':' + str(port)
    return result


class UrlParser(object):
    '''
    Basic URL parsing class.

    In this code URI generally refers to generic URIs and URL refers to
    URIs that match scheme://user:password@host:port/path?query#fragment
    '''

    # user, password, host, port, path, query, fragment
    _defaults = (None, None, None, None, None, None, None)

    def __init__(self, defaults=None):
        '''
        defaults, when given and non-empty, overrides the class defaults
        position by position.  A shorter sequence is padded with the class
        defaults so that parse() can always unpack a full set.
        '''
        class_defaults = tuple(self._defaults)
        if defaults:
            supplied = tuple(defaults)
            merged = supplied + class_defaults[len(supplied):]
        else:
            merged = class_defaults
        # Truthy defaults are normalised to strings (e.g. port 80 -> '80')
        # so that they compare equal to values parsed out of a url.
        self._defaults = [str(d) if d else d for d in merged]

    def parse(self, urlinfo):
        '''
        Split urlinfo into (user, password, host, port, path, query,
        fragment), substituting the parser defaults for missing parts.
        '''
        scheme, authority, path, query, frag = urisplit(urlinfo)
        user, passwd, host, port = split_authority(authority)
        duser, dpasswd, dhost, dport, dpath, dquery, dfrag = self._defaults
        if user is None:
            user = duser
        if passwd is None:
            passwd = dpasswd
        if host is None:
            host = dhost
        if port is None:
            port = dport
        if path is None:
            path = dpath
        if query is None:
            query = dquery
        if frag is None:
            frag = dfrag
        return (user, passwd, host, port, path, query, frag)

    def unparse(self, pieces):
        '''
        Reverse of parse().  The result carries no scheme, e.g.
        '//user@host:80/path'.
        '''
        authority = join_authority(pieces[:4])
        return uriunsplit(('', authority, pieces[4], pieces[5], pieces[6]))


class HttpUrlParser(UrlParser):
    _defaults = (None, None, None, 80, '/', None, None)


class HttpsUrlParser(HttpUrlParser):
    _defaults = (None, None, None, 443, '/', None, None)


class ShttpUrlParser(HttpUrlParser):
    # NOTE(review): not registered in DefaultSchemes below -- confirm
    # whether that is intentional.
    _defaults = (None, None, None, 443, '/', None, None)


class ImapUrlParser(UrlParser):
    _defaults = (None, None, 'localhost', 143, '/', None, None)


class ImapsUrlParser(UrlParser):
    _defaults = (None, None, 'localhost', 993, '/', None, None)


class FtpUrlParser(UrlParser):
    _defaults = ('anonymous', 'anonymous', None, 21, '/', None, None)


class TftpUrlParser(UrlParser):
    _defaults = (None, None, None, 69, '/', None, None)


class FileUrlParser(UrlParser):
    _defaults = (None, None, None, None, '/', None, None)


class TelnetUrlParser(UrlParser):
    _defaults = (None, None, None, 23, '/', None, None)


class MailtoUriParser(UrlParser):
    '''
    Parser for mailto:user@host?query#fragment uris.
    '''

    # user, host, query, fragment
    _defaults = (None, None, None, None)

    def parse(self, urlinfo):
        '''
        Split urlinfo into (user, host, query, fragment).  A missing path
        or a path without "@" yields None for the absent parts instead of
        raising.
        '''
        scheme, authority, path, query, frag = urisplit(urlinfo)
        user, _, host = (path or '').partition('@')
        defaults = self._defaults
        user = user or defaults[0]
        host = host or defaults[1]
        if query is None:
            query = defaults[2]
        if frag is None:
            frag = defaults[3]
        return (user, host, query, frag)

    def unparse(self, pieces):
        '''
        Reverse of parse().  The result carries no scheme, e.g.
        'user@host'.
        '''
        path = (pieces[0] or '') + '@' + (pieces[1] or '')
        return uriunsplit(('', None, path, pieces[2], pieces[3]))


DefaultSchemes = {
    'http': HttpUrlParser,
    'https': HttpsUrlParser,
    'imap': ImapUrlParser,
    'imaps': ImapsUrlParser,
    'ftp': FtpUrlParser,
    'tftp': TftpUrlParser,
    'file': FileUrlParser,
    'telnet': TelnetUrlParser,
    'mailto': MailtoUriParser,
}


class URIParser(object):
    '''
    Dispatches parsing/unparsing to a per-scheme parser class.
    '''

    def __init__(self, schemes=DefaultSchemes, extra=None):
        '''
        schemes maps scheme names to parser classes; extra holds further
        entries that are layered on top of schemes.
        '''
        self._parsers = {}
        self._parsers.update(schemes)
        if extra:
            self._parsers.update(extra)

    def parse(self, url, defaults=None):
        '''
        parse the url.  allow for defaults of:
        user, password, host, port, path, query, fragment
        '''
        return self.parserFor(url)(defaults).parse(url)

    def unparse(self, pieces, defaults=None, scheme=None):
        '''
        Rebuild a uri string from parsed pieces.

        pieces do not carry a scheme, so the parser cannot be derived from
        them.  Pass scheme to select the registered parser and to have
        "scheme:" prefixed to the result; without it the generic UrlParser
        layout is used and the result has no scheme.
        '''
        if scheme is None:
            return UrlParser(defaults).unparse(pieces)
        parser = self._parser_for_scheme(scheme)
        return scheme + ':' + parser(defaults).unparse(pieces)

    # these work on any URI

    def schemeOf(self, url):
        '''Return everything before the first ":" of url.'''
        return url.split(':')[0]

    def infoOf(self, url):
        '''
        Return everything after the first ":" of url.  Raises IndexError
        when url contains no ":".
        '''
        # maxsplit=1 keeps later colons (port, password, ...) intact.
        return url.split(':', 1)[1]

    def parserFor(self, url):
        '''
        Return the parser class registered for url's scheme.  Raises
        UnknownSchemeError (a KeyError) when there is none.
        '''
        return self._parser_for_scheme(self.schemeOf(url))

    def _parser_for_scheme(self, scheme):
        '''Look up a parser class by scheme name.'''
        if scheme in self._parsers:
            return self._parsers[scheme]
        # Schemes are case-insensitive (STD66 section 3.1); fall back to
        # the lower-case spelling before giving up.
        lowered = scheme.lower()
        if lowered in self._parsers:
            return self._parsers[lowered]
        raise UnknownSchemeError(scheme)


def _dirname(p):
    '''Return p up to and including its last "/" ('' if it has none).'''
    if not p:
        return p
    return p[:p.rfind('/') + 1]


def _pathjoin(a, b):
    '''path-join paths: an absolute b replaces a, otherwise b is appended.'''
    if not a or not b:
        return a or b
    if b.startswith('/'):
        return b
    if a.endswith('/'):
        # _dirname() results already end in "/"; don't double it.
        return a + b
    return a + '/' + b


def _remove_dot_segments(path):
    '''
    Remove "." and ".." segments from path as described in STD66 section
    5.2.4.  Unlike posixpath.normpath this keeps a trailing "/".
    '''
    remaining = path
    output = []
    while remaining:
        if remaining.startswith('../'):
            remaining = remaining[3:]
        elif remaining.startswith('./'):
            remaining = remaining[2:]
        elif remaining.startswith('/./'):
            remaining = remaining[2:]
        elif remaining == '/.':
            remaining = '/'
        elif remaining.startswith('/../'):
            remaining = remaining[3:]
            if output:
                output.pop()
        elif remaining == '/..':
            remaining = '/'
            if output:
                output.pop()
        elif remaining in ('.', '..'):
            remaining = ''
        else:
            # Move the first segment (with its leading "/", if any) over
            # to the output.
            cut = remaining.find('/', 1)
            if cut == -1:
                output.append(remaining)
                remaining = ''
            else:
                output.append(remaining[:cut])
                remaining = remaining[cut:]
    return ''.join(output)


def urljoin(base, url):
    '''
    Resolve the reference url against the absolute uri base, following
    STD66 section 5.2 (path-join paths, replace anything else).

    A reference whose scheme equals the base scheme is treated like a
    scheme-less reference ("http:g" behaves like "g").

    >>> urljoin("http://a/b/c/d;p?q", "../g")
    'http://a/b/g'
    '''
    bscheme, bauthority, bpath, bquery, bfragment = urisplit(base)
    uscheme, uauthority, upath, uquery, ufragment = urisplit(url)

    if uscheme and bscheme != uscheme:
        return url

    if uauthority:
        bauthority, bquery = uauthority, uquery
        bpath = _remove_dot_segments(upath) if upath else upath
    elif upath:
        if (bauthority is not None and not bpath
                and not upath.startswith('/')):
            # Base has an authority but an empty path (section 5.2.3).
            merged = '/' + upath
        else:
            merged = _pathjoin(_dirname(bpath), upath)
        bpath = _remove_dot_segments(merged)
        bquery = uquery
    elif uquery:
        bquery = uquery

    # The fragment always comes from the reference, never from the base
    # (section 5.2.2).
    bfragment = ufragment

    return uriunsplit((bscheme, bauthority, bpath, bquery, bfragment))


___doc___ = '''
Usage:

    # the verbose way
    try:
        p = URIParser(extra={'custom': CustomSchemeHandler})
        defaults = ()
        if p.schemeOf(url) == 'http':
            defaults = ('user', 'password', 'host', 'port', 'path')
        pieces = p.parse(url, defaults)
    except UnknownSchemeError:
        print('unknown scheme')

    # quick-n-dirty
    try:
        pieces = URIParser({'custom': CustomSchemeHandler}).parse(
            url, ('user', 'pass', 'host', 'port', 'path'))
    except UnknownSchemeError:
        print('unknown scheme')
'''


def _test():
    '''
    Run the built-in parse and join checks, print one line per check and
    exit with the number of failures as the process status.
    '''
    import sys

    parsetests = {
        # Simple tests
        'http://user:pass@host:port/path?query=result#fragment':
            ('user', 'pass', 'host', 'port', '/path', 'query=result',
             'fragment'),
        'http://user@host:port/path?query=result#fragment':
            ('user', None, 'host', 'port', '/path', 'query=result',
             'fragment'),
        'http://host:port/path?query=result#fragment':
            (None, None, 'host', 'port', '/path', 'query=result',
             'fragment'),
        'http://host/path?query=result#fragment':
            (None, None, 'host', '80', '/path', 'query=result', 'fragment'),
        'http://host/path?query=result':
            (None, None, 'host', '80', '/path', 'query=result', None),
        'http://host/path#fragment':
            (None, None, 'host', '80', '/path', None, 'fragment'),
        'http://host/path':
            (None, None, 'host', '80', '/path', None, None),
        'http://host':
            (None, None, 'host', '80', '/', None, None),
        'http:///path':
            (None, None, None, '80', '/path', None, None),
        # torture tests
        'http://user:pass@host:port/path?que:ry/res@ult#fr@g:me/n?t':
            ('user', 'pass', 'host', 'port', '/path', 'que:ry/res@ult',
             'fr@g:me/n?t'),
        'http://user:pass@host:port/path#fr@g:me/n?t':
            ('user', 'pass', 'host', 'port', '/path', None, 'fr@g:me/n?t'),
        'http://user:pass@host:port?que:ry/res@ult#fr@g:me/n?t':
            ('user', 'pass', 'host', 'port', '/', 'que:ry/res@ult',
             'fr@g:me/n?t'),
        'http://user:pass@host:port#fr@g:me/n?t':
            ('user', 'pass', 'host', 'port', '/', None, 'fr@g:me/n?t'),
    }

    failures = 0
    for url in parsetests:
        expected = parsetests[url]
        result = URIParser().parse(url)
        if result == expected:
            print("url: %s : passed" % url)
        else:
            print("url: %s : Failed." % url)
            print("  got: %s" % repr(result))
            print("  expected: %s" % repr(expected))
            failures += 1

    base = "http://a/b/c/d;p?q"
    jointests = {
        # Normal Examples from STD 66 Section 5.4.1
        "g:h": "g:h",
        "g": "http://a/b/c/g",
        "./g": "http://a/b/c/g",
        "g/": "http://a/b/c/g/",
        "/g": "http://a/g",
        "//g": "http://g",
        "?y": "http://a/b/c/d;p?y",
        "g?y": "http://a/b/c/g?y",
        "#s": "http://a/b/c/d;p?q#s",
        "g#s": "http://a/b/c/g#s",
        "g?y#s": "http://a/b/c/g?y#s",
        ";x": "http://a/b/c/;x",
        "g;x": "http://a/b/c/g;x",
        "g;x?y#s": "http://a/b/c/g;x?y#s",
        "": "http://a/b/c/d;p?q",
        ".": "http://a/b/c/",
        "./": "http://a/b/c/",
        "..": "http://a/b/",
        "../": "http://a/b/",
        "../g": "http://a/b/g",
        "../..": "http://a/",
        "../../": "http://a/",
        "../../g": "http://a/g",
        # Abnormal Examples from STD 66 Section 5.4.2
        "../../../g": "http://a/g",
        "../../../../g": "http://a/g",
        "/./g": "http://a/g",
        "/../g": "http://a/g",
        "g.": "http://a/b/c/g.",
        ".g": "http://a/b/c/.g",
        "g..": "http://a/b/c/g..",
        "..g": "http://a/b/c/..g",
        "./../g": "http://a/b/g",
        "./g/.": "http://a/b/c/g/",
        "g/./h": "http://a/b/c/g/h",
        "g/../h": "http://a/b/c/h",
        "g;x=1/./y": "http://a/b/c/g;x=1/y",
        "g;x=1/../y": "http://a/b/c/y",
        "g?y/./x": "http://a/b/c/g?y/./x",
        "g?y/../x": "http://a/b/c/g?y/../x",
        "g#s/./x": "http://a/b/c/g#s/./x",
        "g#s/../x": "http://a/b/c/g#s/../x",
        "http:g": "http://a/b/c/g",
    }

    for relref in jointests:
        expected = jointests[relref]
        result = urljoin(base, relref)
        summary = "%s + %s = %s : " % (repr(base), repr(relref),
                                       repr(result))
        if result == expected:
            print(summary + "passed")
        else:
            print(summary + "Failed.")
            print("  expected: %s " % repr(expected))
            failures += 1

    print("%d Tests finished. %d failures."
          % (len(parsetests) + len(jointests), failures))
    sys.exit(failures)


if __name__ == '__main__':
    _test()