diff --git a/Lib/select.py b/Lib/select.py new file mode 100644 --- /dev/null +++ b/Lib/select.py @@ -0,0 +1,347 @@ +"""Select module. + +This module supports asynchronous I/O on multiple file descriptors. +""" + + +from _select import * + + +# generic events, that must be mapped to implementation-specific ones +# read event +SELECT_IN = (1 << 0) +# write event +SELECT_OUT = (1 << 1) +# connect event +SELECT_CONNECT = SELECT_OUT + + +def _fileobj_to_fd(fileobj): + """Return a file descriptor from a file object. + + Parameters: + fileobj -- file descriptor, or any object with a `fileno()` method + + Returns: + corresponding file descriptor + """ + if isinstance(fileobj, int): + fd = fileobj + else: + try: + fd = int(fileobj.fileno()) + except (ValueError, TypeError): + raise ValueError("Invalid file object: {!r}".format(fileobj)) + return fd + + +class _Key: + """Object used internally to associate a file object to its backing file + descriptor, selected event mask and attached data.""" + + def __init__(self, fileobj, events, data=None): + self.fileobj = fileobj + self.fd = _fileobj_to_fd(fileobj) + self.events = events + self.data = data + + +class _BaseSelector: + """Base selector class. + + A selector supports registering file objects to be monitored for specific + I/O events. + + A file object is a file descriptor or any object with a `fileno()` method. + An arbitrary object can be attached to the file object, which can be used + for example to store context information, a callback, etc. + + A selector can use various implementations (select(), poll(), epoll()...) + depending on the platform. The default `Selector` class uses the most + performant implementation on the current platform. + """ + + def __init__(self): + # this maps file descriptors to keys + self._fd_to_key = {} + # this maps file objects to keys - for fast (un)registering + self._fileobj_to_key = {} + + def register(self, fileobj, events, data=None): + """Register a file object. + + Parameters: + fileobj -- file object + events -- events to monitor (bitwise mask of SELECT_IN|SELECT_OUT) + data -- attached data + """ + if (not events) or (events & ~(SELECT_IN|SELECT_OUT)): + raise ValueError("Invalid events: {}".format(events)) + + if fileobj in self._fileobj_to_key: + raise ValueError("{!r} is already registered".format(fileobj)) + + key = _Key(fileobj, events, data) + self._fd_to_key[key.fd] = key + self._fileobj_to_key[fileobj] = key + return key + + def unregister(self, fileobj): + """Unregister a file object. + + Parameters: + fileobj -- file object + """ + try: + key = self._fileobj_to_key[fileobj] + del self._fd_to_key[key.fd] + del self._fileobj_to_key[fileobj] + except KeyError: + raise ValueError("{!r} is not registered".format(fileobj)) + return key + + def modify(self, fileobj, events, data=None): + """Change a registered file object monitored events or attached data. + + Parameters: + fileobj -- file object + events -- events to monitor (bitwise mask of SELECT_IN|SELECT_OUT) + data -- attached data + """ + self.unregister(fileobj) + self.register(fileobj, events, data) + + def select(self, timeout=None): + """Perform the actual selection, until some monitored file objects are + ready or a timeout expires. + + Parameters: + timeout -- if timeout > 0, this specifies the maximum wait time, in + seconds + if timeout == 0, the select() call won't block, and will + report the currently ready file objects + if timeout is None, select() will block until a monitored + file object becomes ready + + Returns: + list of (fileobj, events, attached data) for ready file objects + `events` is a bitwise mask of SELECT_IN|SELECT_OUT + """ + raise NotImplementedError() + + def close(self): + """Close the selector. + + This must be called to make sure that any underlying resource is freed. + """ + self._fd_to_key.clear() + self._fileobj_to_key.clear() + + def get_info(self, fileobj): + """Return information about a registered file object. + + Returns: + (events, data) associated to this file object + + Raises KeyError if the file object is not registered. + """ + try: + key = self._fileobj_to_key[fileobj] + except KeyError: + raise KeyError("{} is not registered".format(fileobj)) + return key.events, key.data + + def registered_count(self): + """Return the number of registered file objects. + + Returns: + number of currently registered file objects + """ + return len(self._fd_to_key) + + def __enter__(self): + return self + + def __exit__(self, *args): + self.close() + + def _key_from_fd(self, fd): + """Return the key associated to a given file descriptor. + + Parameters: + fd -- file descriptor + + Returns: + corresponding key + """ + try: + return self._fd_to_key[fd] + except KeyError: + raise RuntimeError("No key found for fd {}".format(fd)) + + +class SelectSelector(_BaseSelector): + """Select-based selector.""" + + def __init__(self): + super().__init__() + self._readers = set() + self._writers = set() + + def register(self, fileobj, events, data=None): + key = super().register(fileobj, events, data) + if events & SELECT_IN: + self._readers.add(key.fd) + if events & SELECT_OUT: + self._writers.add(key.fd) + + def unregister(self, fileobj): + key = super().unregister(fileobj) + self._readers.discard(key.fd) + self._writers.discard(key.fd) + + def select(self, timeout=None): + r, w, _ = select(self._readers, self._writers, [], timeout) + r = set(r) + w = set(w) + ready = [] + for fd in r | w: + events = 0 + if fd in r: + events |= SELECT_IN + if fd in w: + events |= SELECT_OUT + + key = self._key_from_fd(fd) + ready.append((key.fileobj, events, key.data)) + return ready + + +if 'poll' in globals(): + + class PollSelector(_BaseSelector): + """Poll-based selector.""" + + def __init__(self): + super().__init__() + self._poll = poll() + + def register(self, fileobj, events, data=None): + key = super().register(fileobj, events, data) + poll_events = 0 + if events & SELECT_IN: + poll_events |= POLLIN + if events & SELECT_OUT: + poll_events |= POLLOUT + self._poll.register(key.fd, poll_events) + + def unregister(self, fileobj): + key = super().unregister(fileobj) + self._poll.unregister(key.fd) + + def select(self, timeout=None): + timeout = None if timeout is None else int(1000 * timeout) + ready = [] + for fd, event in self._poll.poll(timeout): + events = 0 + if event & ~POLLIN: + events |= SELECT_OUT + if event & ~POLLOUT: + events |= SELECT_IN + + key = self._key_from_fd(fd) + ready.append((key.fileobj, events, key.data)) + return ready + + +if 'epoll' in globals(): + + class EpollSelector(_BaseSelector): + """Epoll-based selector.""" + + def __init__(self): + super().__init__() + self._epoll = epoll() + + def register(self, fileobj, events, data=None): + key = super().register(fileobj, events, data) + epoll_events = 0 + if events & SELECT_IN: + epoll_events |= EPOLLIN + if events & SELECT_OUT: + epoll_events |= EPOLLOUT + self._epoll.register(key.fd, epoll_events) + + def unregister(self, fileobj): + key = super().unregister(fileobj) + self._epoll.unregister(key.fd) + + def select(self, timeout=None): + timeout = -1 if timeout is None else timeout + max_ev = self.registered_count() + ready = [] + for fd, event in self._epoll.poll(timeout, max_ev): + events = 0 + if event & ~EPOLLIN: + events |= SELECT_OUT + if event & ~EPOLLOUT: + events |= SELECT_IN + + key = self._key_from_fd(fd) + ready.append((key.fileobj, events, key.data)) + return ready + + def close(self): + super().close() + self._epoll.close() + + +if 'kqueue' in globals(): + + class KqueueSelector(_BaseSelector): + """Kqueue-based selector.""" + + def __init__(self): + super().__init__() + self._kqueue = kqueue() + + def register(self, fileobj, events, data=None): + key = super().register(fileobj, events, data) + if events & SELECT_IN: + kev = kevent(fd, KQ_FILTER_READ, KQ_EV_ADD) + self._kqueue.control([kev], 0, 0) + if events & SELECT_OUT: + kev = kevent(fd, KQ_FILTER_WRITE, KQ_EV_ADD) + self._kqueue.control([kev], 0, 0) + + def select(self, timeout=None): + max_ev = self.registered_count() + ready = [] + for kev in self._kqueue.control(None, max_ev, timeout): + fd = kev.ident + flag = kev.filter + events = 0 + if flag == KQ_FILTER_READ: + events |= SELECT_IN + if flag == KQ_FILTER_WRITE: + events |= SELECT_OUT + + key = self._key_from_fd(fd) + ready.append((key.fileobj, events, key.data)) + return ready + + def close(self): + super().close() + self._kqueue.close() + + +# Choose the best implementation: roughly, epoll|kqueue > poll > select. +# select() also can't accept a FD > FD_SETSIZE (usually around 1024) +if 'KqueueSelector' in globals(): + Selector = KqueueSelector +elif 'EpollSelector' in globals(): + Selector = EpollSelector +elif 'PollSelector' in globals(): + Selector = PollSelector +else: + Selector = SelectSelector diff --git a/Lib/test/test_select.py b/Lib/test/test_select.py --- a/Lib/test/test_select.py +++ b/Lib/test/test_select.py @@ -1,10 +1,25 @@ import errno import os +import random import select import sys import unittest from test import support +try: + import resource +except ImportError: + resource = None + + +def find_ready_matching(ready, flag): + match = [] + for fd, mode, data in ready: + if mode & flag: + match.append(fd) + return match + + @unittest.skipIf((sys.platform[:3]=='win'), "can't easily test on this system") class SelectTestCase(unittest.TestCase): @@ -75,9 +90,260 @@ a[:] = [F()] * 10 self.assertEqual(select.select([], a, []), ([], a[:5], [])) + +class BasicSelectorTestCase(unittest.TestCase): + + def test_constants(self): + select.SELECT_IN + select.SELECT_OUT + + +class BaseSelectorTestCase(unittest.TestCase): + + def test_error_conditions(self): + s = self.SELECTOR() + self.assertRaises(TypeError, s.register) + self.assertRaises(TypeError, s.register, 0) + self.assertRaises(ValueError, s.register, 0, 18) + self.assertRaises(TypeError, s.unregister, 0, 1) + self.assertRaises(TypeError, s.modify, 0) + self.assertRaises(TypeError, s.select, 0, 1) + + def test_basic(self): + with self.SELECTOR() as s: + rd, wr = os.pipe() + wro = os.fdopen(os.dup(wr), "wb") + self.addCleanup(os.close, rd) + self.addCleanup(os.close, wr) + self.addCleanup(wro.close) + + # test without attached data + s.register(wr, select.SELECT_OUT) + self.assertEqual(set(((wr, select.SELECT_OUT, None),)), set(s.select())) + + # test with attached data + s.unregister(wr) + s.register(wr, select.SELECT_OUT, sys.stdin) + self.assertEqual(set(((wr, select.SELECT_OUT, sys.stdin),)), set(s.select())) + + # test with file object + s.register(wro, select.SELECT_OUT) + self.assertEqual(set(((wro, select.SELECT_OUT, None), + (wr, select.SELECT_OUT, sys.stdin))), set(s.select())) + s.unregister(wro) + + # modify + s.modify(wr, select.SELECT_OUT, sys.stdout) + self.assertEqual(set(((wr, select.SELECT_OUT, sys.stdout),)), set(s.select())) + + # test timeout + s.register(rd, select.SELECT_IN, sys.stdin) + s.unregister(wr) + self.assertFalse(s.select(0.1)) + s.register(wr, select.SELECT_OUT) + self.assertEqual(set(((wr, select.SELECT_OUT, None),)), + set(s.select(0.1))) + + # registering twice should raise an error + self.assertRaises(ValueError, s.register, wr, select.SELECT_OUT) + + # test get_info() + self.assertEquals((select.SELECT_OUT, None), s.get_info(wr)) + self.assertEquals((select.SELECT_IN, sys.stdin), s.get_info(rd)) + self.assertRaises(KeyError, s.get_info, wro) + + s.unregister(rd) + s.unregister(wr) + + # test registered_count() + self.assertEquals(0, s.registered_count()) + s.register(rd, select.SELECT_IN) + s.register(wr, select.SELECT_OUT) + self.assertEquals(2, s.registered_count()) + s.modify(wr, select.SELECT_OUT, sys.stdout) + self.assertEquals(2, s.registered_count()) + s.unregister(rd) + s.unregister(wr) + self.assertEquals(0, s.registered_count()) + + # unregistering twice should raise an error + self.assertRaises(ValueError, s.unregister, wr) + + def test_selector(self): + s = self.SELECTOR() + self.addCleanup(s.close) + + NUM_PIPES = 12 + MSG = b" This is a test." + MSG_LEN = len(MSG) + readers = [] + writers = [] + r2w = {} + w2r = {} + + for i in range(NUM_PIPES): + rd, wr = os.pipe() + s.register(rd, select.SELECT_IN) + s.register(wr, select.SELECT_OUT) + readers.append(rd) + writers.append(wr) + r2w[rd] = wr + w2r[wr] = rd + + bufs = [] + + while writers: + ready = s.select() + ready_writers = find_ready_matching(ready, select.SELECT_OUT) + if not ready_writers: + self.fail("no pipes ready for writing") + wr = random.choice(ready_writers) + os.write(wr, MSG) + + ready = s.select() + ready_readers = find_ready_matching(ready, select.SELECT_IN) + if not ready_readers: + self.fail("no pipes ready for reading") + self.assertEqual([w2r[wr]], ready_readers) + rd = ready_readers[0] + buf = os.read(rd, MSG_LEN) + self.assertEqual(len(buf), MSG_LEN) + bufs.append(buf) + os.close(r2w[rd]) ; os.close(rd) + s.unregister(r2w[rd]) + s.unregister(rd) + writers.remove(r2w[rd]) + + self.assertEqual(bufs, [MSG] * NUM_PIPES) + + def test_timeout(self): + s = self.SELECTOR() + self.addCleanup(s.close) + + cmd = 'for i in 0 1 2 3 4 5 6 7 8 9; do echo testing...; sleep 1; done' + p = os.popen(cmd, 'r') + s.register(p.fileno(), select.SELECT_IN, p) + + for tout in (0, 1, 2, 4, 8, 16) + (None,)*10: + if support.verbose: + print('timeout =', tout) + + ready = s.select(tout) + if not ready: + continue + eof_seen = False + for fileobj, evt, data in ready: + if fileobj == p.fileno() and evt & select.SELECT_IN: + while True: + line = p.readline() + if support.verbose: + print(repr(line)) + if not line: + if support.verbose: + print('EOF') + eof_seen = True + break + if not eof_seen: + self.fail('Unexpected return values from select(): %r' % ready) + p.close() + + def test_below_fd_setsize(self): + # No implementation should have a problem with less than FD_SETSIZE file + # descriptors. To be conservative, let's say 64. + NUM_FDS = 64 + + r, w = os.pipe() + self.addCleanup(os.close, r) + self.addCleanup(os.close, w) + + with self.SELECTOR() as s: + for i in range(NUM_FDS): + fd = os.dup(w) + self.addCleanup(os.close, fd) + + s.register(fd, select.SELECT_OUT) + self.assertEquals(NUM_FDS, len(s.select())) + + +class ScalableSelectorTestCase: + + # a mixin to test selector scalability + + @unittest.skipUnless(resource, "Test needs resource module") + def test_above_fd_setsize(self): + # A scalable implementation should have no problem with more than + # FD_SETSIZE file descriptors. Since we don't know the value, we just + # try to set the soft RLIMIT_NOFILE to the hard RLIMIT_NOFILE ceiling. + try: + soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE) + resource.setrlimit(resource.RLIMIT_NOFILE, (hard, hard)) + self.addCleanup(resource.setrlimit, resource.RLIMIT_NOFILE, + (soft, hard)) + NUM_FDS = hard + except OSError: + NUM_FDS = soft + + # guard for already allocated FDs (stdin, stdout...) + NUM_FDS -= 16 + + r, w = os.pipe() + self.addCleanup(os.close, r) + self.addCleanup(os.close, w) + + with self.SELECTOR() as s: + for i in range(NUM_FDS): + try: + fd = os.dup(w) + except OSError as e: + if e.errno == errno.EMFILE: + # too many FD, skip + self.skipTest("FD limit reached") + raise + + self.addCleanup(os.close, fd) + s.register(fd, select.SELECT_OUT) + + self.assertEquals(NUM_FDS, len(s.select())) + + +@unittest.skipIf((sys.platform[:3]=='win'), + "can't easily test on this system") +class SelectorTestCase(BaseSelectorTestCase): + + SELECTOR = select.Selector + + +class SelectSelectorTestCase(BaseSelectorTestCase): + + SELECTOR = select.SelectSelector + + +@unittest.skipUnless(hasattr(select, 'PollSelector'), "Test needs select.poll()") +class PollSelectorTestCase(BaseSelectorTestCase, ScalableSelectorTestCase): + + SELECTOR = getattr(select, 'PollSelector', None) + + +@unittest.skipUnless(hasattr(select, 'EpollSelector'), "Test needs select.epoll()") +class EpollSelectorTestCase(BaseSelectorTestCase, ScalableSelectorTestCase): + + SELECTOR = getattr(select, 'EpollSelector', None) + + +@unittest.skipUnless(hasattr(select, 'KqueueSelector'), "Test needs select.kqueue()") +class KqueueSelectorTestCase(BaseSelectorTestCase, ScalableSelectorTestCase): + + SELECTOR = getattr(select, 'KqueueSelector', None) + + def test_main(): - support.run_unittest(SelectTestCase) + tests = [SelectTestCase] + tests.extend([BasicSelectorTestCase, SelectorTestCase, + SelectSelectorTestCase, PollSelectorTestCase, + EpollSelectorTestCase, KqueueSelectorTestCase]) + support.run_unittest(*tests) support.reap_children() + if __name__ == "__main__": test_main() diff --git a/Modules/selectmodule.c b/Modules/selectmodule.c --- a/Modules/selectmodule.c +++ b/Modules/selectmodule.c @@ -2129,7 +2129,7 @@ static struct PyModuleDef selectmodule = { PyModuleDef_HEAD_INIT, - "select", + "_select", module_doc, -1, select_methods, @@ -2143,7 +2143,7 @@ PyMODINIT_FUNC -PyInit_select(void) +PyInit__select(void) { PyObject *m; m = PyModule_Create(&selectmodule); diff --git a/setup.py b/setup.py --- a/setup.py +++ b/setup.py @@ -623,7 +623,7 @@ missing.append('spwd') # select(2); not on ancient System V - exts.append( Extension('select', ['selectmodule.c']) ) + exts.append( Extension('_select', ['selectmodule.c']) ) # Fred Drake's interface to the Python parser exts.append( Extension('parser', ['parsermodule.c']) )