Parser(multiprocessing.Process): def __init__(self, jobs, results, quit): self.jobs = jobs self.results = results self.quit = quit multiprocessing.Process.__init__(self) def run(self): pending = [] while True: try: self.quit.get_nowait() except queue.Empty: pass else: self.results.cancel_join_thread() break if pending: result = pending.pop() else: try: job = self.jobs.pop() except IndexError: break result = self.parse(*job) try: self.results.put(result, timeout=0.25) except queue.Full: pending.append(result) def parse(self, args): [actual parsing] class CookerParser(object): def __init__(self, args): self.current = 0 self.toparse = XXX self.progress_chunk = int(max(self.toparse / 100, 1)) self.num_processes = min(multiprocessing.cpu_count(), self.toparse) self.haveshutdown = False self.results = self.load_cached() self.processes = [] if self.toparse: self.parser_quit = multiprocessing.Queue(maxsize=self.num_processes) self.result_queue = multiprocessing.Queue() def chunkify(lst,n): return [lst[i::n] for i in range(n)] self.jobs = chunkify(list(self.willparse), self.num_processes) for i in range(0, self.num_processes): parser = Parser(self.jobs[i], self.result_queue, self.parser_quit) parser.start() self.processes.append(parser) self.results = itertools.chain(self.results, self.parse_generator()) def shutdown(self, clean=True, force=False): if not self.toparse: return if self.haveshutdown: return self.haveshutdown = True if clean: event = bb.event.ParseCompleted(self.cached, self.parsed, self.skipped, self.masked, self.virtuals, self.error, self.total) bb.event.fire(event, self.cfgdata) # Allow data left in the cancel queue to be discarded self.parser_quit.cancel_join_thread() for process in self.processes: self.parser_quit.put(None) # Cleanup the queue before call process.join(), otherwise there might be # deadlocks. while True: try: self.result_queue.get(timeout=0.25) except queue.Empty: break for process in self.processes: if force: process.join(.1) process.terminate() else: process.join() self.parser_quit.close() self.parser_quit.join_thread() def parse_generator(self): while True: if self.parsed >= self.toparse: break try: result = self.result_queue.get(timeout=0.25) except queue.Empty: pass else: value = result[1] if isinstance(value, BaseException): raise value else: yield result def parse_next(self): result = [] parsed = None try: parsed, mc, result = next(self.results) except StopIteration: self.shutdown() return False except bb.BBHandledException as exc: self.shutdown(clean=False) return False [handle result]