Message323632
I've written a few workarounds that could be considered for a multiprocessing-specific tee function, and I'd like feedback/critique on them. Hopefully we can all agree on which one is best. It is unfortunate that the multiprocessing manager does not provide a deque.
The first one I wrote uses a managed list.
import multiprocessing

def multiprocessing_tee(iterable, n=2):
    """Write a multiprocessing safe itertools.tee"""
    it = iter(iterable)
    m = multiprocessing.Manager()
    mylock = m.Lock()
    lists = [m.list() for i in range(n)]
    def gen(local_list):
        while True:
            with mylock:
                if not local_list:  # when the local list is empty
                    try:
                        newval = next(it)  # fetch a new value and
                    except StopIteration:
                        return  # source exhausted; end this generator
                    for l in lists:  # load it to all the lists
                        l.append(newval)
            yield local_list.pop(0)  # yield after the lock has been released
    return tuple(gen(l) for l in lists)
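For comparison, here is a minimal in-process sketch of the same buffering idea, using `collections.deque` and `threading.Lock` in place of the managed list and managed lock. The name `threaded_tee` is made up for illustration, and this version only works between threads of one process, not across processes. It is meant to show what a deque would buy: `popleft()` is O(1), whereas `list.pop(0)` shifts every remaining element.

```python
import collections
import threading


def threaded_tee(iterable, n=2):
    """In-process analogue of the managed-list tee (hypothetical name)."""
    it = iter(iterable)
    lock = threading.Lock()
    buffers = [collections.deque() for _ in range(n)]

    def gen(local_buffer):
        while True:
            with lock:
                if not local_buffer:            # my buffer is empty
                    try:
                        newval = next(it)       # pull one item from the source
                    except StopIteration:
                        return                  # source exhausted
                    for buf in buffers:         # fan it out to every buffer
                        buf.append(newval)
                value = local_buffer.popleft()  # O(1), unlike list.pop(0)
            yield value                         # yield with the lock released

    return tuple(gen(buf) for buf in buffers)


a, b = threaded_tee(range(5))
first_two = [next(a), next(a)]  # a runs ahead of b
rest_of_a = list(a)
all_of_b = list(b)              # b still sees every item, in order
```

Letting one branch run ahead and then draining the other shows that the lagging branch still receives every buffered item.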
The other two implementations use queues.
import multiprocessing
import queue  # named Queue on Python 2

def multiprocessing_tee_q(iterable, n=2):
    """Write a multiprocessing safe itertools.tee"""
    it = iter(iterable)
    m = multiprocessing.Manager()
    lock = m.Lock()
    queues = [m.Queue(-1) for _ in range(n)]  # -1 means infinite maxsize (so puts won't block)
    def gen(myqueue):
        while True:
            with lock:  # no one else touches anything
                try:
                    newval = myqueue.get_nowait()
                except queue.Empty:
                    try:
                        newval = next(it)
                    except StopIteration:
                        return  # source exhausted; the lock is released on the way out
                    for q in queues:
                        q.put(newval)
                    newval = myqueue.get()
            yield newval
    return tuple(gen(q) for q in queues)
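One quick way to sanity-check the ordering logic of the queue variant is to run the same loop with plain `queue.Queue` objects and a `threading.Lock`, one consumer thread per branch. This is only a sketch under that substitution: `tee_q_threads` and `consume` are hypothetical names, and no Manager is involved, so it does not exercise the cross-process case (where the source iterator itself is not shared through the manager).

```python
import queue
import threading


def tee_q_threads(iterable, n=2):
    """Thread-based stand-in for the queue variant (hypothetical name)."""
    it = iter(iterable)
    lock = threading.Lock()
    queues = [queue.Queue() for _ in range(n)]

    def gen(myqueue):
        while True:
            with lock:
                try:
                    newval = myqueue.get_nowait()
                except queue.Empty:
                    try:
                        newval = next(it)
                    except StopIteration:
                        return              # source exhausted
                    for q in queues:        # fan out under the lock
                        q.put(newval)
                    newval = myqueue.get()
            yield newval

    return tuple(gen(q) for q in queues)


def consume(iterator, out):
    """Drain one branch into a list."""
    out.extend(iterator)


branches = tee_q_threads(range(1000), n=3)
results = [[] for _ in branches]
threads = [threading.Thread(target=consume, args=(b, r))
           for b, r in zip(branches, results)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

Because every fan-out happens while the lock is held, all queues receive items in the same order, so each entry in `results` should equal the full source sequence.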
import multiprocessing
import queue  # named Queue on Python 2

class Sentinel(object):
    """used as Queue Sentinel"""

def multiprocessing_tee_q2(iterable, n=2):
    """Write a multiprocessing safe itertools.tee"""
    it = iter(iterable)
    m = multiprocessing.Manager()
    lock = m.Lock()
    queues = [m.Queue(-1) for _ in range(n)]  # -1 means infinite maxsize (so puts won't block)
    def gen(myqueue):
        while True:
            try:
                retval = myqueue.get_nowait()
            except queue.Empty:
                # what happens if the other process puts last item in my queue before i get lock?
                with lock:  # no one else touches anything
                    try:
                        newval = next(it)
                    except StopIteration:
                        newval = Sentinel
                    for q in queues:
                        q.put(newval)
                    retval = myqueue.get()
            if retval is Sentinel:
                return  # on Python 3.7+, raise StopIteration here becomes RuntimeError (PEP 479)
            yield retval
    return tuple(gen(q) for q in queues)
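One detail worth checking in the sentinel variant is how the generator ends. Under PEP 479 (the default behaviour since Python 3.7), a `StopIteration` raised inside a generator body is converted to `RuntimeError`, so a plain `return` is the safe way to finish. The sketch below shows both behaviours with an ordinary `queue.Queue`. The names `drain`, `drain_old_style`, `filled_queue` and `_SENTINEL` are illustrative only, and the `object()` marker works only in-process. Across processes a marker whose identity survives pickling is needed, such as the `Sentinel` class above.

```python
import queue

_SENTINEL = object()  # in-process marker only; identity is lost when pickled


def drain(q):
    """Yield items from q until the sentinel arrives."""
    while True:
        item = q.get()
        if item is _SENTINEL:
            return                 # ends the generator cleanly
        yield item


def drain_old_style(q):
    """Same loop, but ending with raise StopIteration."""
    while True:
        item = q.get()
        if item is _SENTINEL:
            raise StopIteration    # converted to RuntimeError on Python 3.7+
        yield item


def filled_queue():
    """Build a queue holding 1, 2, 3 followed by the sentinel."""
    q = queue.Queue()
    for item in (1, 2, 3, _SENTINEL):
        q.put(item)
    return q


good = list(drain(filled_queue()))

try:
    list(drain_old_style(filled_queue()))
    old_style_error = None
except RuntimeError as exc:
    old_style_error = exc
```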
I'm just throwing out my sketches here. I'm hoping the more experienced people here can weigh in on these implementations.
Date | User | Action | Args
2018-08-17 02:18:16 | carlorosati | set | recipients: + carlorosati, rhettinger, davin, xtreak
2018-08-17 02:18:16 | carlorosati | set | messageid: <1534472296.58.0.56676864532.issue34410@psf.upfronthosting.co.za>
2018-08-17 02:18:16 | carlorosati | link | issue34410 messages
2018-08-17 02:18:15 | carlorosati | create |