diff --git a/Lib/select.py b/Lib/select.py new file mode 100644 --- /dev/null +++ b/Lib/select.py @@ -0,0 +1,313 @@ +"""Select module. + +This module supports asynchronous I/O on multiple file descriptors. +""" + + +from abc import ABCMeta, abstractmethod +import _select +from _select import * +import threading + + +# genereric events, that must be mapped to implementation-specific ones +SELECT_IN = (1 << 0) +SELECT_OUT = (1 << 1) +SELECT_ERR = (1 << 2) + + +class _AbstractSelector(metaclass=ABCMeta): + """Abstract base selector class. + + A selector supports registering and unregistering file descriptors, and then + polling them for I/O events. A selector can use various implementations + (select(), poll(), epoll()...) depending on the platform. The default + `Selector` class uses the most performant implementation on the current + platform. + + Derived classes must implement implementation-specific abstract methods. + + Notes: + - A Selector accepts both file descriptors and objects with a `fileno()` + method. The mapping between file objects and the corresponding file + descriptor is done by the public methods. + - The conversion between generic events and implementation-specific ones is + performed by implementation-specific methods (mainly because select() + interface is really different from all other implementations). + - The public methods perform the necessary synchronization, so derived + classes don't have to worry about it. + """ + + def __init__(self): + # this maps file-descriptor to file object + self._fds_map = {} + # this lock guards the selector against concurrent access + self._lock = threading.Lock() + + def register(self, fileobj, eventmask): + """Register a file object. + + Parameters: + fileobj -- file descriptor, or any object with a `fileno()` method + eventmask -- bitwise mask of events to monitor + """ + # find the corresponding FD + fd = self._file_to_fd(fileobj) + with self._lock: + # store it, and call implementation-specific register + self._fds_map[fd] = fileobj + self._register(fd, eventmask) + + @abstractmethod + def _register(self, fd, eventmask): + pass + + def unregister(self, fileobj): + """Unregister a file object. + + Parameters: + fileobj -- file descriptor, or any object with a `fileno()` method + """ + # find the corresponding FD + fd = self._file_to_fd(fileobj) + with self._lock: + # check it's registered, and call implementation-specific unregister + try: + del self._fds_map[fd] + except KeyError: + raise ValueError("Cannot unregister non registered file: " + "{!r}".format(fileobj)) + self._unregister(fd) + + @abstractmethod + def _unregister(self, fd): + pass + + def modify(self, fileobj, eventmask): + """Change the events to monitor for a given file object. + + Parameters: + fileobj -- file descriptor, or any object with a `fileno()` method + """ + # find the corresponding FD + fd = self._file_to_fd(fileobj) + with self._lock: + # check it's registered, and call implementation-specific modify + if not fd in self._fds_map: + raise ValueError("Cannot modify non registered file: " + "{!r}".format(fileobj)) + self._modify(fd, eventmask) + + @abstractmethod + def _modify(self, fd, eventmask): + pass + + def select(self, timeout=None): + """Perform the actual selection, until some monitored file objects are + ready or a timeout expires. + + Parameters: + timeout -- if timeout > 0, this specifies the maximum wait time, in + seconds + if timeout == 0, the select() call won't block, and will + report the currently ready files + if timeout is None, select() will block until a monitored + file descriptor becomes ready + + Returns: + set of (fileobj, event mask) for ready file object + """ + if not (timeout is None or timeout >= 0): + raise ValueError("timeout must be None or non-negative: " + "{!r}".format(timeout)) + + with self._lock: + # call implementation-specific select + ready = self._select(timeout) + # and convert FD to file objects + return set((self._fds_map.get(fd, fd), evt) for fd, evt in ready) + + @abstractmethod + def _select(self, timeout): + pass + + def close(self): + """Close the selector. + + This must be called to make sure that any underlying resource is freed. + """ + with self._lock: + del self._fds_map + self._close() + + @abstractmethod + def _close(self): + pass + + def __enter__(self): + return self + + def __exit__(self, *args): + self.close() + + @staticmethod + def _file_to_fd(fileobj): + """Helper method that returns the file descriptor for a given file + object. + + Parameters: + fileobj -- file descriptor, or any object with a `fileno()` method + + Returns: + corresponding FD + """ + if isinstance(fileobj, int): + fd = fileobj + elif hasattr(fileobj, 'fileno'): + fd = fileobj.fileno() + else: + raise ValueError("Invalid file object: {!r}".format(fileobj)) + return fd + + +def _map_events(mapping, mask): + """Helper method to convert between generic and specific events masks.""" + new_mask = 0 + for source, target in mapping.items(): + if source & mask: + new_mask |= target + return new_mask + + +class SelectSelector(_AbstractSelector): + """Select-based selector.""" + + def __init__(self): + _AbstractSelector.__init__(self) + self._rset = set() + self._wset = set() + self._eset = set() + + def _register(self, fd, eventmask): + if eventmask & SELECT_IN: + self._rset.add(fd) + if eventmask & SELECT_OUT: + self._wset.add(fd) + if eventmask & SELECT_ERR: + self._eset.add(fd) + + def _unregister(self, fd): + self._rset.discard(fd) + self._wset.discard(fd) + self._eset.discard(fd) + + def _modify(self, fd, eventmask): + self._unregister(fd) + self._register(fd, eventmask) + + def _select(self, timeout=None): + rset, wset, eset = select(self._rset, self._wset, self._eset, timeout) + + events = set() + for fd in rset: + events.add((fd, SELECT_IN)) + for fd in wset: + events.add((fd, SELECT_OUT)) + for fd in eset: + events.add((fd, SELECT_ERR)) + return events + + def _close(self): + pass + + +if hasattr(_select, 'poll'): + + class PollSelector(_AbstractSelector): + """Poll-based selector.""" + + _generic_to_specific_events = {SELECT_IN: POLLIN|POLLPRI, + SELECT_OUT: POLLOUT + } + + _specific_to_generic_events = {POLLIN: SELECT_IN, + POLLPRI: SELECT_IN, + POLLOUT: SELECT_OUT, + POLLHUP: SELECT_IN, + POLLNVAL: SELECT_ERR, + POLLERR: SELECT_ERR, + } + + def __init__(self): + _AbstractSelector.__init__(self) + self._poll = _select.poll() + + def _register(self, fd, eventmask): + eventmask = _map_events(self._generic_to_specific_events, eventmask) + self._poll.register(fd, eventmask) + + def _unregister(self, fd): + self._poll.unregister(fd) + + def _modify(self, fd, eventmask): + eventmask = _map_events(self._generic_to_specific_events, eventmask) + self._poll.modify(fd, eventmask) + + def _select(self, timeout=None): + events = self._poll.poll(None if timeout is None else 1000 * timeout) + return ((fd, _map_events(self._specific_to_generic_events, evt)) + for fd, evt in events) + + def _close(self): + pass + + +if hasattr(_select, 'epoll'): + + class EpollSelector(_AbstractSelector): + """Epoll-based selector.""" + + _generic_to_specific_events = {SELECT_IN: EPOLLIN|EPOLLPRI, + SELECT_OUT: EPOLLOUT + } + + _specific_to_generic_events = {EPOLLIN: SELECT_IN, + EPOLLPRI: SELECT_IN, + EPOLLOUT: SELECT_OUT, + EPOLLHUP: SELECT_IN, + EPOLLERR: SELECT_ERR, + } + + def __init__(self): + _AbstractSelector.__init__(self) + self._epoll = _select.epoll() + + def _register(self, fd, eventmask): + eventmask = _map_events(self._generic_to_specific_events, eventmask) + self._epoll.register(fd, eventmask) + + def _unregister(self, fd): + self._epoll.unregister(fd) + + def _modify(self, fd, eventmask): + eventmask = _map_events(self._generic_to_specific_events, eventmask) + self._epoll.modify(fd, eventmask) + + def _select(self, timeout): + events = self._epoll.poll(-1 if timeout is None else timeout) + return ((fd, _map_events(self._specific_to_generic_events, evt)) + for fd, evt in events) + + def _close(self): + self._epoll.close() + + +if hasattr(_select, 'epoll'): + # complexity is O() + Selector = EpollSelector +elif hasattr(_select, 'poll'): + # complexity is O() + Selector = PollSelector +else: + # complexity is O(max()) + Selector = SelectSelector diff --git a/Lib/test/test_select.py b/Lib/test/test_select.py --- a/Lib/test/test_select.py +++ b/Lib/test/test_select.py @@ -1,10 +1,20 @@ import errno import os +import random import select import sys import unittest from test import support + +def find_ready_matching(ready, flag): + match = [] + for fd, mode in ready: + if mode & flag: + match.append(fd) + return match + + @unittest.skipIf((sys.platform[:3]=='win'), "can't easily test on this system") class SelectTestCase(unittest.TestCase): @@ -75,9 +85,149 @@ a[:] = [F()] * 10 self.assertEqual(select.select([], a, []), ([], a[:5], [])) + +@unittest.skipIf((sys.platform[:3]=='win'), + "can't easily test on this system") +class BasicSelectorTestCase(unittest.TestCase): + + def test_constants(self): + select.SELECT_IN + select.SELECT_OUT + select.SELECT_ERR + + +class BaseSelectorTestCase(unittest.TestCase): + + def test_error_conditions(self): + s = self.SELECTOR() + self.assertRaises(TypeError, s.register) + self.assertRaises(TypeError, s.register, 0) + self.assertRaises(TypeError, s.register, 0, 1, 2) + self.assertRaises(TypeError, s.unregister, 0, 1) + self.assertRaises(TypeError, s.modify, 0) + self.assertRaises(TypeError, s.select, 0, 1) + self.assertRaises(ValueError, s.select, -1) + + def test_basic(self): + with self.SELECTOR() as s: + rd, wr = os.pipe() + self.addCleanup(os.close, rd) + self.addCleanup(os.close, wr) + + s.register(rd, select.SELECT_IN) + self.assertFalse(s.select(0.1)) + + s.register(wr, select.SELECT_OUT) + self.assertEqual(set(((wr, select.SELECT_OUT),)), s.select()) + + s.unregister(rd) + s.unregister(wr) + + def test_selector(self): + s = self.SELECTOR() + self.addCleanup(s.close) + + NUM_PIPES = 12 + MSG = b" This is a test." + MSG_LEN = len(MSG) + readers = [] + writers = [] + r2w = {} + w2r = {} + + for i in range(NUM_PIPES): + rd, wr = os.pipe() + s.register(rd, select.SELECT_IN) + s.register(wr, select.SELECT_OUT) + readers.append(rd) + writers.append(wr) + r2w[rd] = wr + w2r[wr] = rd + + bufs = [] + + while writers: + ready = s.select() + ready_writers = find_ready_matching(ready, select.SELECT_OUT) + if not ready_writers: + self.fail("no pipes ready for writing") + wr = random.choice(ready_writers) + os.write(wr, MSG) + + ready = s.select() + ready_readers = find_ready_matching(ready, select.SELECT_IN) + if not ready_readers: + self.fail("no pipes ready for reading") + self.assertEqual([w2r[wr]], ready_readers) + rd = ready_readers[0] + buf = os.read(rd, MSG_LEN) + self.assertEqual(len(buf), MSG_LEN) + bufs.append(buf) + os.close(r2w[rd]) ; os.close(rd) + s.unregister(r2w[rd]) + s.unregister(rd) + writers.remove(r2w[rd]) + + self.assertEqual(bufs, [MSG] * NUM_PIPES) + + def test_timeout(self): + s = self.SELECTOR() + self.addCleanup(s.close) + + cmd = 'for i in 0 1 2 3 4 5 6 7 8 9; do echo testing...; sleep 1; done' + p = os.popen(cmd, 'r') + s.register(p, select.SELECT_IN) + + for tout in (0, 1, 2, 4, 8, 16) + (None,)*10: + if support.verbose: + print('timeout =', tout) + + ready = s.select(tout) + if not ready: + continue + if ready == set(((p, select.SELECT_IN),)): + line = p.readline() + if support.verbose: + print(repr(line)) + if not line: + if support.verbose: + print('EOF') + break + continue + self.fail('Unexpected return values from select(): %r' % ready) + p.close() + + +class SelectorTestCase(BaseSelectorTestCase): + + SELECTOR = select.Selector + + +class SelectSelectorTestCase(BaseSelectorTestCase): + + SELECTOR = select.SelectSelector + + +@unittest.skipUnless(hasattr(select, 'poll'), "Test needs select.poll()") +class PollSelectorTestCase(BaseSelectorTestCase): + + SELECTOR = select.PollSelector + + +@unittest.skipUnless(hasattr(select, 'epoll'), "Test needs select.epoll()") +class EpollSelectorTestCase(BaseSelectorTestCase): + + SELECTOR = select.EpollSelector + + def test_main(): - support.run_unittest(SelectTestCase) + tests = [SelectTestCase] + tests.extend([BasicSelectorTestCase, SelectorTestCase, + SelectSelectorTestCase, PollSelectorTestCase, + EpollSelectorTestCase]) + support.run_unittest(*tests) support.reap_children() + if __name__ == "__main__": test_main() diff --git a/Modules/selectmodule.c b/Modules/selectmodule.c --- a/Modules/selectmodule.c +++ b/Modules/selectmodule.c @@ -2129,7 +2129,7 @@ static struct PyModuleDef selectmodule = { PyModuleDef_HEAD_INIT, - "select", + "_select", module_doc, -1, select_methods, @@ -2143,7 +2143,7 @@ PyMODINIT_FUNC -PyInit_select(void) +PyInit__select(void) { PyObject *m; m = PyModule_Create(&selectmodule); diff --git a/setup.py b/setup.py --- a/setup.py +++ b/setup.py @@ -623,7 +623,7 @@ missing.append('spwd') # select(2); not on ancient System V - exts.append( Extension('select', ['selectmodule.c']) ) + exts.append( Extension('_select', ['selectmodule.c']) ) # Fred Drake's interface to the Python parser exts.append( Extension('parser', ['parsermodule.c']) )