import asyncio
import dis
import types


def generate_valid_lnopairs(addr, line):
    """Yield the ``co_lnotab`` bytes encoding one (address delta, line delta) step.

    The values are yielded as a flat stream: address byte, line byte,
    address byte, line byte, ...  Deltas that do not fit in a single byte
    are split over several pairs:

    * while ``addr`` exceeds 255, a ``(255, 0)`` pair is emitted;
    * while ``line`` exceeds 127, an ``(addr, 127)`` pair is emitted;
    * while ``line`` is below -128, an ``(addr, 128)`` pair is emitted
      (128 is -128 stored in an unsigned byte);
    * a remaining negative ``line`` is stored as ``0x100 + line``.

    Nothing is yielded for the remainder when both deltas end up at zero.

    A negative ``addr`` is not validated here; it is yielded as-is and is
    only rejected later, when the stream is passed to ``bytes()``.
    """
    while addr > 255:
        yield 255
        yield 0
        addr -= 255
    while line > 127:
        yield addr
        yield 127
        addr = 0
        line -= 127
    while line < -128:
        yield addr
        # -128 stored in a byte ends up as +128 when read as unsigned byte.
        yield 128
        addr = 0
        line += 128
    if line == 0 and addr == 0:
        # The loops consumed everything; skip the unconditional final pair.
        return
    if line < 0:
        # Negative deltas are stored as 0x100 + (negative value).
        line += 0x100
    yield addr
    yield line


def generate_new_lnotab(func):
    """Yield the bytes of a rebuilt lnotab for ``func`` with with-cleanup fixes.

    Walks the bytecode of ``func.__code__``.  Every instruction that starts
    a source line produces a normal table entry.  The current line is
    remembered at each ``SETUP_WITH`` / ``BEFORE_ASYNC_WITH``; at the
    matching ``WITH_CLEANUP_START`` an extra entry steps the line number
    back to that remembered line, so the cleanup is attributed to the
    ``with`` statement itself.
    """
    code = func.__code__
    last_line = code.co_firstlineno
    last_offset = 0
    # Stack of the lines on which the currently open with-blocks were entered.
    enter_with = []
    for inst in dis.Bytecode(code):
        if inst.starts_line is not None:
            yield from generate_valid_lnopairs(
                inst.offset - last_offset, inst.starts_line - last_line
            )
            last_offset = inst.offset
            last_line = inst.starts_line
        if inst.opname in ("SETUP_WITH", "BEFORE_ASYNC_WITH"):
            enter_with.append(last_line)
        elif inst.opname == "WITH_CLEANUP_START":
            # Walk back to the line of the with statement being cleaned up.
            # NOTE(review): assumes every cleanup opcode pairs with exactly
            # one earlier setup opcode; an unmatched one raises IndexError.
            with_line = enter_with.pop()
            yield from generate_valid_lnopairs(
                inst.offset - last_offset, with_line - last_line
            )
            last_offset = inst.offset
            last_line = with_line


def correct_with_cleanup_traceback(func):
    """Rewrite ``func``'s line table so with-cleanup errors point at the ``with``.

    Usable as a decorator: ``func.__code__`` is replaced in place and the
    same function object is returned.

    On interpreters whose ``dis.opmap`` has no ``WITH_CLEANUP_START`` opcode
    the walk-back in ``generate_new_lnotab`` can never trigger, so the
    function is returned untouched.
    """
    if "WITH_CLEANUP_START" not in dis.opmap:
        return func
    c = func.__code__
    new_lnotab = bytes(generate_new_lnotab(func))
    if hasattr(c, "replace"):
        # code.replace() is independent of the constructor's positional
        # layout, which gained an extra argument when it was introduced.
        func.__code__ = c.replace(co_lnotab=new_lnotab)
    else:
        func.__code__ = types.CodeType(
            c.co_argcount,
            c.co_kwonlyargcount,
            c.co_nlocals,
            c.co_stacksize,
            c.co_flags,
            c.co_code,
            c.co_consts,
            c.co_names,
            c.co_varnames,
            c.co_filename,
            c.co_name,
            c.co_firstlineno,
            new_lnotab,
            c.co_freevars,
            c.co_cellvars,
        )
    return func


# --- synchronous demo -------------------------------------------------------
# NOTE: ``Test`` and ``my_test`` are rebound by the async demo further down.


class Test:
    """Example context manager for testing errors in the exit method."""

    def __init__(self, fail=False):
        self.fail = fail

    def __enter__(self):
        # The message says "aenter" although this is the synchronous hook.
        print("aenter used")

    def __exit__(self, *errors):
        if self.fail:
            raise Exception("error in exit")


def my_test():
    """Enter two nested context managers that both fail on exit."""
    with Test(True) as fail_during_handling:
        with Test(True) as will_fail_first:
            print("inside with")
    return "hi"


# To try the synchronous demo, run these at this point in the file:
# dis.dis(my_test)  # show line numbers before
# correct_with_cleanup_traceback(my_test)
# dis.dis(my_test)  # show line numbers after correction
# my_test()


# --- asynchronous demo ------------------------------------------------------


class Test:
    """Async context manager whose ``__aexit__`` is not a coroutine function."""

    async def __aenter__(self):
        print("aenter used")
        value = asyncio.Future()
        value.set_result(True)
        return value

    # FORGOT TO MARK AS async !!
    # (presumably left this way on purpose so that leaving the ``async with``
    # block fails and the resulting traceback can be inspected)
    def __aexit__(self, *errors):
        print("aexit used")
        return None


@correct_with_cleanup_traceback
async def my_test():
    """Enter the async ``Test`` manager and await the value it hands back."""
    async with Test() as x:
        print("inside async with, now awaiting on", x)
        await x


if __name__ == "__main__":
    dis.dis(my_test)
    my_test().send(None)