"""Disk-backed (external) merge sort.

The input is cut into runs of at most ``CHUNK_SIZE`` items.  Each run is
sorted in memory and pickled to its own file in a scratch directory; the
run files are then combined with ``heapq.merge`` so that only one run's
worth of items (plus one item per open file) is in memory at a time.
"""

import contextlib as _contextlib
import heapq as _heapq
import itertools as _itertools
import operator as _operator
import os as _os
import random as _random
import pathlib as _pathlib
import pickle as _pickle


def store_pickles_into(filename, data):
    """Write every item of *data* to *filename* as back-to-back pickles.

    The file is opened in exclusive-create mode (``"xb"``), so
    ``FileExistsError`` is raised if *filename* already exists.
    """
    with open(filename, "xb") as f:
        for value in data:
            _pickle.dump(value, f)


def _pickles_from_stream(file):
    """Yield successive unpickled objects from the open binary *file*.

    Iteration ends when ``pickle.load`` reports end of file.
    """
    # NOTE: unpickling executes arbitrary code.  Within this module it is
    # only applied to run files that this module wrote itself.
    while True:
        try:
            yield _pickle.load(file)
        except EOFError:
            break


def load_pickles_from(filename):
    """Lazily yield every object pickled in *filename*.

    The file stays open until the generator is exhausted or closed.
    """
    with open(filename, "rb") as f:
        yield from _pickles_from_stream(f)


@_contextlib.contextmanager
def _multiple_open_then_delete(paths):
    """Open all *paths* for binary reading; close and delete them on exit.

    Yields a list of the open file objects, in the order of *paths*.  The
    files are closed and removed whether the ``with`` body succeeds or not,
    including when opening one of them fails part-way through.

    *paths* must contain ``pathlib.Path`` objects (``unlink`` is called).
    """
    paths = list(paths)  # iterated twice below; tolerate one-shot iterables
    open_streams = []
    try:
        for p in paths:
            open_streams.append(open(p, "rb"))
        # Hand out a copy so the caller cannot disturb our bookkeeping.
        yield open_streams[:]
    finally:
        for f in open_streams:
            # Best effort: a failed close must not stop the remaining
            # streams from being closed or the files from being deleted.
            with _contextlib.suppress(OSError):
                f.close()
        for p in paths:
            # missing_ok: if an open() above failed because the file is
            # gone, do not mask that error with FileNotFoundError here.
            p.unlink(missing_ok=True)


CHUNK_SIZE = 10_000

# Number of equally sized run files that are merged into one bigger file.
_MERGE_FANOUT = 16


def sorted_with_disk(iterable, /, *, key=None, reverse=False, chunk_size=None):
    """Yield the items of *iterable* in sorted order, spilling to disk.

    Behaves like ``iter(sorted(iterable, key=key, reverse=reverse))``
    (including stability) but keeps at most *chunk_size* items in memory
    while building runs.  Items must be picklable.

    This is a generator: nothing happens until the first item is requested.
    Scratch files live in a ``_pysort_<random hex>`` directory created in
    the current working directory; the directory is removed when the
    generator is exhausted, closed, or fails.

    Args:
        iterable: Items to sort.
        key: Optional one-argument key function, as for ``sorted``.
        reverse: Sort in descending order if true.
        chunk_size: Maximum number of items per in-memory run.  ``None``
            (the default) uses the module-level ``CHUNK_SIZE`` as it is at
            call time.

    Raises:
        ValueError: If the effective chunk size is smaller than 1.
    """
    if chunk_size is None:
        chunk_size = CHUNK_SIZE
    if chunk_size < 1:
        # An empty chunk is the end-of-input signal below, so a chunk size
        # of zero would silently drop every item.
        raise ValueError("chunk_size must be at least 1")

    source = iter(iterable)

    uid = _random.randbytes(8).hex()
    folder = _pathlib.Path(f"_pysort_{uid}")
    folder.mkdir()

    # Run files that currently exist on disk, oldest first.  Keeping them
    # in creation order is what makes the merges stable.
    run_files = []
    try:
        names = (folder / f"slice{n}.dat" for n in _itertools.count())

        def store(data):
            """Pickle *data* into a fresh run file and register it."""
            name = next(names)
            # Register before writing so a partially written file is still
            # cleaned up if the write fails.
            run_files.append(name)
            store_pickles_into(name, data)

        def merged(files):
            """Return one sorted stream over the open run *files*."""
            streams = [_pickles_from_stream(f) for f in files]
            # key/reverse must match the ones used to sort each run,
            # otherwise the merge interleaves the runs incorrectly.
            return _heapq.merge(*streams, key=key, reverse=reverse)

        def merge_tail(count):
            """Replace the newest *count* run files by their merge."""
            tail = run_files[-count:]
            del run_files[-count:]
            with _multiple_open_then_delete(tail) as files:
                store(merged(files))

        run_count = 0
        while True:
            chunk = list(_itertools.islice(source, chunk_size))
            if not chunk:
                break
            chunk.sort(key=key, reverse=reverse)
            store(chunk)
            del chunk  # release the run before the merges below
            run_count += 1

            # Combine _MERGE_FANOUT files of the same size as soon as they
            # exist.  Every 16th run completes a group of 16 base runs,
            # every 256th additionally completes a group of 16 merged
            # files, and so on, which keeps the number of files small.
            level = run_count
            while level % _MERGE_FANOUT == 0:
                level //= _MERGE_FANOUT
                merge_tail(_MERGE_FANOUT)

        # Final pass: merge whatever is left and stream it straight to the
        # caller instead of writing it to yet another file first.
        remaining = run_files[:]
        run_files.clear()
        with _multiple_open_then_delete(remaining) as files:
            yield from merged(files)
    finally:
        # Also reached when the consumer abandons the generator early or
        # an error occurs: leave no scratch files behind.
        for leftover in run_files:
            leftover.unlink(missing_ok=True)
        folder.rmdir()


if __name__ == "__main__":
    # Tiny runs so the demo actually exercises the on-disk merging.
    CHUNK_SIZE = 7
    import random
    arr = list(range(1000))
    random.shuffle(arr)
    print(list(sorted_with_disk(arr)) == sorted(arr))