Author carlorosati
Recipients carlorosati, davin, rhettinger, xtreak
Date 2018-08-17.02:18:15
Message-id <1534472296.58.0.56676864532.issue34410@psf.upfronthosting.co.za>
In-reply-to
Content
I've written a few workarounds that could each serve as a multiprocessing-specific tee function, and I'd like feedback and critique on them. Hopefully we can agree on which one is best. It is unfortunate that the multiprocessing manager does not provide a deque.

The first one I wrote uses a managed list. 

import multiprocessing

def multiprocessing_tee(iterable, n=2):
    """Write a multiprocessing safe itertools.tee"""
    it = iter(iterable)
    m = multiprocessing.Manager()
    mylock = m.Lock()
    lists = [m.list() for _ in range(n)]
    def gen(local_list):
        while True:
            with mylock:
                if not local_list:         # when the local list is empty
                    try:
                        newval = next(it)  # fetch a new value and
                    except StopIteration:
                        return             # PEP 479: end the generator cleanly
                    for l in lists:        # load it to all the lists
                        l.append(newval)
            yield local_list.pop(0)
    return tuple(gen(l) for l in lists)
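
For comparison, here is roughly the single-process, pure-Python equivalent of itertools.tee that the managed-list version above resembles, with a collections.deque per branch in place of a managed list. This is only a sketch: the name deque_tee is made up for illustration, and nothing in it is safe to share across processes.

```python
import collections

def deque_tee(iterable, n=2):
    """Single-process tee sketch: one deque per branch, no locking."""
    it = iter(iterable)
    deques = [collections.deque() for _ in range(n)]

    def gen(mydeque):
        while True:
            if not mydeque:              # this branch has caught up with the source
                try:
                    newval = next(it)    # fetch a new value and
                except StopIteration:
                    return               # source exhausted: end this branch
                for d in deques:         # load it into every branch's deque
                    d.append(newval)
            yield mydeque.popleft()      # O(1), unlike list.pop(0)

    return tuple(gen(d) for d in deques)

a, b = deque_tee("xyz")
print(list(a), list(b))
```

The managed-list version keeps the same shape and adds a lock around the "fetch and distribute" step, since several consumers may try to advance the source at once.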

The other two implementations use queues.

import multiprocessing
import queue

def multiprocessing_tee_q(iterable, n=2):
    """Write a multiprocessing safe itertools.tee"""
    it = iter(iterable)
    m = multiprocessing.Manager()
    lock = m.Lock()
    queues = [m.Queue(-1) for _ in range(n)] # -1 means infinite maxsize (so puts won't block)
    def gen(myqueue):
        while True:
            with lock: # no one else touches anything
                try:
                    newval = myqueue.get_nowait()
                except queue.Empty:
                    try:
                        newval = next(it)
                    except StopIteration:
                        return # PEP 479: end the generator cleanly
                    for q in queues:
                        q.put(newval)
                    newval = myqueue.get()
            yield newval
    return tuple(gen(q) for q in queues)

import multiprocessing
import queue

class Sentinel(object):
    """used as Queue Sentinel"""

def multiprocessing_tee_q2(iterable, n=2):
    """Write a multiprocessing safe itertools.tee"""
    it = iter(iterable)
    m = multiprocessing.Manager()
    lock = m.Lock()
    queues = [m.Queue(-1) for _ in range(n)] # -1 means infinite maxsize (so puts won't block)
    def gen(myqueue):
        while True:
            try:
                retval = myqueue.get_nowait()
            except queue.Empty:
                # what happens if the other process puts the last item in my queue before I get the lock?
                with lock: # no one else touches anything
                    try:
                        newval = next(it)
                    except StopIteration:
                        newval = Sentinel
                    for q in queues:
                        q.put(newval)
                retval = myqueue.get()
            if retval is Sentinel:
                return # PEP 479: raising StopIteration here would become a RuntimeError
            yield retval
    return tuple(gen(q) for q in queues)
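
One cheap way to sanity-check the sentinel design without starting a manager process is to swap in threading.Lock and queue.Queue and consume each branch from its own thread. The sketch below does that. It is a substitution, not the managed version: threaded_tee and consume are made-up names, and passing with threads says nothing about how the generators behave once they are handed to other processes.

```python
import queue
import threading

class Sentinel:
    """Marks the end of the stream in every queue."""

def threaded_tee(iterable, n=2):
    """Thread-based stand-in for the sentinel version (no Manager involved)."""
    it = iter(iterable)
    lock = threading.Lock()
    queues = [queue.Queue() for _ in range(n)]  # unbounded, so put() never blocks

    def gen(myqueue):
        while True:
            try:
                retval = myqueue.get_nowait()
            except queue.Empty:
                with lock:  # only one consumer advances the source at a time
                    try:
                        newval = next(it)
                    except StopIteration:
                        newval = Sentinel
                    for q in queues:
                        q.put(newval)
                retval = myqueue.get()
            if retval is Sentinel:
                return  # end the generator; do not raise StopIteration (PEP 479)
            yield retval

    return tuple(gen(q) for q in queues)

def consume(branch, out):
    """Drain one branch into a list owned by the calling thread."""
    out.extend(branch)

branches = threaded_tee(range(1000), n=3)
results = [[] for _ in branches]
threads = [threading.Thread(target=consume, args=(b, r))
           for b, r in zip(branches, results)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(all(r == list(range(1000)) for r in results))
```

In this threaded stand-in, the case raised in the comment (another consumer filling my queue before I take the lock) looks harmless: the consumer just pulls one extra value from the source, and because every put happens under the lock, all queues still receive the values in the same order.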

I'm just throwing out my sketches here. I'm hoping the more experienced people here can weigh in on these implementations.