Index: Modules/_multiprocessing/pipe_connection.c =================================================================== --- Modules/_multiprocessing/pipe_connection.c (revision 70789) +++ Modules/_multiprocessing/pipe_connection.c (working copy) @@ -17,11 +17,28 @@ static Py_ssize_t conn_send_string(ConnectionObject *conn, char *string, size_t length) { + BOOL ret = TRUE; DWORD amount_written; - BOOL ret; + Py_ssize_t offset = 0; Py_BEGIN_ALLOW_THREADS - ret = WriteFile(conn->handle, string, length, &amount_written, NULL); + if (length == 0) + ret = WriteFile(conn->handle, string, 0, &amount_written, NULL); + else { + while (ret && offset < length) { + DWORD len_to_write = MIN(length - offset, 32 * 1024); + + ret = WriteFile(conn->handle, string + offset, len_to_write, &amount_written, NULL); + offset += amount_written; + + if (offset < length) { + // ERROR_NO_SYSTEM_RESOURCES occurs when OS internal buffer is full + // and can't be sent to other process immediately + if (ret == 0 && GetLastError() == ERROR_NO_SYSTEM_RESOURCES) + ret = TRUE; + } + } + } Py_END_ALLOW_THREADS return ret ? MP_SUCCESS : MP_STANDARD_ERROR; } Index: Lib/test/test_multiprocessing.py =================================================================== --- Lib/test/test_multiprocessing.py (revision 70789) +++ Lib/test/test_multiprocessing.py (working copy) @@ -1228,6 +1228,19 @@ ALLOWED_TYPES = ('processes', 'threads') + + def _read_until_sentinel(self, conn): + recv_list = [] + for msg in iter(conn.recv_bytes, SENTINEL): + recv_list.append(msg) + return latin('').join(recv_list) + + def _echo_large(self, conn): + msg = self._read_until_sentinel(conn) + conn.send_bytes(msg) + conn.send_bytes(SENTINEL) + conn.close() + def _echo(self, conn): for msg in iter(conn.recv_bytes, SENTINEL): conn.send_bytes(msg) @@ -1291,15 +1304,11 @@ self.assertEqual(poll(TIMEOUT1), True) self.assertTimingAlmostEqual(poll.elapsed, 0) + conn.send_bytes(SENTINEL) # tell child to quit self.assertEqual(conn.recv(), None) - really_big_msg = latin('X') * (1024 * 1024 * 16) # 16Mb - conn.send_bytes(really_big_msg) - self.assertEqual(conn.recv_bytes(), really_big_msg) - - conn.send_bytes(SENTINEL) # tell child to quit child_conn.close() - + if self.TYPE == 'processes': self.assertEqual(conn.readable, True) self.assertEqual(conn.writable, True) @@ -1307,7 +1316,26 @@ self.assertRaises(EOFError, conn.recv_bytes) p.join() + + def test_large_msg(self): + + conn, child_conn = self.Pipe() + p = self.Process(target=self._echo_large, args=(child_conn,)) + p.daemon = True + p.start() + + really_big_msg = latin('X') * (1024 * 1024) # * 16) # 128Mb + conn.send_bytes(really_big_msg) + conn.send_bytes(SENTINEL) # tell child to quit + rcvd = self._read_until_sentinel(conn) + self.assertEqual(rcvd, really_big_msg) + + child_conn.close() + + p.join() + + def test_duplex_false(self): reader, writer = self.Pipe(duplex=False) self.assertEqual(writer.send(1), None)