from collections import deque
from greenlet import getcurrent as greenlet_getcurrent
from greenlet import greenlet as greenlet_greenlet
import errno
import heapq
import os
import select
import sys
import time


# Process-wide hub singleton, created lazily by get_hub().
hub = None


def get_hub():
    """Return the module-level Hub, creating it on first use."""
    global hub
    if hub is None:
        hub = Hub()
    return hub


# Sentinel for "no value yet"; both names refer to the same object.
NOT_USED = _MISSING = object()


class Timeout(BaseException):
    """Throw an exception into the creating greenlet after ``seconds``.

    ``seconds=None`` creates a "fake" timeout that never fires.  When
    ``exception`` is None or a bool, the Timeout instance itself is thrown;
    otherwise ``exception`` is thrown.  As a context manager, a Timeout built
    with ``exception=False`` swallows its own expiry on exit.

    The timer is started by the constructor.
    """

    def __init__(self, seconds=None, exception=None):
        self.seconds = seconds
        self.exception = exception
        self.timer = None
        self.start()

    def start(self):
        """Schedule the timer on the hub (if ``seconds`` is set); return self."""
        if self.seconds is None:
            # "fake" timeout (never expires)
            self.timer = None
        elif self.exception is None or isinstance(self.exception, bool):
            # timeout that raises self
            self.timer = get_hub().schedule_call_global(
                self.seconds, greenlet_getcurrent().throw, self)
        else:
            # regular timeout with user-provided exception
            self.timer = get_hub().schedule_call_global(
                self.seconds, greenlet_getcurrent().throw, self.exception)
        return self

    @property
    def pending(self):
        """True if the timeout is scheduled to be raised."""
        if self.timer is not None:
            return self.timer.pending
        else:
            return False

    def cancel(self):
        """Cancel the underlying timer, if one is scheduled."""
        if self.timer is not None:
            self.timer.cancel()
            self.timer = None

    def __repr__(self):
        classname = self.__class__.__name__
        if self.pending:
            pending = ' pending'
        else:
            pending = ''
        if self.exception is None:
            exception = ''
        else:
            exception = ' exception=%r' % self.exception
        return '<%s at %s seconds=%s%s%s>' % (
            classname, hex(id(self)), self.seconds, exception, pending)

    def __enter__(self):
        if self.timer is None:
            self.start()
        return self

    def __exit__(self, typ, value, tb):
        self.cancel()
        # Suppress our own expiry only when the caller opted in via
        # exception=False.
        if value is self and self.exception is False:
            return True


def trampoline(fd, read=None, write=None, timeout=None, timeout_exc=Timeout,
               mark_as_closed=None):
    """Suspend the current greenlet until ``fd`` is readable or writable.

    :param fd: an object with ``fileno()`` or a raw file descriptor.
    :param read: wait for readability (takes precedence over ``write``).
    :param write: wait for writability.
    :param timeout: seconds before ``timeout_exc`` is thrown into the
        current greenlet; None waits indefinitely.
    :param mark_as_closed: callable stored on the listener and invoked if
        the descriptor is declared obsolete.
    :returns: whatever value the hub switches back with.
    :raises ValueError: if neither ``read`` nor ``write`` is requested
        (previously this surfaced as an UnboundLocalError).
    """
    if not read and not write:
        raise ValueError("trampoline requires read or write")
    t = None
    hub = get_hub()
    current = greenlet_getcurrent()
    try:
        fileno = fd.fileno()
    except AttributeError:
        fileno = fd
    if timeout is not None:
        def _timeout(exc):
            # This is only useful to insert debugging
            current.throw(exc)
        t = hub.schedule_call_global(timeout, _timeout, timeout_exc)
    try:
        if read:
            listener = hub.add(hub.READ, fileno, current.switch,
                               current.throw, mark_as_closed)
        else:
            listener = hub.add(hub.WRITE, fileno, current.switch,
                               current.throw, mark_as_closed)
        try:
            return hub.switch()
        finally:
            hub.remove(listener)
    finally:
        if t is not None:
            t.cancel()


def notify_close(fd):
    """Forward a close notification for ``fd`` to the hub."""
    hub = get_hub()
    hub.notify_close(fd)


def notify_opened(fd):
    """Tell the hub ``fd`` was reopened so stale listeners are retired."""
    hub = get_hub()
    hub.mark_as_reopened(fd)


class IOClosed(IOError):
    """Thrown into a greenlet whose listener's descriptor was closed."""
    pass


class Timer(object):
    """One-shot callback ``cb(*args, **kw)`` driven by the hub's timer heap.

    The callback runs at most once; ``cancel`` prevents a not-yet-run
    callback from running and reports the cancellation to the hub.
    """

    def __init__(self, seconds, cb, *args, **kw):
        self.seconds = seconds
        self.tpl = cb, args, kw
        self.called = False

    @property
    def pending(self):
        """True until the timer has fired or been canceled."""
        return not self.called

    def schedule(self):
        """(Re)arm the timer on the hub and return self."""
        self.called = False
        self.scheduled_time = get_hub().add_timer(self)
        return self

    def __call__(self, *args):
        if not self.called:
            self.called = True
            cb, args, kw = self.tpl
            try:
                cb(*args, **kw)
            finally:
                # Drop references to the callback and its arguments.
                try:
                    del self.tpl
                except AttributeError:
                    pass

    def cancel(self):
        """Prevent the callback from running if it has not run yet."""
        if not self.called:
            self.called = True
            get_hub().timer_canceled(self)
            try:
                del self.tpl
            except AttributeError:
                pass

    def __lt__(self, other):
        # Heap tie-breaker for timers with identical scheduled times.
        return id(self) < id(other)


class LocalTimer(Timer):
    """Timer whose callback is skipped if its creating greenlet has exited.

    ``cancel`` also detaches the timer from that greenlet.
    """

    def __init__(self, *args, **kwargs):
        self.greenlet = greenlet_getcurrent()
        Timer.__init__(self, *args, **kwargs)

    @property
    def pending(self):
        if self.greenlet is None or self.greenlet.dead:
            return False
        return not self.called

    def __call__(self, *args):
        if not self.called:
            self.called = True
            if self.greenlet is not None and self.greenlet.dead:
                # The scheduling greenlet is gone; nobody to deliver to.
                return
            cb, args, kw = self.tpl
            try:
                cb(*args, **kw)
            finally:
                try:
                    del self.tpl
                except AttributeError:
                    pass

    def cancel(self):
        self.greenlet = None
        Timer.cancel(self)


READ = "read"
WRITE = "write"


def closed_callback(fileno):
    """No-op callback installed on listeners whose descriptor was retired."""
    pass


class FdListener(object):
    """Registration of a greenlet waiting for one event type on a descriptor.

    ``cb`` is invoked with the fileno when the event fires; ``tb`` is used to
    throw an exception (e.g. IOClosed) into the waiting greenlet.
    """

    def __init__(self, evtype, fileno, cb, tb, mark_as_closed):
        self.evtype = evtype
        self.fileno = fileno
        self.cb = cb
        self.tb = tb
        self.mark_as_closed = mark_as_closed
        self.spent = False
        self.greenlet = greenlet_getcurrent()

    def defang(self):
        """Neutralise the listener: no-op callback, run mark_as_closed, mark spent."""
        self.cb = closed_callback
        if self.mark_as_closed is not None:
            self.mark_as_closed()
        self.spent = True


# Placeholder listener returned by dict lookups that miss.
noop = FdListener(READ, 0, lambda x: None, lambda x: None, None)


class BaseHub(object):
    """Event loop shared by all greenlets: timers plus fd listeners.

    Subclasses implement ``wait(seconds)`` to block on I/O readiness.
    """

    SYSTEM_EXCEPTIONS = (KeyboardInterrupt, SystemExit)

    READ = READ
    WRITE = WRITE

    def __init__(self, clock=None):
        self.listeners = {READ: {}, WRITE: {}}
        self.secondaries = {READ: {}, WRITE: {}}
        self.closed = []

        if clock is None:
            clock = time.monotonic
        self.clock = clock

        self.greenlet = greenlet_greenlet(self.run)
        self.stopping = False
        self.running = False
        # Min-heap of (scheduled_time, timer) ready to fire.
        self.timers = []
        # Newly added timers, merged into the heap by prepare_timers().
        self.next_timers = []
        self.lclass = FdListener
        # Canceled timers still sitting in timers/next_timers.
        self.timers_canceled = 0
        self.debug_exceptions = True
        self.debug_blocking = False
        self.debug_blocking_resolution = 1

    def add(self, evtype, fileno, cb, tb, mark_as_closed):
        """Register and return a listener; replaces any for the same (evtype, fileno)."""
        listener = self.lclass(evtype, fileno, cb, tb, mark_as_closed)
        bucket = self.listeners[evtype]
        bucket[fileno] = listener
        return listener

    def _obsolete(self, fileno):
        """Retire every listener on ``fileno``; return True if any existed.

        Retired listeners are queued on ``self.closed`` so run() can throw
        IOClosed into their greenlets.
        """
        found = False
        for evtype, bucket in self.secondaries.items():
            if fileno in bucket:
                for listener in bucket[fileno]:
                    found = True
                    self.closed.append(listener)
                    listener.defang()
                del bucket[fileno]

        for evtype, bucket in self.listeners.items():
            if fileno in bucket:
                listener = bucket[fileno]
                found = True
                self.closed.append(listener)
                self.remove(listener)
                listener.defang()

        return found

    def notify_close(self, fileno):
        pass

    def remove(self, listener):
        """Unregister ``listener``; promote a secondary listener if one exists."""
        if listener.spent:
            # Already defanged/removed (trampoline calls this in a finally).
            return
        fileno = listener.fileno
        evtype = listener.evtype
        self.listeners[evtype].pop(fileno, None)
        if fileno in self.secondaries[evtype]:
            sec = self.secondaries[evtype].get(fileno, None)
            if not sec:
                return
            self.listeners[evtype][fileno] = sec.pop(0)
            if not sec:
                del self.secondaries[evtype][fileno]

    def mark_as_reopened(self, fileno):
        self._obsolete(fileno)

    def remove_descriptor(self, fileno):
        """Drop all listeners on ``fileno`` and invoke each callback once."""
        listeners = []
        listeners.append(self.listeners[READ].pop(fileno, noop))
        listeners.append(self.listeners[WRITE].pop(fileno, noop))
        listeners.extend(self.secondaries[READ].pop(fileno, ()))
        listeners.extend(self.secondaries[WRITE].pop(fileno, ()))
        for listener in listeners:
            listener.cb(fileno)

    def close_one(self):
        """Throw IOClosed into the greenlet of the most recently retired listener."""
        listener = self.closed.pop()
        if not listener.greenlet.dead:
            listener.tb(IOClosed(errno.ENOTCONN, "Operation on closed file"))

    def ensure_greenlet(self):
        """Replace a finished hub greenlet with a fresh one running run()."""
        if self.greenlet.dead:
            # Was `greenlet.greenlet(...)`, a NameError: only
            # greenlet_greenlet is imported in this module.
            new = greenlet_greenlet(self.run, self.greenlet.parent)
            self.greenlet.parent = new
            self.greenlet = new

    def switch(self):
        """Switch from the current greenlet into the hub; return the value switched back."""
        cur = greenlet_getcurrent()
        switch_out = getattr(cur, 'switch_out', None)
        if switch_out is not None:
            switch_out()
        self.ensure_greenlet()
        try:
            # Make the hub the parent so control returns to it if cur exits.
            if self.greenlet.parent is not cur:
                cur.parent = self.greenlet
        except ValueError:
            pass  # cyclic parent chain; leave as is
        return self.greenlet.switch()

    def wait(self, seconds=None):
        raise NotImplementedError("Implement this in a subclass")

    def default_sleep(self):
        """Seconds to wait when no timer is scheduled."""
        return 60.0

    def sleep_until(self):
        """Scheduled time of the earliest timer, or None if there are none."""
        t = self.timers
        if not t:
            return None
        return t[0][0]

    def run(self, *a, **kw):
        """Main loop: process closes, fire due timers, then wait for I/O.

        Loops until ``self.stopping`` is set; then clears all timers.

        :raises RuntimeError: if the loop is already running.
        """
        if self.running:
            raise RuntimeError("Already running!")
        try:
            self.running = True
            self.stopping = False
            while not self.stopping:
                while self.closed:
                    self.close_one()
                self.prepare_timers()
                self.fire_timers(self.clock())
                self.prepare_timers()
                wakeup_when = self.sleep_until()
                if wakeup_when is None:
                    sleep_time = self.default_sleep()
                else:
                    sleep_time = wakeup_when - self.clock()
                if sleep_time > 0:
                    self.wait(sleep_time)
                else:
                    self.wait(0)
            else:
                # Loop exited via stopping: discard all pending timers.
                self.timers_canceled = 0
                del self.timers[:]
                del self.next_timers[:]
        finally:
            self.running = False
            self.stopping = False

    def abort(self, wait=False):
        """Ask run() to stop; with ``wait``, switch into the hub so it can."""
        if self.running:
            self.stopping = True
        if wait:
            # Ensure the hub wakes up promptly to notice `stopping`.
            self.schedule_call_global(0, lambda: None)
            self.switch()

    def add_timer(self, timer):
        """Queue ``timer`` for clock() + timer.seconds; return that time."""
        scheduled_time = self.clock() + timer.seconds
        self.next_timers.append((scheduled_time, timer))
        return scheduled_time

    def timer_canceled(self, timer):
        """Record that a queued timer was canceled before firing.

        Timer.cancel() calls this; the count is decremented again when
        prepare_timers()/fire_timers() discard the canceled entry.
        (Previously missing, so every Timer.cancel() of a pending timer
        raised AttributeError.)
        """
        self.timers_canceled += 1

    def prepare_timers(self):
        """Move newly added, not-yet-canceled timers onto the heap."""
        heappush = heapq.heappush
        t = self.timers
        for item in self.next_timers:
            if item[1].called:
                self.timers_canceled -= 1
            else:
                heappush(t, item)
        del self.next_timers[:]

    def schedule_call_local(self, seconds, cb, *args, **kw):
        """Schedule ``cb`` that is skipped if the calling greenlet has exited."""
        # Was `timer.LocalTimer`, an undefined name (NameError).
        t = LocalTimer(seconds, cb, *args, **kw)
        self.add_timer(t)
        return t

    def schedule_call_global(self, seconds, cb, *args, **kw):
        """Schedule ``cb(*args, **kw)`` to run after ``seconds``; return the Timer."""
        t = Timer(seconds, cb, *args, **kw)
        self.add_timer(t)
        return t

    def fire_timers(self, when):
        """Pop and run every heap timer scheduled at or before ``when``."""
        t = self.timers
        heappop = heapq.heappop

        while t:
            exp, timer = t[0]
            if when < exp:
                break
            heappop(t)
            if timer.called:
                self.timers_canceled -= 1
            else:
                timer()


class Hub(BaseHub):
    """BaseHub implementation using select.select() for I/O readiness."""

    def _remove_bad_fds(self):
        """Probe each registered fd; drop those select() rejects with EBADF."""
        all_fds = list(self.listeners[READ]) + list(self.listeners[WRITE])
        for fd in all_fds:
            try:
                select.select([fd], [], [], 0)
            except select.error as e:
                if e.errno == errno.EBADF:
                    self.remove_descriptor(fd)

    def wait(self, seconds=None):
        """Block up to ``seconds`` for readiness and dispatch listener callbacks."""
        readers = self.listeners[READ]
        writers = self.listeners[WRITE]
        if not readers and not writers:
            if seconds:
                time.sleep(seconds)
            return
        all_fds = list(readers) + list(writers)
        try:
            r, w, er = select.select(list(readers), list(writers),
                                     all_fds, seconds)
        except select.error as e:
            if e.errno == errno.EINTR:
                return
            elif e.errno == errno.EBADF:
                self._remove_bad_fds()
                return
            else:
                raise

        # An fd in the exceptional set wakes both its reader and writer.
        for fileno in er:
            readers.get(fileno, noop).cb(fileno)
            writers.get(fileno, noop).cb(fileno)

        for listeners, events in ((readers, r), (writers, w)):
            for fileno in events:
                listeners.get(fileno, noop).cb(fileno)


class Event(object):
    """One-shot result (or exception) delivered to waiting greenlets."""

    _result = None
    _exc = None

    def __init__(self):
        self._waiters = set()
        self._result = NOT_USED
        self._exc = None

    def wait(self, timeout=None):
        """Return the sent result, switching to the hub until it arrives.

        With ``timeout``, returns None if nothing was sent in time.  If an
        exception was sent, it is thrown in the calling greenlet.
        """
        current = greenlet_getcurrent()
        if self._result is NOT_USED:
            hub = get_hub()
            self._waiters.add(current)
            timer = None
            if timeout is not None:
                timer = hub.schedule_call_local(
                    timeout, self._do_send, None, None, current)
            try:
                result = hub.switch()
                if timer is not None:
                    timer.cancel()
                return result
            finally:
                self._waiters.discard(current)
        if self._exc is not None:
            current.throw(*self._exc)
        return self._result

    def send(self, result=None, exc=None):
        """Store ``result``/``exc`` and schedule delivery to every waiter."""
        self._result = result
        if exc is not None and not isinstance(exc, tuple):
            exc = (exc, )
        self._exc = exc
        hub = get_hub()
        for waiter in self._waiters:
            hub.schedule_call_global(
                0, self._do_send, self._result, self._exc, waiter)

    def send_exception(self, *args):
        """Send an exception; ``args`` are as for ``greenlet.throw``.

        GreenThread.main relies on this; it was previously missing.
        """
        return self.send(None, args)

    def _do_send(self, result, exc, waiter):
        if waiter in self._waiters:
            if exc is None:
                waiter.switch(result)
            else:
                waiter.throw(*exc)


class GreenThread(greenlet_greenlet):
    """Greenlet whose outcome is published through an Event."""

    def __init__(self, parent):
        greenlet_greenlet.__init__(self, self.main, parent)
        self._exit_event = Event()
        self._resolving_links = False

    def wait(self):
        """Return the function's result, or re-raise its exception."""
        return self._exit_event.wait()

    def main(self, function, args, kwargs):
        try:
            result = function(*args, **kwargs)
        except:  # noqa: E722 - re-raised; must also catch GreenletExit
            self._exit_event.send_exception(*sys.exc_info())
            self._resolve_links()
            raise
        else:
            self._exit_event.send(result)
            self._resolve_links()

    def _resolve_links(self):
        """Run queued exit callbacks ``f(self, *ca, **ckw)``; guarded against reentry."""
        if self._resolving_links:
            return
        self._resolving_links = True
        try:
            exit_funcs = getattr(self, '_exit_funcs', deque())
            while exit_funcs:
                f, ca, ckw = exit_funcs.popleft()
                f(self, *ca, **ckw)
        finally:
            self._resolving_links = False

    # NOTE(review): module-level `kill` and `cancel` are not defined in this
    # file, so these two methods raise NameError as written - confirm intent.
    def kill(self, *throw_args):
        return kill(self, *throw_args)

    def cancel(self, *throw_args):
        return cancel(self, *throw_args)


def eventlet_sleep(seconds=0):
    """Yield to the hub; resume the current greenlet after ``seconds``."""
    hub = get_hub()
    current = greenlet_getcurrent()
    timer = hub.schedule_call_global(seconds, current.switch)
    try:
        hub.switch()
    finally:
        timer.cancel()


def greenthread_spawn(func, *args, **kwargs):
    """Create a GreenThread running ``func(*args, **kwargs)`` on the next hub pass."""
    hub = get_hub()
    g = GreenThread(hub.greenlet)
    hub.schedule_call_global(0, g.switch, func, args, kwargs)
    return g


class Collision(Exception):
    """A DAGPool key was spawned or posted more than once."""
    pass


class PropagateError(Exception):
    """Wraps an exception raised by the DAGPool greenthread for ``key``."""

    def __init__(self, key, exc):
        msg = "PropagateError({0}): {1}: {2}" \
            .format(key, exc.__class__.__name__, exc)
        super(PropagateError, self).__init__(msg)
        self.msg = msg
        self.args = (key, exc)
        self.key = key
        self.exc = exc

    def __str__(self):
        return self.msg


class DAGPool(object):
    """Run greenthreads keyed by name, each waiting on the values of other keys."""

    class _Coro:
        """A running greenthread and the set of keys it still waits on."""

        def __init__(self, greenthread, pending):
            self.greenthread = greenthread
            self.pending = pending

    def __init__(self, preload=None):
        """``preload`` is a mapping or iterable of (key, value) pairs."""
        if preload is None:
            preload = {}
        try:
            iteritems = preload.items()
        except AttributeError:
            iteritems = preload
        self.values = dict(iteritems)
        self.coros = {}
        self.event = Event()

    def waitall(self):
        return self.wait()

    def wait(self, keys=_MISSING):
        return dict(self.wait_each(keys))

    def wait_each(self, keys=_MISSING):
        return self._wait_each(self._get_keyset_for_wait_each(keys))

    def _get_keyset_for_wait_each(self, keys):
        if keys is not _MISSING:
            return set(keys)
        else:
            return set(self.coros.keys()) | set(self.values.keys())

    def _wait_each(self, pending):
        for key, value in self._wait_each_raw(pending):
            yield key, self._value_or_raise(value)

    @staticmethod
    def _value_or_raise(value):
        if isinstance(value, PropagateError):
            raise value
        return value

    def _wait_each_raw(self, pending):
        """Yield (key, raw value) as keys in ``pending`` get values; mutates ``pending``."""
        while True:
            for key in pending.copy():
                value = self.values.get(key, _MISSING)
                if value is not _MISSING:
                    pending.remove(key)
                    yield (key, value)
            if not pending:
                break
            self.event.wait()

    def spawn(self, key, depends, function, *args, **kwds):
        """Start ``function(key, results, *args, **kwds)`` in a greenthread.

        :raises Collision: if ``key`` is already running or has a value.
        """
        if key in self.coros or key in self.values:
            raise Collision(key)
        pending = set(depends)
        newcoro = greenthread_spawn(self._wrapper, function, key,
                                    self._wait_each(pending),
                                    *args, **kwds)
        self.coros[key] = self._Coro(newcoro, pending)

    def _wrapper(self, function, key, results, *args, **kwds):
        try:
            result = function(key, results, *args, **kwds)
        except Exception as err:
            result = PropagateError(key, err)
        finally:
            del self.coros[key]

        try:
            self.post(key, result)
        except Collision:
            pass  # the function already posted its own value for key
        return result

    def spawn_many(self, depends, function, *args, **kwds):
        for key, deps in depends.items():
            self.spawn(key, deps, function, *args, **kwds)

    def post(self, key, value, replace=False):
        """Publish ``value`` for ``key`` and wake waiters.

        :raises Collision: if another greenthread owns ``key``, or ``key``
            already has a value and ``replace`` is false.
        """
        coro = self.coros.get(key, _MISSING)
        if coro is not _MISSING and coro.greenthread is not greenlet_getcurrent():
            raise Collision(key)
        if key in self.values and not replace:
            raise Collision(key)
        self.values[key] = value
        self.event.send()
        self.event = Event()

    def get(self, key, default=None):
        return self._value_or_raise(self.values.get(key, default))

    def keys(self):
        return tuple(self.values.keys())

    def items(self):
        return tuple((key, self._value_or_raise(value))
                     for key, value in self.values.items())

    def running(self):
        return len(self.coros)

    def running_keys(self):
        return tuple(self.coros.keys())

    def waiting(self):
        return len(self.waiting_for())

    def waiting_for(self, key=_MISSING):
        """Keys still awaited, for one running key or per running key.

        :raises KeyError: if ``key`` is neither running nor posted.
        """
        available = set(self.values.keys())
        if key is not _MISSING:
            coro = self.coros.get(key, _MISSING)
            if coro is _MISSING:
                self.values[key]  # KeyError for an unknown key
                return set()
            else:
                return coro.pending - available
        result = {}
        for k, coro in self.coros.items():
            pending = coro.pending - available
            if pending:
                result[k] = pending
        return result


class Capture(object):
    """Records messages grouped into ordered steps (one set per step)."""

    def __init__(self):
        self.sequence = [set()]

    def add(self, message):
        self.sequence[-1].add(message)

    def step(self):
        self.sequence.append(set())

    def validate(self, sequence):
        setlist = []
        for subseq in sequence:
            if isinstance(subseq, str):
                setlist.append(set([subseq]))
            else:
                try:
                    iter(subseq)
                except TypeError:
                    setlist.append(set([subseq]))
                else:
                    setlist.append(set(subseq))
        if self.sequence != setlist:
            raise Exception("!=")


def observe(key, results, capture, event):
    for k, v in results:
        capture.add("{0} got {1}".format(key, k))
    result = event.wait()
    capture.add("{0} returning {1}".format(key, result))
    return result


def post_each(pool, capture):
    eventlet_sleep(0)
    capture.step()
    pool.post('g', 'gval')
    pool.post('f', 'fval')
    eventlet_sleep(0)
    capture.step()
    pool.post('e', 'eval')
    pool.post('d', 'dval')


def spin():
    """Yield to the hub repeatedly so spawned greenthreads can progress."""
    for x in range(10):
        eventlet_sleep(0)


def spawn_many_func(key, results, capture, pool):
    for k, v in results:
        pass
    capture.add("{0} done".format(key))
    pool.post(key, key)
    capture.step()
    spin()


def waitall_done(capture, pool):
    pool.waitall()
    capture.add("waitall() done")


def test_spawn_many():
    deps = dict(e="cd",
                d="bc",
                c="a",
                b="a",
                a="")

    capture = Capture()
    pool = DAGPool()
    greenthread_spawn(waitall_done, capture, pool)
    pool.spawn_many(deps, spawn_many_func, capture, pool)
    spin()
    if pool.get("e") != "e":
        raise Exception("!=")
    sequence = capture.sequence[:]
    # b and c may finish in either order; merge their two steps.
    sequence[1:3] = [set([sequence[1].pop(), sequence[2].pop()])]
    expected = [set(["a done"]),
                set(["b done", "c done"]),
                set(["d done"]),
                set(["e done"]),
                set(["waitall() done"]),
                ]
    if sequence != expected:
        raise Exception("!=")


if __name__ == "__main__":
    test_spawn_many()