import _winapi


class Iocp:
    """Thin wrapper around a Windows I/O completion port handle.

    The port is created on construction and closed by close(), by leaving
    a ``with`` block, or when the object is finalized.
    """

    # Class-level fallback: close() doubles as __del__, and the finalizer
    # also runs on an instance whose __init__ raised before the handle was
    # stored.  Without this default, that path fails with AttributeError.
    _iocp = None

    def __init__(self, concurrency=0):
        """Create a new completion port.

        *concurrency* is forwarded unchanged as the last argument of
        CreateIoCompletionPort.
        """
        self._iocp = _winapi.CreateIoCompletionPort(
            _winapi.INVALID_HANDLE_VALUE, _winapi.NULL, 0, concurrency)

    def register(self, handle, key=0):
        """Associate *handle* with this port, tagging it with *key*."""
        _winapi.CreateIoCompletionPort(handle, self._iocp, key, 0)

    def get(self, timeout=None):
        """Wait for a completion packet and return it.

        *timeout* is in seconds; None waits indefinitely.  The result is
        whatever GetQueuedCompletionStatus returns (the demo at the bottom
        of this file expects None on timeout and a
        ``(bytes_transferred, key, overlapped_address)`` tuple otherwise).

        Raises ValueError if *timeout* is negative or too large to be
        expressed in milliseconds below the INFINITE sentinel.
        """
        if timeout is None:
            ms = _winapi.INFINITE
        elif timeout < 0:
            raise ValueError('negative timeout')
        else:
            # Convert seconds to milliseconds, rounding to nearest.
            ms = int(timeout * 1000 + 0.5)
            # INFINITE is reserved to mean "no timeout", so a finite wait
            # must stay strictly below it.
            if ms >= _winapi.INFINITE:
                raise ValueError('timeout too big')
        return _winapi.GetQueuedCompletionStatus(self._iocp, ms)

    def post(self, bytes_transferred, key, overlapped_address):
        """Queue a caller-supplied completion packet on the port."""
        _winapi.PostQueuedCompletionStatus(
            self._iocp, bytes_transferred, key, overlapped_address)

    def close(self, *, _winapi=_winapi):
        """Close the port handle; calling it again is a no-op.

        The *_winapi* keyword default keeps a reference to the module on
        the function itself -- presumably so that close() still works when
        it runs as __del__ during interpreter shutdown, after module
        globals may have been cleared.
        """
        if self._iocp is not None:
            _winapi.CloseHandle(self._iocp)
            self._iocp = None

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, exc_tb):
        self.close()

    # Release the handle when the object is garbage collected.
    __del__ = close


if __name__ == '__main__':
    # Self-test: overlapped read/write on a pipe plus one manually posted
    # packet must yield exactly three completion packets.
    import multiprocessing as mp
    import threading
    import time

    MSG = b"hello"

    a, b = mp.Pipe()
    g, h = a.fileno(), b.fileno()

    def writer():
        # wov is published as a global so the main thread can inspect the
        # write's overlapped object after the packets have been collected.
        global wov
        time.sleep(1)
        wov, _err = _winapi.WriteFile(h, MSG, True)
        iocp.post(3, 4, 5)

    with Iocp() as iocp:
        iocp.register(g, 1)
        iocp.register(h, 2)
        # Nothing has been queued yet, so a zero-timeout poll returns None.
        assert iocp.get(timeout=0.0) is None
        rov, _err = _winapi.ReadFile(g, len(MSG)*2, True)
        writer_thread = threading.Thread(target=writer)
        writer_thread.start()
        results = {iocp.get() for _ in range(3)}
        # post() is the writer's last action, so it has finished (or is
        # about to); join so the thread is gone before the port is closed.
        writer_thread.join()
        assert results == {
            (len(MSG), 1, rov.address),
            (len(MSG), 2, wov.address),
            (3, 4, 5),
        }
        assert wov.GetOverlappedResult(False) == (len(MSG), 0)
        assert rov.GetOverlappedResult(False) == (len(MSG), 0)
        print(rov.getbuffer())