diff --git a/Lib/codecs.py b/Lib/codecs.py
index a70ed20f2b..9fa4dd7926 100644
--- a/Lib/codecs.py
+++ b/Lib/codecs.py
@@ -9,6 +9,7 @@ Written by Marc-Andre Lemburg (mal@lemburg.com).
 
 import builtins
 import sys
+import re
 
 ### Registry and builtin stateless codec functions
 
@@ -416,6 +417,62 @@ class StreamWriter(Codec):
 
 ###
 
+
+def _str_splitlines(s, keepends=False,
+                    pat=re.compile(r'\r\n|\r|\n'),
+                    ends_pat=re.compile(r'.*?(?:\r\n|\r|\n)'),
+                    ):
+    """Split str *s* into lines at '\\n', '\\r' and '\\r\\n' only.
+
+    Unlike str.splitlines(), other Unicode line boundaries (such as
+    '\\u2028') are not treated as line ends.  Line ends are kept in the
+    result only if *keepends* is true.  *pat* and *ends_pat* are
+    precompiled patterns and are not meant to be passed by callers.
+    """
+    if keepends:
+        lines = ends_pat.findall(s)
+        # The matches are contiguous from the start of s, so their total
+        # length is the offset of any unterminated trailing line.
+        consumed = sum(map(len, lines))
+        if consumed != len(s):
+            lines.append(s[consumed:])
+    else:
+        lines = pat.split(s)
+        # Like str.splitlines(), do not report an empty line after the
+        # final line end (or for empty input).
+        if not lines[-1]:
+            del lines[-1]
+    return lines
+
+
+def _bytes_splitlines(s, keepends=False,
+                      pat=re.compile(rb'\r\n|\r|\n'),
+                      ends_pat=re.compile(rb'.*?(?:\r\n|\r|\n)'),
+                      ):
+    """Split bytes *s* into lines at b'\\n', b'\\r' and b'\\r\\n'.
+
+    These are the line boundaries bytes.splitlines() recognizes.  Line
+    ends are kept in the result only if *keepends* is true.  *pat* and
+    *ends_pat* are precompiled patterns and are not meant to be passed
+    by callers.
+    """
+    if keepends:
+        lines = ends_pat.findall(s)
+        # The matches are contiguous from the start of s, so their total
+        # length is the offset of any unterminated trailing line.
+        consumed = sum(map(len, lines))
+        if consumed != len(s):
+            lines.append(s[consumed:])
+    else:
+        lines = pat.split(s)
+        # Like bytes.splitlines(), do not report an empty line after the
+        # final line end (or for empty input).
+        if not lines[-1]:
+            del lines[-1]
+    return lines
+
+
+
 class StreamReader(Codec):
 
     charbuffertype = str
@@ -442,6 +499,10 @@ class StreamReader(Codec):
         self.errors = errors
         self.bytebuffer = b""
         self._empty_charbuffer = self.charbuffertype()
+        if isinstance(self._empty_charbuffer, str):
+            self._splitlines = _str_splitlines
+        else:
+            self._splitlines = _bytes_splitlines
         self.charbuffer = self._empty_charbuffer
         self.linebuffer = None
 
@@ -506,7 +567,7 @@ class StreamReader(Codec):
                 if firstline:
                     newchars, decodedbytes = \
                         self.decode(data[:exc.start], self.errors)
-                    lines = newchars.splitlines(keepends=True)
+                    lines = self._splitlines(newchars, keepends=True)
                     if len(lines)<=1:
                         raise
                 else:
@@ -548,7 +609,7 @@ class StreamReader(Codec):
                 self.charbuffer = self.linebuffer[0]
                 self.linebuffer = None
             if not keepends:
-                line = line.splitlines(keepends=False)[0]
+                line = self._splitlines(line, keepends=False)[0]
             return line
 
         readsize = size or 72
@@ -565,7 +626,7 @@ class StreamReader(Codec):
                     data += self.read(size=1, chars=1)
 
             line += data
-            lines = line.splitlines(keepends=True)
+            lines = self._splitlines(line, keepends=True)
             if lines:
                 if len(lines) > 1:
                     # More than one line result; the first line is a full line
@@ -581,10 +642,10 @@ class StreamReader(Codec):
                         # only one remaining line, put it back into charbuffer
                         self.charbuffer = lines[0] + self.charbuffer
                     if not keepends:
-                        line = line.splitlines(keepends=False)[0]
+                        line = self._splitlines(line, keepends=False)[0]
                     break
                 line0withend = lines[0]
-                line0withoutend = lines[0].splitlines(keepends=False)[0]
+                line0withoutend = self._splitlines(lines[0], keepends=False)[0]
                 if line0withend != line0withoutend: # We really have a line end
                     # Put the rest back together and keep it until the next call
                     self.charbuffer = self._empty_charbuffer.join(lines[1:]) + \
@@ -597,7 +658,7 @@ class StreamReader(Codec):
             # we didn't get anything or this was our only try
             if not data or size is not None:
                 if line and not keepends:
-                    line = line.splitlines(keepends=False)[0]
+                    line = self._splitlines(line, keepends=False)[0]
                 break
             if readsize < 8000:
                 readsize *= 2
@@ -616,7 +677,7 @@ class StreamReader(Codec):
 
         """
         data = self.read()
-        return data.splitlines(keepends)
+        return self._splitlines(data, keepends)
 
     def reset(self):
 
@@ -819,7 +880,11 @@ class StreamRecoder:
 
         data = self.reader.read()
         data, bytesencoded = self.encode(data, self.errors)
-        return data.splitlines(keepends=True)
+        # StreamRecoder has no _splitlines attribute of its own; pick
+        # the helper that matches the type of the encoded data.
+        if isinstance(data, str):
+            return _str_splitlines(data, keepends=True)
+        return _bytes_splitlines(data, keepends=True)
 
     def __next__(self):
 
diff --git a/Lib/test/test_codecs.py b/Lib/test/test_codecs.py
index 00b5d317c4..b5f43fe173 100644
--- a/Lib/test/test_codecs.py
+++ b/Lib/test/test_codecs.py
@@ -143,14 +143,14 @@ class ReadTest(MixInCheckStateHandling):
             return "|".join(lines)
 
         s = "foo\nbar\r\nbaz\rspam\u2028eggs"
-        sexpected = "foo\n|bar\r\n|baz\r|spam\u2028|eggs"
-        sexpectednoends = "foo|bar|baz|spam|eggs"
+        sexpected = "foo\n|bar\r\n|baz\r|spam\u2028eggs"
+        sexpectednoends = "foo|bar|baz|spam\u2028eggs"
         self.assertEqual(readalllines(s, True), sexpected)
         self.assertEqual(readalllines(s, False), sexpectednoends)
         self.assertEqual(readalllines(s, True, 10), sexpected)
         self.assertEqual(readalllines(s, False, 10), sexpectednoends)
 
-        lineends = ("\n", "\r\n", "\r", "\u2028")
+        lineends = ("\n", "\r\n", "\r")
         # Test long lines (multiple calls to read() in readline())
         vw = []
         vwo = []