import unittest
import time
import multiprocessing as mp


class TestPoll(unittest.TestCase):
    """Tests for ``Connection.poll()`` on pipes created by ``mp.Pipe()``.

    Each test registers cleanups for the connections (and child process)
    it creates, so they are released even when an assertion fails midway.
    """

    def _make_pipe(self):
        """Return a fresh ``(a, b)`` connection pair, closed at test teardown."""
        a, b = mp.Pipe()
        # Connection.close() may safely be called more than once, so a test
        # closing an end early does not conflict with these cleanups.
        self.addCleanup(a.close)
        self.addCleanup(b.close)
        return a, b

    def _start_child(self, target, conn, *extra_args):
        """Start ``target(conn, *extra_args)`` in a child process.

        The child is joined at teardown.  The parent's copy of ``conn`` is
        closed once the child has started, so that the child becomes the
        only writer: if it exits early, a blocking read on the other end
        ends with ``EOFError`` instead of waiting forever.
        """
        proc = mp.Process(target=target, args=(conn,) + extra_args)
        proc.start()
        # Registered after the connection cleanups, hence run before them
        # (cleanups are LIFO): the child is reaped before the ends close.
        self.addCleanup(proc.join)
        conn.close()
        return proc

    def test_empty_string(self):
        """An empty message makes ``poll()`` true, and polling is repeatable."""
        a, b = self._make_pipe()
        self.assertFalse(a.poll())
        b.send_bytes(b'')
        # poll() must not consume the pending message: asking twice gives
        # the same answer.
        self.assertTrue(a.poll())
        self.assertTrue(a.poll())

    @classmethod
    def _child_strings(cls, conn, strings):
        """Child body: send each item of ``strings`` 0.1 s apart, then close."""
        for s in strings:
            time.sleep(0.1)
            conn.send_bytes(s)
        conn.close()

    def test_strings(self):
        """Messages (including empty ones) arrive intact and in order."""
        strings = (b'hello', b'', b'a', b'b', b'', b'bye', b'', b'lop')
        a, b = self._make_pipe()
        self._start_child(self._child_strings, b, strings)

        for expected in strings:
            # Poll in 10 ms slices, at most 200 times, for the next message.
            # If none shows up in that window, recv_bytes() below blocks
            # until one arrives or the child's end is closed.
            for _ in range(200):
                if a.poll(0.01):
                    break
            self.assertEqual(expected, a.recv_bytes())

    @classmethod
    def _child_dont_merge(cls, b):
        """Child body: send three separate messages back to back, then close."""
        b.send_bytes(b'a')
        b.send_bytes(b'b')
        b.send_bytes(b'cd')
        b.close()

    def test_dont_merge(self):
        """Back-to-back messages stay separate; ``poll()`` does not consume."""
        a, b = self._make_pipe()
        # Nothing has been sent yet, with or without a timeout.
        self.assertFalse(a.poll(0.0))
        self.assertFalse(a.poll(0.1))

        self._start_child(self._child_dont_merge, b)

        self.assertEqual(a.recv_bytes(), b'a')
        # Repeated polls between reads must leave the next message intact.
        self.assertTrue(a.poll(1.0))
        self.assertTrue(a.poll(1.0))
        self.assertEqual(a.recv_bytes(), b'b')
        self.assertTrue(a.poll(1.0))
        self.assertTrue(a.poll(1.0))
        self.assertTrue(a.poll(0.0))
        self.assertEqual(a.recv_bytes(), b'cd')


if __name__ == '__main__':
    unittest.main()