"""Stress script: several threads poll one shared ``select.poll`` object.

Each iteration creates a pipe, registers its read end with a fresh poll
object, and starts ``POLLER_THREADS`` threads that all call ``poll()`` on
that same object while ``TOGGLER_THREADS`` threads write to and drain the
pipe. After one second both pipe ends are closed. An iteration is counted
as an "event" if some poller received a result for a file descriptor other
than the one it registered.

NOTE(review): this looks like a reproducer for a thread-safety problem in
concurrent ``poll()`` calls -- confirm against the original bug report.
"""
import errno
import fcntl
import os
import random
import select
import threading
import time

POLLER_THREADS = 20
TOGGLER_THREADS = 1
# Number of pipe create/close rounds performed by main().
ITERATIONS = 30

# Set to True by a poller that sees a result for an unexpected fd; reset by
# main() at the start of every iteration. Defined here so poller() can also
# be called when the module is imported rather than run as a script.
event = False


def poller(pollobj, fd):
    """Poll *pollobj* in a loop until *fd* hangs up or becomes invalid.

    Args:
        pollobj: a ``select.poll`` object (shared with other threads).
        fd: the file descriptor this thread expects results for.

    Returns when the first reported entry is for a different fd (setting
    the global ``event`` flag and printing a message the first time),
    carries unexpected revents bits, or carries POLLHUP/POLLNVAL.

    Raises:
        select.error: for any poll() failure other than EINTR.
    """
    global event
    # Per-thread random pacing: sleep up to 1 ms, poll timeout up to 99 ms.
    p1 = random.random() * 0.001
    p2 = int(random.random() * 100)
    while True:
        time.sleep(p1)
        try:
            events = pollobj.poll(p2)
        except select.error as e:
            # An interrupted call is simply retried.
            if e.args[0] != errno.EINTR:
                raise
            continue
        if not events:
            # Timed out with nothing ready.
            continue
        # Only the first reported (fd, revents) pair is inspected.
        got_fd, revents = events[0]
        if got_fd != fd:
            if not event:
                print("poll returned %r, we were looking for fd %d"
                      % (events, fd))
            event = True
            break
        if revents & ~(select.POLLIN | select.POLLHUP | select.POLLNVAL):
            print("fd correct but revents might be bogus; poll returned %r"
                  % (events,))
            break
        if revents & select.POLLIN:
            # Data available; the toggler drains it, so keep polling.
            continue
        if revents & (select.POLLHUP | select.POLLNVAL):
            # Pipe was closed: this thread is done.
            break
        # revents had none of the known bits set; report and keep looping.
        print("should not get here; poll returned %r" % (events,))


def toggler(r, w):
    """Repeatedly write one byte to *w* and drain *r* until they are closed.

    Args:
        r: read end of the pipe (set non-blocking by main()).
        w: write end of the pipe.

    Returns when the write fails with EBADF or EPIPE, or the read fails
    with EBADF. A read failing with EAGAIN is ignored.

    Raises:
        OSError: for any other errno.
    """
    p1 = random.random() * 0.001
    while True:
        time.sleep(p1)
        try:
            # bytes literal: accepted by os.write on Python 2.6+ and 3.x.
            os.write(w, b"!")
        except OSError as e:
            if e.errno in (errno.EBADF, errno.EPIPE):
                return
            raise
        try:
            os.read(r, 4096)
        except OSError as e:
            if e.errno == errno.EBADF:
                return
            if e.errno == errno.EAGAIN:
                continue
            raise


def main():
    """Run ITERATIONS rounds and print how many saw a wrong-fd result."""
    global event
    hits = 0
    for _ in range(ITERATIONS):
        event = False
        r, w = os.pipe()
        flags = fcntl.fcntl(r, fcntl.F_GETFL)
        fcntl.fcntl(r, fcntl.F_SETFL, flags | os.O_NONBLOCK)
        pollobj = select.poll()
        pollobj.register(r, select.POLLIN)
        threads = (
            [threading.Thread(target=poller, args=(pollobj, r))
             for _ in range(POLLER_THREADS)] +
            [threading.Thread(target=toggler, args=(r, w))
             for _ in range(TOGGLER_THREADS)])
        for t in threads:
            # Daemon threads: never joined, must not block interpreter exit.
            t.daemon = True
            t.start()
        time.sleep(1.0)
        # Closing both ends is what makes pollers and togglers terminate.
        os.close(r)
        os.close(w)
        time.sleep(0.1)
        if event:
            hits += 1
    print("%d events in %d iterations" % (hits, ITERATIONS))


if __name__ == '__main__':
    main()