Index: Doc/library/urllib.request.rst
===================================================================
--- Doc/library/urllib.request.rst	(revision 67921)
+++ Doc/library/urllib.request.rst	(working copy)
@@ -52,7 +52,28 @@
    as a dict parameter to urlopen can be availed by the use of
    `ProxyHandler` objects.
 
+.. function:: urlopen_text(url[, data][, timeout][, fallback][, force])
+
+   Open the URL *url*, as :func:`urlopen` does, but emulating text I/O.
+
+   The optional *fallback* parameter specifies a fallback charset to use when
+   reading from the returned file-like object.  If *force* is ``True``, the
+   fallback charset is used regardless of the charset declared in the
+   response.
+
+   This function returns a file-like object almost identical to the one
+   returned by :func:`urlopen`, but emulating text I/O instead of bytes I/O.
+   The object has three additional attributes:
+
+   * :attr:`fallback` --- The *fallback* charset that was passed to
+     :func:`urlopen_text`.
+
+   * :attr:`charset` --- The charset used to decode data read from the
+     returned file-like object.  It is equal to :attr:`fallback` if no
+     charset was detected in the response or if *force* is ``True``.
+     Otherwise, it is the detected charset.
+
+   * :attr:`force_charset` --- The *force* flag that was passed to
+     :func:`urlopen_text`.
+
 .. function:: install_opener(opener)
 
    Install an :class:`OpenerDirector` instance as the default global opener.
Index: Lib/urllib/request.py
===================================================================
--- Lib/urllib/request.py	(revision 67921)
+++ Lib/urllib/request.py	(working copy)
@@ -121,6 +121,48 @@
         _opener = build_opener()
     return _opener.open(url, data, timeout)
 
+_GDT = socket._GLOBAL_DEFAULT_TIMEOUT
+def urlopen_text(url, data=None, timeout=_GDT, fallback='utf-8', force=False):
+    """Open a URL for reading text (instead of bytes) from it.
+
+    This helper pretends that the file-like object returned by urlopen works
+    in text mode, allowing one to read strings from it.  The encoding is
+    either response.headers.get_charset() or the supplied fallback charset,
+    which defaults to UTF-8.  If 'force' is True, 'fallback' overrides
+    response.headers.get_charset().
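+
+    Example (illustrative; the exact charset depends on the server):
+
+        >>> resp = urlopen_text('http://example.com/')
+        >>> isinstance(resp.read(), str)   # decoded using resp.charset
+        True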
+ """ + response = urlopen(url, data, timeout) + _readline = response.readline + _readlines = response.readlines + _read = response.read + header = response.headers + response.fallback = fallback + response.force_charset = force + response.charset = fallback + if not force: + resp_charset = header.get_charsets()[0] or header.get_charset() + if not resp_charset: + print("Using default charset '%s'" % response.charset) + else: + response.charset = resp_charset + + def readline(limit = -1): + content = _readline(limit=limit) + return str(content, encoding=response.charset) + response.readline = readline + + def readlines(hint = None): + content = _readlines(hint=hint) + return [str(line, encoding=response.charset) for line in content] + response.readlines = readlines + + def read(n = -1): + content = _read(n=n) + return str(content, encoding=response.charset) + response.read = read + + return response + def install_opener(opener): global _opener _opener = opener Index: Lib/test/test_urllib2_localnet.py =================================================================== --- Lib/test/test_urllib2_localnet.py (revision 67921) +++ Lib/test/test_urllib2_localnet.py (working copy) @@ -1,4 +1,4 @@ -#!/usr/bin/env python + #!/usr/bin/env python import email import threading @@ -310,7 +310,10 @@ self.send_response(response_code) for (header, value) in headers: - self.send_header(header, value % self.port) + if '%' in value: + self.send_header(header, value % self.port) + else: + self.send_header(header, value) if body: self.send_header("Content-type", "text/plain") self.end_headers() @@ -451,9 +454,190 @@ urllib.request.urlopen, "http://sadflkjsasf.i.nvali.d/") +class TestUrlopen_Text(unittest.TestCase): + """Tests urllib2.urlopen using the network. + + These tests are not exhaustive. Assuming that testing using files does a + good job overall of some of the basic interface features. There are no + tests exercising the optional 'data' and 'proxies' arguments. No tests + for transparent redirection have been written. + """ + + def setUp(self): + self.server = None + + def tearDown(self): + if self.server is not None: + self.server.stop() + + def urlopen(self, url, data=None, fallback='utf-8', force=False): + urlopen_text = urllib.request.urlopen_text + f = urlopen_text(url, data, fallback=fallback, force=force) + result = f.read() + f.close() + return result, f.charset + + def start_server(self, responses=None): + if responses is None: + responses = [(200, [], b"we don't care")] + handler = GetRequestHandler(responses) + + self.server = LoopbackHttpServerThread(handler) + self.server.start() + self.server.ready.wait() + port = self.server.port + handler.port = port + return handler + + def test_redirection(self): + expected_response = b"We got here..." + responses = [ + (302, [("Location", "http://localhost:%s/somewhere_else")], ""), + (200, [], expected_response) + ] + + handler = self.start_server(responses) + response = self.urlopen("http://localhost:%s/" % handler.port) + data, encoding = response + expected_response = str(expected_response, encoding) + self.assertEquals(data, expected_response) + self.assertEquals(handler.requests, ["/", "/somewhere_else"]) + + def test_404(self): + expected_response = b"Bad bad bad..." 
+        handler = self.start_server([(404, [], expected_response)])
+
+        try:
+            self.urlopen("http://localhost:%s/weeble" % handler.port)
+        except urllib.error.URLError as f:
+            data = f.read()
+            f.close()
+        else:
+            self.fail("404 should raise URLError")
+
+        self.assertEqual(data, expected_response)
+        self.assertEqual(handler.requests, ["/weeble"])
+
+    def test_200(self):
+        expected_response = b"pycon 2008..."
+        handler = self.start_server([(200, [], expected_response)])
+        response = self.urlopen("http://localhost:%s/bizarre" % handler.port)
+        data, encoding = response
+        expected_response = str(expected_response, encoding)
+        self.assertEqual(data, expected_response)
+        self.assertEqual(handler.requests, ["/bizarre"])
+
+    def test_200_with_good_encoding(self):
+        encodings = {"utf8" : b"\xc3\xa1\xc3\xa4\xc3\xb1\xc2\xa3\xc3\xa7",
+                     "ascii": b"pycon 2008...",
+                     }
+        cont_type = "text/plain;charset=%s"
+        for declared, expected_response in encodings.items():
+            header = [('Content-Type', cont_type % declared)]
+            handler = self.start_server([(200, header, expected_response)])
+            response = self.urlopen("http://localhost:%s/enc" % handler.port)
+            data, encoding = response
+            expected_response = str(expected_response, encoding)
+            self.assertEqual(data, expected_response)
+            self.assertEqual(handler.requests, ["/enc"])
+            self.server.stop()
+
+    def test_200_with_bad_encoding(self):
+        # The bodies deliberately do not match the declared charset: the
+        # "utf8" entry is UTF-16-encoded and the "ascii" entry is
+        # UTF-8-encoded, so decoding must fail.
+        encodings = {"utf8" : b'\xff\xfe\xe3\x00\xe7\x00\xa3\x00',
+                     "ascii": b"\xc3\xa1\xc3\xa4\xc3\xb1\xc2\xa3\xc3\xa7",
+                     }
+        cont_type = "text/plain;charset=%s"
+        for declared, expected_response in encodings.items():
+            header = [('Content-Type', cont_type % declared)]
+            handler = self.start_server([(200, header, expected_response)])
+            self.assertRaises(UnicodeDecodeError,
+                              self.urlopen,
+                              "http://localhost:%s/enc" % handler.port)
+            self.server.stop()
+
+    def test_200_overriding_bad_encoding(self):
+        # Keys are (declared charset, actual charset) pairs; forcing the
+        # fallback to the actual charset must make decoding succeed.
+        encodings = {
+            ("utf8", "utf16") : b'\xff\xfe\xe3\x00\xe7\x00\xa3\x00',
+            ("ascii", "utf8"): b"\xc3\xa1\xc3\xa4\xc3\xb1\xc2\xa3\xc3\xa7",
+            }
+        cont_type = "text/plain;charset=%s"
+        for (declared, actual), expected_response in encodings.items():
+            header = [('Content-Type', cont_type % declared)]
+            handler = self.start_server([(200, header, expected_response)])
+            url = "http://localhost:%s/enc" % handler.port
+            response = self.urlopen(url, fallback=actual, force=True)
+            data, encoding = response
+            expected_response = str(expected_response, encoding)
+            self.assertEqual(data, expected_response)
+            self.assertEqual(handler.requests, ["/enc"])
+            self.server.stop()
+
+    def test_200_with_parameters(self):
+        expected_response = b"pycon 2008..."
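+        # Passing a bytes 'data' argument turns the request into a POST;
+        # the handler records the request body after the path.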
+        handler = self.start_server([(200, [], expected_response)])
+        response = self.urlopen("http://localhost:%s/bizarre" % handler.port,
+                                b"get=with_feeling")
+        data, encoding = response
+        expected_response = str(expected_response, encoding)
+        self.assertEqual(data, expected_response)
+        self.assertEqual(handler.requests, ["/bizarre", b"get=with_feeling"])
+
+    def test_sending_headers(self):
+        handler = self.start_server()
+        req = urllib.request.Request("http://localhost:%s/" % handler.port,
+                                     headers={"Range": "bytes=20-39"})
+        urllib.request.urlopen_text(req)
+        self.assertEqual(handler.headers_received["Range"], "bytes=20-39")
+
+    def test_basic(self):
+        handler = self.start_server()
+        open_url = urllib.request.urlopen_text(
+            "http://localhost:%s" % handler.port)
+        for attr in ("read", "close", "info", "geturl"):
+            self.assert_(hasattr(open_url, attr), "object returned from "
+                         "urlopen_text lacks the %s attribute" % attr)
+        try:
+            self.assert_(open_url.read(), "calling 'read' failed")
+        finally:
+            open_url.close()
+
+    def test_info(self):
+        handler = self.start_server()
+        try:
+            open_url = urllib.request.urlopen_text(
+                "http://localhost:%s" % handler.port)
+            info_obj = open_url.info()
+            self.assert_(isinstance(info_obj, email.message.Message),
+                         "object returned by 'info' is not an instance of "
+                         "email.message.Message")
+            self.assertEqual(info_obj.get_content_subtype(), "plain")
+        finally:
+            self.server.stop()
+
+    def test_geturl(self):
+        # Make sure same URL as opened is returned by geturl.
+        handler = self.start_server()
+        open_url = urllib.request.urlopen_text(
+            "http://localhost:%s" % handler.port)
+        url = open_url.geturl()
+        self.assertEqual(url, "http://localhost:%s" % handler.port)
+
+    def test_bad_address(self):
+        # Make sure proper exception is raised when connecting to a bogus
+        # address.
+        self.assertRaises(IOError,
+                          # SF patch 809915:  In Sep 2003, VeriSign started
+                          # hijacking invalid .com and .net addresses to
+                          # boost traffic to their own site.  This test
+                          # started failing then.  One hopes the .invalid
+                          # domain will be spared to serve its defined
+                          # purpose.
+                          urllib.request.urlopen_text,
+                          "http://sadflkjsasf.i.nvali.d/")
+
+
 def test_main():
     support.run_unittest(ProxyAuthTests)
     support.run_unittest(TestUrlopen)
+    support.run_unittest(TestUrlopen_Text)
 
 if __name__ == "__main__":
     test_main()
Index: Lib/test/test_urllibnet.py
===================================================================
--- Lib/test/test_urllibnet.py	(revision 67921)
+++ Lib/test/test_urllibnet.py	(working copy)
@@ -141,6 +141,159 @@
             urllib.request.urlopen,
             "http://sadflkjsasf.i.nvali.d/")
 
+class urlopen_textNetworkTests(unittest.TestCase):
+    """Tests urllib.request.urlopen_text (mostly) using the network.
+
+    Where possible, test that the output of urlopen_text matches that of
+    urlopen.
+    """
+
+    def urlopen_bytes(self, *args):
+        return _open_with_retry(urllib.request.urlopen, *args)
+
+    def urlopen(self, *args):
+        return _open_with_retry(urllib.request.urlopen_text, *args)
+
+    def test_basic(self):
+        # Simple test expected to pass.
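+        # urlopen_text should expose the whole file-like interface checked
+        # below, plus the fallback/charset/force_charset attributes.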
+        open_url = self.urlopen("http://www.python.org/")
+        for attr in ("read", "readline", "readlines", "fileno", "close",
+                     "info", "geturl"):
+            self.assert_(hasattr(open_url, attr), "object returned from "
+                         "urlopen_text lacks the %s attribute" % attr)
+        try:
+            self.assert_(open_url.read(), "calling 'read' failed")
+        finally:
+            open_url.close()
+        open_url = self.urlopen("http://www.python.org/")
+        sock = open_url.fp
+        self.assert_(not sock.closed)
+        open_url.close()
+        self.assert_(sock.closed)
+
+    def test_readlines(self):
+        # Test both readline and readlines.
+        open_url = self.urlopen("http://www.python.org/")
+        try:
+            text_line = open_url.readline()
+            text_lines = open_url.readlines()
+            self.assert_(isinstance(text_line, str),
+                         "readline did not return a string")
+            self.assert_(isinstance(text_lines, list),
+                         "readlines did not return a list")
+        finally:
+            open_url.close()
+
+    def test_readlines_against_urlopen(self):
+        # Compare readline and readlines output with the decoded bytes
+        # returned by plain urlopen.
+        open_url = self.urlopen("http://www.python.org/")
+        enc = open_url.charset
+        open_url_bytes = self.urlopen_bytes("http://www.python.org/")
+        try:
+            text_line = open_url.readline()
+            text_lines = open_url.readlines()
+
+            bytes_line = open_url_bytes.readline()
+            bytes_lines = open_url_bytes.readlines()
+
+            encoded_line = str(bytes_line, encoding=enc)
+            self.assertEqual(text_line, encoded_line)
+
+            self.assertEqual(len(text_lines), len(bytes_lines))
+            encoded_lines = [str(line, encoding=enc) for line in bytes_lines]
+            self.assertEqual(text_lines, encoded_lines)
+        finally:
+            open_url.close()
+            open_url_bytes.close()
+
+    def test_compatibility(self):
+        # Check that readlines() with a hint and read() with a size match
+        # the bytes interface.
+        open_url = self.urlopen("http://www.python.org/")
+        enc = open_url.charset
+        open_url_bytes = self.urlopen_bytes("http://www.python.org/")
+        try:
+            text_lines = open_url.readlines(10)
+            text = open_url.read(500)
+
+            bytes_lines = open_url_bytes.readlines(10)
+            bytes_ = open_url_bytes.read(500)
+
+            encoded = str(bytes_, encoding=enc)
+            self.assertEqual(text, encoded)
+
+            self.assertEqual(len(text_lines), len(bytes_lines))
+            encoded_lines = [str(line, encoding=enc) for line in bytes_lines]
+            self.assertEqual(text_lines, encoded_lines)
+        finally:
+            open_url.close()
+            open_url_bytes.close()
+
+    def test_info(self):
+        # Test 'info'.
+        open_url = self.urlopen("http://www.python.org/")
+        try:
+            info_obj = open_url.info()
+        finally:
+            encoding = open_url.charset
+            open_url.close()
+        self.assert_(isinstance(info_obj, email.message.Message),
+                     "object returned by 'info' is not an instance of "
+                     "email.message.Message")
+        self.assertEqual(info_obj.get_content_subtype(), "html")
+        self.assertEqual(info_obj.get_content_charset(), encoding)
+
+    def test_geturl(self):
+        # Make sure same URL as opened is returned by geturl.
+        URL = "http://www.python.org/"
+        open_url = self.urlopen(URL)
+        try:
+            gotten_url = open_url.geturl()
+        finally:
+            open_url.close()
+        self.assertEqual(gotten_url, URL)
+
+    def test_getcode(self):
+        # Test getcode() with the fancy opener to get 404 error codes.
+        URL = "http://www.python.org/XXXinvalidXXX"
+        open_url = urllib.request.FancyURLopener().open(URL)
+        try:
+            code = open_url.getcode()
+        finally:
+            open_url.close()
+        self.assertEqual(code, 404)
+
+    def test_fileno(self):
+        if sys.platform in ('win32',):
+            # On Windows, socket handles are not file descriptors; this
+            # test can't pass on Windows.
+            return
+        # Make sure fd returned by fileno is valid.
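+        # The descriptor refers to the underlying socket, so the text
+        # decoding done through os.fdopen here is independent of
+        # response.charset.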
+        open_url = self.urlopen("http://www.python.org/")
+        fd = open_url.fileno()
+        FILE = os.fdopen(fd, encoding='utf-8')
+        try:
+            self.assert_(FILE.read(), "reading from file created using fd "
+                                      "returned by fileno failed")
+        finally:
+            FILE.close()
+
+    def test_bad_address(self):
+        # Make sure proper exception is raised when connecting to a bogus
+        # address.
+        self.assertRaises(IOError,
+                          # SF patch 809915:  In Sep 2003, VeriSign started
+                          # hijacking invalid .com and .net addresses to
+                          # boost traffic to their own site.  This test
+                          # started failing then.  One hopes the .invalid
+                          # domain will be spared to serve its defined
+                          # purpose.
+                          urllib.request.urlopen_text,
+                          "http://sadflkjsasf.i.nvali.d/")
+
+
 class urlretrieveNetworkTests(unittest.TestCase):
     """Tests urllib.request.urlretrieve using the network."""
@@ -186,6 +339,7 @@
     support.requires('network')
     support.run_unittest(URLTimeoutTest,
                          urlopenNetworkTests,
+                         urlopen_textNetworkTests,
                          urlretrieveNetworkTests)
 
 if __name__ == "__main__":
     test_main()
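
A short usage sketch of the helper added by this patch. The URL is
illustrative, and the charset observed depends on the server's Content-Type
header; treat this as an example of the API, not additional library code:

    import urllib.request

    # 'fallback' is used when the response declares no charset; with
    # force=True it would win even over a declared charset.
    resp = urllib.request.urlopen_text("http://example.com/",
                                       fallback="latin-1", force=False)
    print(resp.charset)          # declared charset, or "latin-1" if none
    first_line = resp.readline() # str, decoded with resp.charset
    rest = resp.read()           # also str
    resp.close()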