# HG changeset patch # Parent b9ac3c44a4eb0c25ce36735490a02d76dcda4820 diff --git a/Lib/bdb.py b/Lib/bdb.py --- a/Lib/bdb.py +++ b/Lib/bdb.py @@ -3,12 +3,248 @@ import fnmatch import sys import os +import linecache +import token +import tokenize +import itertools +import types +from bisect import bisect +from operator import attrgetter __all__ = ["BdbQuit", "Bdb", "Breakpoint"] +# A dictionary mapping a filename to a BdbModule instance. +_modules = {} +_fncache = {} + +def canonic(filename): + if filename == "<" + filename[1:-1] + ">": + return filename + canonic = _fncache.get(filename) + if not canonic: + canonic = os.path.abspath(filename) + canonic = os.path.normcase(canonic) + _fncache[filename] = canonic + return canonic + +def code_line_numbers(code): + # Source code line numbers generator (see Objects/lnotab_notes.txt). + valid_lno = lno = code.co_firstlineno + yield valid_lno + for line_inc, byte_offset in itertools.islice(zip(code.co_lnotab, + itertools.chain(code.co_lnotab[1:], [1])), 1, None, 2): + lno += line_inc + if byte_offset == 0: + continue + if lno != valid_lno: + valid_lno = lno + yield valid_lno + +def reiterate(it): + """Iterator wrapper allowing to reiterate on items with send().""" + while True: + item = next(it) + val = (yield item) + # Reiterate while the sent value is true. + while val: + # The return value of send(). + yield item + val = (yield item) + +class BdbError(Exception): + """Generic bdb exception.""" + class BdbQuit(Exception): """Exception to give up completely.""" +class BdbModule: + """A module. + + Instance attributes: + functions_firstlno: a dictionary mapping function names and fully + qualified method names to their first line number. + """ + + def __init__(self, filename): + self.filename = filename + self.functions_firstlno = None + self.source_lines = linecache.getlines(self.filename) + try: + self.code = compile(''.join(self.source_lines), filename, 'exec') + except (SyntaxError, TypeError) as err: + raise BdbError('{}: {}.'.format(self.filename, err)) + + def get_func_lno(self, funcname): + """The first line number of the last defined 'funcname' function.""" + if self.functions_firstlno is None: + self.functions_firstlno = {} + self.parse(reiterate(tokenize.generate_tokens( + iter(self.source_lines).__next__))) + try: + return self.functions_firstlno[funcname] + except KeyError: + raise BdbError('{}: function "{}" not found.'.format( + self.filename, funcname)) + + def get_actual_bp(self, lineno): + """Get the actual breakpoint line number. + + When an exact match cannot be found in the lnotab expansion of the + module code object or one of its subcodes, pick up the next valid + statement line number. + + Return the statement line defined by the tuple (code firstlineno, + statement line number) which is at the shortest distance to line + 'lineno' and greater or equal to 'lineno'. When 'lineno' is the first + line number of a subcode, use its first statement line instead. + """ + + def _distance(code, module_level=False): + """The shortest distance to the next valid statement.""" + subcodes = dict((c.co_firstlineno, c) for c in code.co_consts + if isinstance(c, types.CodeType) and not + c.co_name.startswith('<')) + # Get the shortest distance to the subcode whose first line number + # is the last to be less or equal to lineno. That is, find the + # index of the first subcode whose first_lno is the first to be + # strictly greater than lineno. + subcode_dist = None + subcodes_flnos = sorted(subcodes) + idx = bisect(subcodes_flnos, lineno) + if idx != 0: + flno = subcodes_flnos[idx-1] + subcode_dist = _distance(subcodes[flno]) + + # Check if lineno is a valid statement line number in the current + # code, excluding function or method definition lines. + code_lnos = sorted(code_line_numbers(code)) + # Do not stop at execution of function definitions. + if not module_level and len(code_lnos) > 1: + code_lnos = code_lnos[1:] + if lineno in code_lnos and lineno not in subcodes_flnos: + return 0, (code.co_firstlineno, lineno) + + # Compute the distance to the next valid statement in this code. + idx = bisect(code_lnos, lineno) + if idx == len(code_lnos): + # lineno is greater that all 'code' line numbers. + return subcode_dist + actual_lno = code_lnos[idx] + dist = actual_lno - lineno + if subcode_dist and subcode_dist[0] < dist: + return subcode_dist + if actual_lno not in subcodes_flnos: + return dist, (code.co_firstlineno, actual_lno) + else: + # The actual line number is the line number of the first + # statement of the subcode following lineno (recursively). + return _distance(subcodes[actual_lno]) + + code_dist = _distance(self.code, module_level=True) + if not code_dist: + raise BdbError('{}: line {} is after the last ' + 'valid statement.'.format(self.filename, lineno)) + return code_dist[1] + + def parse(self, tok_generator, cindent=0, clss=None): + func_lno = 0 + indent = 0 + try: + for tokentype, tok, srowcol, _end, _line in tok_generator: + if tokentype == token.DEDENT: + # End of function definition. + if func_lno and srowcol[1] <= indent: + func_lno = 0 + # End of class definition. + if clss and srowcol[1] <= cindent: + return + elif tok == 'def' or tok == 'class': + if func_lno and srowcol[1] <= indent: + func_lno = 0 + if clss and srowcol[1] <= cindent: + tok_generator.send(1) + return + tokentype, name = next(tok_generator)[0:2] + if tokentype != token.NAME: + continue # syntax error + # Nested def or class in a function. + if func_lno: + continue + if clss: + name = '{}.{}'.format(clss, name) + if tok == 'def': + lineno, indent = srowcol + func_lno = lineno + self.functions_firstlno[name] = lineno + else: + self.parse(tok_generator, srowcol[1], name) + except StopIteration: + pass + +class ModuleBreakpoints: + """The breakpoints of a module. + + The 'breakpts' attribute is a dictionary that maps a code firstlineno to a + 'line_bps' dictionary that maps each line number of the code, where one or + more breakpoints are set, to the list of corresponding Breakpoint + instances. + + Note: + A line in 'line_bps' is the actual line of the breakpoint (the line where the + debugger stops), this line may differ from the line attribute of the + Breakpoint instance as set by the user. + """ + + def __init__(self, filename): + if filename not in _modules: + _modules[filename] = BdbModule(filename) + self.bdb_module = _modules[filename] + self.breakpts = {} + + def add_breakpoint(self, bp): + firstlineno, actual_lno = self.bdb_module.get_actual_bp(bp.line) + if firstlineno not in self.breakpts: + self.breakpts[firstlineno] = {} + line_bps = self.breakpts[firstlineno] + if actual_lno not in line_bps: + line_bps[actual_lno] = [] + line_bps[actual_lno].append(bp) + return firstlineno, actual_lno + + def delete_breakpoint(self, bp): + firstlineno, actual_lno = bp.actual_bp + try: + line_bps = self.breakpts[firstlineno] + bplist = line_bps[actual_lno] + bplist.remove(bp) + except (KeyError, ValueError): + assert False, ('Internal error: bpbynumber and breakpts' + ' are inconsistent') + if not bplist: + del line_bps[actual_lno] + if not line_bps: + del self.breakpts[firstlineno] + + def get_breakpoints(self, lineno): + """Return the list of breakpoints set at lineno.""" + try: + firstlineno, actual_lno = self.bdb_module.get_actual_bp(lineno) + except BdbError: + return [] + if firstlineno not in self.breakpts: + return [] + line_bps = self.breakpts[firstlineno] + if actual_lno not in line_bps: + return [] + return [bp for bp in sorted(line_bps[actual_lno], + key=attrgetter('number')) if bp.line == lineno] + + def all_breakpoints(self): + bpts = [] + for line_bps in self.breakpts.values(): + for bplist in line_bps.values(): + bpts.extend(bplist) + return [bp for bp in sorted(bpts, key=attrgetter('number'))] class Bdb: """Generic Python debugger base class. @@ -20,22 +256,15 @@ def __init__(self, skip=None): self.skip = set(skip) if skip else None - self.breaks = {} - self.fncache = {} self.frame_returning = None + # A dictionary mapping a filename to a ModuleBreakpoints instance. + self.breakpoints = {} + # Backward compatibility def canonic(self, filename): - if filename == "<" + filename[1:-1] + ">": - return filename - canonic = self.fncache.get(filename) - if not canonic: - canonic = os.path.abspath(filename) - canonic = os.path.normcase(canonic) - self.fncache[filename] = canonic - return canonic + return canonic(filename) def reset(self): - import linecache linecache.checkcache() self.botframe = None self._set_stopinfo(None, None) @@ -72,7 +301,7 @@ # First call of dispatch since reset() self.botframe = frame.f_back # (CT) Note that this may also be None! return self.trace_dispatch - if not (self.stop_here(frame) or self.break_anywhere(frame)): + if not (self.stop_here(frame) or self.break_at_function(frame)): # No need to trace this function return # None self.user_call(frame, arg) @@ -122,32 +351,35 @@ return False def break_here(self, frame): - filename = self.canonic(frame.f_code.co_filename) - if filename not in self.breaks: + filename = canonic(frame.f_code.co_filename) + if filename not in self.breakpoints: return False - lineno = frame.f_lineno - if lineno not in self.breaks[filename]: - # The line itself has no breakpoint, but maybe the line is the - # first line of a function with breakpoint set by function name. - lineno = frame.f_code.co_firstlineno - if lineno not in self.breaks[filename]: - return False + module_bps = self.breakpoints[filename] + firstlineno = frame.f_code.co_firstlineno + if (firstlineno not in module_bps.breakpts or + frame.f_lineno not in module_bps.breakpts[firstlineno]): + return False - # flag says ok to delete temp. bp - (bp, flag) = effective(filename, lineno, frame) - if bp: - self.currentbp = bp.number - if (flag and bp.temporary): - self.do_clear(str(bp.number)) - return True - else: - return False + # Handle multiple breakpoints on the same line (issue 14789) + self.effective_bp_list = [] + for bp in module_bps.breakpts[firstlineno][frame.f_lineno]: + stop, delete = bp.process_hit_event(frame) + if stop: + self.effective_bp_list.append(bp.number) + if bp.temporary and delete: + self.do_clear(str(bp.number)) + return len(self.effective_bp_list) != 0 def do_clear(self, arg): raise NotImplementedError("subclass of bdb must implement do_clear()") - def break_anywhere(self, frame): - return self.canonic(frame.f_code.co_filename) in self.breaks + def break_at_function(self, frame): + filename = canonic(frame.f_code.co_filename) + if filename not in self.breakpoints: + return False + if frame.f_code.co_firstlineno in self.breakpoints[filename].breakpts: + return True + return False # Derived classes should override the user_* methods # to gain control. @@ -227,7 +459,7 @@ def set_continue(self): # Don't stop except at breakpoints or when finished self._set_stopinfo(self.botframe, None, -1) - if not self.breaks: + if not self.has_breaks(): # no breakpoints; run without debugger overhead sys.settrace(None) frame = sys._getframe().f_back @@ -244,39 +476,29 @@ # Derived classes and clients can call the following methods # to manipulate breakpoints. These methods return an # error message is something went wrong, None if all is well. - # Set_break prints out the breakpoint line and file:lineno. # Call self.get_*break*() to see the breakpoints or better # for bp in Breakpoint.bpbynumber: if bp: bp.bpprint(). - def set_break(self, filename, lineno, temporary=False, cond=None, + def set_break(self, fname, lineno, temporary=False, cond=None, funcname=None): - filename = self.canonic(filename) - import linecache # Import as late as possible - line = linecache.getline(filename, lineno) - if not line: - return 'Line %s:%d does not exist' % (filename, lineno) - list = self.breaks.setdefault(filename, []) - if lineno not in list: - list.append(lineno) - bp = Breakpoint(filename, lineno, temporary, cond, funcname) - - def _prune_breaks(self, filename, lineno): - if (filename, lineno) not in Breakpoint.bplist: - self.breaks[filename].remove(lineno) - if not self.breaks[filename]: - del self.breaks[filename] + filename = canonic(fname) + if filename not in self.breakpoints: + module_bps = ModuleBreakpoints(filename) + else: + module_bps = self.breakpoints[filename] + if funcname: + lineno = module_bps.bdb_module.get_func_lno(funcname) + bp = Breakpoint(filename, lineno, module_bps, temporary, cond) + if filename not in self.breakpoints: + self.breakpoints[filename] = module_bps + return bp def clear_break(self, filename, lineno): - filename = self.canonic(filename) - if filename not in self.breaks: - return 'There are no breakpoints in %s' % filename - if lineno not in self.breaks[filename]: + bplist = self.get_breaks(filename, lineno) + if not bplist: return 'There is no breakpoint at %s:%d' % (filename, lineno) - # If there's only one bp in the list for that file,line - # pair, then remove the breaks entry - for bp in Breakpoint.bplist[filename, lineno][:]: + for bp in bplist: bp.deleteMe() - self._prune_breaks(filename, lineno) def clear_bpbynumber(self, arg): try: @@ -284,25 +506,21 @@ except ValueError as err: return str(err) bp.deleteMe() - self._prune_breaks(bp.file, bp.line) - def clear_all_file_breaks(self, filename): - filename = self.canonic(filename) - if filename not in self.breaks: - return 'There are no breakpoints in %s' % filename - for line in self.breaks[filename]: - blist = Breakpoint.bplist[filename, line] - for bp in blist: + def clear_all_file_breaks(self, fname): + filename = canonic(fname) + if (filename not in self.breakpoints or not + self.breakpoints[filename].breakpts.keys()): + return 'There are no breakpoints in %s' % fname + for bp in self.breakpoints[filename].all_breakpoints(): bp.deleteMe() - del self.breaks[filename] def clear_all_breaks(self): - if not self.breaks: + if not self.has_breaks(): return 'There are no breakpoints' for bp in Breakpoint.bpbynumber: if bp: bp.deleteMe() - self.breaks = {} def get_bpbynumber(self, arg): if not arg: @@ -320,25 +538,31 @@ return bp def get_break(self, filename, lineno): - filename = self.canonic(filename) - return filename in self.breaks and \ - lineno in self.breaks[filename] + return len(self.get_breaks(filename, lineno)) != 0 def get_breaks(self, filename, lineno): - filename = self.canonic(filename) - return filename in self.breaks and \ - lineno in self.breaks[filename] and \ - Breakpoint.bplist[filename, lineno] or [] + filename = canonic(filename) + if filename in self.breakpoints: + return self.breakpoints[filename].get_breakpoints(lineno) + return [] def get_file_breaks(self, filename): - filename = self.canonic(filename) - if filename in self.breaks: - return self.breaks[filename] - else: + filename = canonic(filename) + if filename not in self.breakpoints: return [] + return [bp.line for bp in self.breakpoints[filename].all_breakpoints()] def get_all_breaks(self): - return self.breaks + breaks = {} + for filename in self.breakpoints: + linebp_list = self.get_file_breaks(filename) + if linebp_list: + breaks[filename] = self.get_file_breaks(filename) + return breaks + + def has_breaks(self): + return any(self.breakpoints[f].breakpts.keys() + for f in self.breakpoints) # Derived classes and clients can call the following method # to get a data structure representing a stack trace. @@ -362,9 +586,9 @@ return stack, i def format_stack_entry(self, frame_lineno, lprefix=': '): - import linecache, reprlib + import reprlib frame, lineno = frame_lineno - filename = self.canonic(frame.f_code.co_filename) + filename = canonic(frame.f_code.co_filename) s = '%s(%r)' % (filename, lineno) if frame.f_code.co_name: s += frame.f_code.co_name @@ -455,29 +679,19 @@ Implements temporary breakpoints, ignore counts, disabling and (re)-enabling, and conditionals. - Breakpoints are indexed by number through bpbynumber and by - the file,line tuple using bplist. The former points to a - single instance of class Breakpoint. The latter points to a - list of such instances since there may be more than one - breakpoint per line. + Breakpoints are indexed by number through bpbynumber. """ - # XXX Keeping state in the class is a mistake -- this means - # you cannot have more than one active Bdb instance. + next = 1 # Next bp to be assigned + bpbynumber = [None] # Each entry is None or an instance of Bpt - next = 1 # Next bp to be assigned - bplist = {} # indexed by (file, lineno) tuple - bpbynumber = [None] # Each entry is None or an instance of Bpt - # index 0 is unused, except for marking an - # effective break .... see effective() - - def __init__(self, file, line, temporary=False, cond=None, funcname=None): - self.funcname = funcname - # Needed if funcname is not None. - self.func_first_executable_line = None + def __init__(self, file, line, module, temporary=False, + cond=None): self.file = file # This better be in canonical form! self.line = line + self.module = module + self.actual_bp = module.add_breakpoint(self) self.temporary = temporary self.cond = cond self.enabled = True @@ -485,20 +699,12 @@ self.hits = 0 self.number = Breakpoint.next Breakpoint.next += 1 - # Build the two lists self.bpbynumber.append(self) - if (file, line) in self.bplist: - self.bplist[file, line].append(self) - else: - self.bplist[file, line] = [self] def deleteMe(self): - index = (self.file, self.line) - self.bpbynumber[self.number] = None # No longer in list - self.bplist[index].remove(self) - if not self.bplist[index]: - # No more bp for this f:l combo - del self.bplist[index] + if self.bpbynumber[self.number]: + self.bpbynumber[self.number] = None # No longer in list + self.module.delete_breakpoint(self) def enable(self): self.enabled = True @@ -506,6 +712,27 @@ def disable(self): self.enabled = False + def process_hit_event(self, frame): + """Return (stop_state, delete_temporary) at a breakpoint hit event.""" + if not self.enabled: + return False, False + # Count every hit when breakpoint is enabled. + self.hits += 1 + # A conditional breakpoint. + if self.cond: + try: + if not eval(self.cond, frame.f_globals, frame.f_locals): + return False, False + except: + # If the breakpoint condition evaluation fails, the most + # conservative thing is to stop on the breakpoint. Don't + # delete temporary, as another hint to the user. + return True, False + if self.ignore > 0: + self.ignore -= 1 + return False, False + return True, True + def bpprint(self, out=None): if out is None: out = sys.stdout @@ -537,81 +764,6 @@ def __str__(self): return 'breakpoint %s at %s:%s' % (self.number, self.file, self.line) -# -----------end of Breakpoint class---------- - -def checkfuncname(b, frame): - """Check whether we should break here because of `b.funcname`.""" - if not b.funcname: - # Breakpoint was set via line number. - if b.line != frame.f_lineno: - # Breakpoint was set at a line with a def statement and the function - # defined is called: don't break. - return False - return True - - # Breakpoint set via function name. - - if frame.f_code.co_name != b.funcname: - # It's not a function call, but rather execution of def statement. - return False - - # We are in the right frame. - if not b.func_first_executable_line: - # The function is entered for the 1st time. - b.func_first_executable_line = frame.f_lineno - - if b.func_first_executable_line != frame.f_lineno: - # But we are not at the first line number: don't break. - return False - return True - -# Determines if there is an effective (active) breakpoint at this -# line of code. Returns breakpoint number or 0 if none -def effective(file, line, frame): - """Determine which breakpoint for this file:line is to be acted upon. - - Called only if we know there is a bpt at this - location. Returns breakpoint that was triggered and a flag - that indicates if it is ok to delete a temporary bp. - - """ - possibles = Breakpoint.bplist[file, line] - for b in possibles: - if not b.enabled: - continue - if not checkfuncname(b, frame): - continue - # Count every hit when bp is enabled - b.hits += 1 - if not b.cond: - # If unconditional, and ignoring go on to next, else break - if b.ignore > 0: - b.ignore -= 1 - continue - else: - # breakpoint and marker that it's ok to delete if temporary - return (b, True) - else: - # Conditional bp. - # Ignore count applies only to those bpt hits where the - # condition evaluates to true. - try: - val = eval(b.cond, frame.f_globals, frame.f_locals) - if val: - if b.ignore > 0: - b.ignore -= 1 - # continue - else: - return (b, True) - # else: - # continue - except: - # if eval fails, most conservative thing is to stop on - # breakpoint regardless of ignore count. Don't delete - # temporary, as another hint to user. - return (b, False) - return (None, None) - # -------------------- testing -------------------- @@ -621,10 +773,9 @@ if not name: name = '???' print('+++ call', name, args) def user_line(self, frame): - import linecache name = frame.f_code.co_name if not name: name = '???' - fn = self.canonic(frame.f_code.co_filename) + fn = canonic(frame.f_code.co_filename) line = linecache.getline(fn, frame.f_lineno, frame.f_globals) print('+++', fn, frame.f_lineno, name, ':', line.strip()) def user_return(self, frame, retval): diff --git a/Lib/pdb.py b/Lib/pdb.py --- a/Lib/pdb.py +++ b/Lib/pdb.py @@ -77,6 +77,7 @@ import pprint import signal import inspect +import importlib import traceback import linecache @@ -125,6 +126,60 @@ return lineno return 0 +def get_module_fname(module_name, path=None, inpackage=None): + if module_name in sys.modules: + return getattr(sys.modules[module_name], '__file__', None) + + if inpackage is not None: + fullmodule = '{}.{}'.format(inpackage, module_name) + else: + fullmodule = module_name + + i = module_name.rfind('.') + if i >= 0: + package = module_name[:i] + submodule = module_name[i+1:] + parent = get_module_fname(package, path, inpackage) + if parent is None: + return None + if inpackage is not None: + package = '{}.{}'.format(inpackage, package) + return get_module_fname(submodule, [os.path.dirname(parent)], package) + + if inpackage is not None: + search_path = path + else: + search_path = sys.path + try: + loader = importlib.find_loader(fullmodule, search_path) + return loader.get_filename(fullmodule) + except (AttributeError, ImportError): + return None + +def get_fqn_fname(fqn, frame): + try: + func = eval(fqn, frame.f_globals) + except: + # fqn is defined in a module not yet (fully) imported. + module = inspect.getmodule(frame) + candidate_tuples = [] + frame_fname = get_module_fname(module.__name__) + # Try first the current module for a function or method. + if frame_fname: + candidate_tuples.append((fqn, frame_fname)) + names = fqn.split('.') + for i in range(len(names) - 1, 0, -1): + filename = get_module_fname('.'.join(names[:i])) + if filename: + candidate_tuples.append(('.'.join(names[i:]), filename)) + return candidate_tuples + else: + try: + filename = inspect.getfile(func) + except TypeError: + return [] + return [(fqn, filename)] + class _rstr(str): """String that doesn't quote its repr.""" @@ -258,7 +313,7 @@ def user_line(self, frame): """This function is called when we stop or break at this line.""" if self._wait_for_mainpyfile: - if (self.mainpyfile != self.canonic(frame.f_code.co_filename) + if (self.mainpyfile != bdb.canonic(frame.f_code.co_filename) or frame.f_lineno <= 0): return self._wait_for_mainpyfile = False @@ -266,27 +321,38 @@ self.interaction(frame, None) def bp_commands(self, frame): - """Call every command that was set for the current active breakpoint - (if there is one). + """Call every command that was set for the current active breakpoints. Returns True if the normal interaction function must be called, False otherwise.""" - # self.currentbp is set in bdb in Bdb.break_here if a breakpoint was hit - if getattr(self, "currentbp", False) and \ - self.currentbp in self.commands: - currentbp = self.currentbp - self.currentbp = 0 - lastcmd_back = self.lastcmd - self.setup(frame, None) - for line in self.commands[currentbp]: - self.onecmd(line) - self.lastcmd = lastcmd_back - if not self.commands_silent[currentbp]: - self.print_stack_entry(self.stack[self.curindex]) - if self.commands_doprompt[currentbp]: - self._cmdloop() - self.forget() - return + # Handle multiple breakpoints on the same line (issue 14789) + # self.effective_bp_list is set in bdb in Bdb.break_here if breakpoints + # were hit. + if hasattr(self, 'effective_bp_list') and self.effective_bp_list: + silent = True + doprompt = False + atleast_one_cmd = False + for bp in self.effective_bp_list: + if bp in self.commands: + atleast_one_cmd = True + lastcmd_back = self.lastcmd + self.setup(frame, None) + for line in self.commands[bp]: + self.onecmd(line) + self.lastcmd = lastcmd_back + if not self.commands_silent[bp]: + silent = False + if self.commands_doprompt[bp]: + doprompt = True + + self.effective_bp_list = [] + if atleast_one_cmd: + if not silent: + self.print_stack_entry(self.stack[self.curindex]) + if doprompt: + self._cmdloop() + self.forget() + return return 1 def user_return(self, frame, return_value): @@ -602,81 +668,67 @@ sys.path; the .py suffix may be omitted. """ if not arg: - if self.breaks: # There's at least one + all_breaks = '\n'.join(bp.bpformat() for bp in + bdb.Breakpoint.bpbynumber if bp) + if all_breaks: self.message("Num Type Disp Enb Where") - for bp in bdb.Breakpoint.bpbynumber: - if bp: - self.message(bp.bpformat()) + self.message(all_breaks) return - # parse arguments; comma has lowest precedence - # and cannot occur in filename - filename = None - lineno = None - cond = None - comma = arg.find(',') - if comma > 0: - # parse stuff after comma: "condition" - cond = arg[comma+1:].lstrip() - arg = arg[:comma].rstrip() - # parse stuff before comma: [filename:]lineno | function - colon = arg.rfind(':') - funcname = None - if colon >= 0: - filename = arg[:colon].rstrip() - f = self.lookupmodule(filename) - if not f: - self.error('%r not found from sys.path' % filename) - return + + # Parse arguments, comma has lowest precedence and cannot occur in + # filename. + args = arg.rsplit(',', 1) + cond = args[1].strip() if len(args) == 2 else None + # Parse stuff before comma: [filename:]lineno | function. + args = args[0].rsplit(':', 1) + name = args[0].strip() + lineno = args[1] if len(args) == 2 else args[0] + try: + lineno = int(lineno) + except ValueError: + if len(args) == 2: + self.error('Bad lineno: "{}".'.format(lineno)) else: - filename = f - arg = arg[colon+1:].lstrip() + # Attempt the list of possible function or method fully + # qualified names and corresponding filenames. + candidates = get_fqn_fname(name, self.curframe) + for fqn, fname in candidates: + try: + bp = self.set_break(fname, None, temporary, cond, fqn) + self.message('Breakpoint {:d} at {}:{:d}'.format( + bp.number, bp.file, bp.line)) + return + except bdb.BdbError: + pass + if not candidates: + self.error( + 'Not a function or a built-in: "{}"'.format(name)) + else: + self.error('Bad name: "{}".'.format(name)) + else: + filename = self.curframe.f_code.co_filename + if len(args) == 2 and name: + filename = name + if filename.startswith('<') and filename.endswith('>'): + # allow : doctest installs a hook at + # linecache.getlines to allow to be + # linecached and readable. + if filename == '' and self.mainpyfile: + filename = self.mainpyfile + else: + root, ext = os.path.splitext(filename) + if ext == '': + filename = filename + '.py' + if not os.path.exists(filename): + self.error('Bad filename: "{}".'.format(arg)) + return try: - lineno = int(arg) - except ValueError: - self.error('Bad lineno: %s' % arg) - return - else: - # no colon; can be lineno or function - try: - lineno = int(arg) - except ValueError: - try: - func = eval(arg, - self.curframe.f_globals, - self.curframe_locals) - except: - func = arg - try: - if hasattr(func, '__func__'): - func = func.__func__ - code = func.__code__ - #use co_name to identify the bkpt (function names - #could be aliased, but co_name is invariant) - funcname = code.co_name - lineno = code.co_firstlineno - filename = code.co_filename - except: - # last thing to try - (ok, filename, ln) = self.lineinfo(arg) - if not ok: - self.error('The specified object %r is not a function ' - 'or was not found along sys.path.' % arg) - return - funcname = ok # ok contains a function name - lineno = int(ln) - if not filename: - filename = self.defaultFile() - # Check for reasonable breakpoint - line = self.checkline(filename, lineno) - if line: - # now set the break point - err = self.set_break(filename, line, temporary, cond, funcname) - if err: - self.error(err, file=self.stdout) + bp = self.set_break(filename, lineno, temporary, cond) + except bdb.BdbError as err: + self.error(err) else: - bp = self.get_breaks(filename, line)[-1] - self.message("Breakpoint %d at %s:%d" % - (bp.number, bp.file, bp.line)) + self.message('Breakpoint {:d} at {}:{:d}'.format( + bp.number, bp.file, bp.line)) # To be overridden in derived debuggers def defaultFile(self): @@ -700,60 +752,6 @@ complete_tbreak = _complete_location - def lineinfo(self, identifier): - failed = (None, None, None) - # Input is identifier, may be in single quotes - idstring = identifier.split("'") - if len(idstring) == 1: - # not in single quotes - id = idstring[0].strip() - elif len(idstring) == 3: - # quoted - id = idstring[1].strip() - else: - return failed - if id == '': return failed - parts = id.split('.') - # Protection for derived debuggers - if parts[0] == 'self': - del parts[0] - if len(parts) == 0: - return failed - # Best first guess at file to look at - fname = self.defaultFile() - if len(parts) == 1: - item = parts[0] - else: - # More than one part. - # First is module, second is method/class - f = self.lookupmodule(parts[0]) - if f: - fname = f - item = parts[1] - answer = find_function(item, fname) - return answer or failed - - def checkline(self, filename, lineno): - """Check whether specified line seems to be executable. - - Return `lineno` if it is, 0 if not (e.g. a docstring, comment, blank - line or EOF). Warning: testing is not comprehensive. - """ - # this method should be callable before starting debugging, so default - # to "no globals" if there is no current frame - globs = self.curframe.f_globals if hasattr(self, 'curframe') else None - line = linecache.getline(filename, lineno, globs) - if not line: - self.message('End of file') - return 0 - line = line.strip() - # Don't allow setting breakpoint at a blank line - if (not line or (line[0] == '#') or - (line[:3] == '"""') or line[:3] == "'''"): - self.error('Blank or comment') - return 0 - return lineno - def do_enable(self, arg): """enable bpnumber [bpnumber ...] Enables the breakpoints given as a space separated list of @@ -1213,7 +1211,7 @@ first = self.lineno + 1 if last is None: last = first + 10 - filename = self.curframe.f_code.co_filename + filename = bdb.canonic(self.curframe.f_code.co_filename) breaklist = self.get_file_breaks(filename) try: lines = linecache.getlines(filename, self.curframe.f_globals) @@ -1481,30 +1479,6 @@ # other helper functions - def lookupmodule(self, filename): - """Helper function for break/clear parsing -- may be overridden. - - lookupmodule() translates (possibly incomplete) file or module name - into an absolute file name. - """ - if os.path.isabs(filename) and os.path.exists(filename): - return filename - f = os.path.join(sys.path[0], filename) - if os.path.exists(f) and self.canonic(f) == self.mainpyfile: - return f - root, ext = os.path.splitext(filename) - if ext == '': - filename = filename + '.py' - if os.path.isabs(filename): - return filename - for dirname in sys.path: - while os.path.islink(dirname): - dirname = os.readlink(dirname) - fullname = os.path.join(dirname, filename) - if os.path.exists(fullname): - return fullname - return None - def _runscript(self, filename): # The script has to run in __main__ namespace (or imports from # __main__ will break). @@ -1524,7 +1498,7 @@ # avoid stopping before we reach the main script (see user_line and # user_call for details). self._wait_for_mainpyfile = True - self.mainpyfile = self.canonic(filename) + self.mainpyfile = bdb.canonic(filename) self._user_requested_quit = False with open(filename, "rb") as fp: statement = "exec(compile(%r, %r, 'exec'))" % \ diff --git a/Lib/test/test_pdb.py b/Lib/test/test_pdb.py --- a/Lib/test/test_pdb.py +++ b/Lib/test/test_pdb.py @@ -275,6 +275,38 @@ """ +def test_issue_14792(): + """ + >>> def foo(): + ... x = 1 + ... x = 2 + + >>> def test_function(): + ... import pdb; pdb.Pdb(nosigint=True).set_trace() + ... foo() + + >>> with PdbTestInput([ + ... 'step', + ... 'step', + ... 'break foo', + ... 'continue', + ... ]): + ... test_function() + > (3)test_function() + -> foo() + (Pdb) step + --Call-- + > (1)foo() + -> def foo(): + (Pdb) step + > (2)foo() + -> x = 1 + (Pdb) break foo + Breakpoint 1 at :1 + (Pdb) continue + """ + + def do_nothing(): pass @@ -597,11 +629,36 @@ """ +def normalize(result, filename='', strip_bp_lnum=False): + """Normalize a test result.""" + lines = [] + for line in result.splitlines(): + while line.startswith('(Pdb) ') or line.startswith('(com) '): + line = line[6:] + words = line.split() + line = [] + # Replace tabs with spaces + for word in words: + if filename: + idx = word.find(filename) + # Remove the filename prefix + if idx > 0: + word = word[idx:] + if idx >=0 and strip_bp_lnum: + idx = word.find(':') + # Remove the ':' separator and breakpoint line number + if idx > 0: + word = word[:idx] + line.append(word) + line = ' '.join(line) + lines.append(line.strip()) + return '\n'.join(lines) + + class PdbTestCase(unittest.TestCase): - def run_pdb(self, script, commands): + def run_pdb(self, script, commands, filename): """Run 'script' lines with pdb and the pdb 'commands'.""" - filename = 'main.py' with open(filename, 'w') as f: f.write(textwrap.dedent(script)) self.addCleanup(support.unlink, filename) @@ -609,7 +666,7 @@ stdout = stderr = None with subprocess.Popen(cmd, stdout=subprocess.PIPE, stdin=subprocess.PIPE, - stderr=subprocess.STDOUT, + stderr=subprocess.PIPE, ) as proc: stdout, stderr = proc.communicate(str.encode(commands)) stdout = stdout and bytes.decode(stdout) @@ -662,11 +719,278 @@ with open('bar.py', 'w') as f: f.write(textwrap.dedent(bar)) self.addCleanup(support.unlink, 'bar.py') - stdout, stderr = self.run_pdb(script, commands) + stdout, stderr = self.run_pdb(script, commands, 'main.py') self.assertTrue( any('main.py(5)foo()->None' in l for l in stdout.splitlines()), 'Fail to step into the caller after a return') + def test_breakpoints_set_on_non_statement(self): + script = """ + def foo(): + + # comment + x = 1 + + if not x: + x = 2 + else: + x = 3 + + foo() + """ + commands = """ + break 3 + break 4 + break 9 + continue + break + continue + quit + """ + expected = """ + > main.py(2)() + -> def foo(): + Breakpoint 1 at main.py:3 + Breakpoint 2 at main.py:4 + Breakpoint 3 at main.py:9 + > main.py(5)foo() + -> x = 1 + Num Type Disp Enb Where + 1 breakpoint keep yes at main.py:3 + breakpoint already hit 1 time + 2 breakpoint keep yes at main.py:4 + breakpoint already hit 1 time + 3 breakpoint keep yes at main.py:9 + > main.py(10)foo() + -> x = 3 + """ + filename = 'main.py' + stdout, stderr = self.run_pdb(script, commands, filename) + stdout = normalize(stdout, filename) + expected = normalize(expected) + self.assertTrue(stdout in expected, + '\n\nExpected:\n{}\nGot:\n{}\n' + 'Fail to stop at breakpoint set at empty line, a comment line or' + ' a non valid statement line.'.format(expected, stdout)) + + def test_issue14789(self): + script = """ + def bar(a): + x = 1 + + bar(10) + bar(20) + """ + commands = """ + break bar + commands 1 + print a + end + ignore 1 1 + break bar + commands 2 + print a + 1 + end + ignore 2 1 + continue + break + quit + """ + expected = """ + > main.py(2)() + -> def bar(a): + Breakpoint 1 at main.py:2 + Will ignore next 1 crossing of breakpoint 1. + Breakpoint 2 at main.py:2 + Will ignore next 1 crossing of breakpoint 2. + 20 + 21 + > main.py(3)bar() + -> x = 1 + Num Type Disp Enb Where + 1 breakpoint keep yes at main.py:2 + breakpoint already hit 2 times + 2 breakpoint keep yes at main.py:2 + breakpoint already hit 2 times + """ + filename = 'main.py' + stdout, stderr = self.run_pdb(script, commands, filename) + stdout = normalize(stdout, filename) + expected = normalize(expected) + self.assertTrue(stdout in expected, + '\n\nExpected:\n{}\nGot:\n{}\n' + 'Fail to handle two breakpoints set on the same line.'.format( + expected, stdout)) + + def test_issue14795(self): + script = """ + class C: + def foo(self): + pass + """ + commands = """ + break C.foo + break bar.bar + quit + """ + bar = """ + def bar(): + pass + """ + expected = """ + > main.py(2)() + -> class C: + Breakpoint 1 at main.py:3 + Breakpoint 2 at bar.py:2 + """ + with open('bar.py', 'w') as f: + f.write(textwrap.dedent(bar)) + self.addCleanup(support.unlink, 'bar.py') + filename = 'main.py' + stdout, stderr = self.run_pdb(script, commands, filename) + stdout = normalize(normalize(stdout, filename), 'bar.py') + expected = normalize(expected) + self.assertTrue(stdout in expected, + '\n\nExpected:\n{}\nGot:\n{}\n' + 'Fail to set a breakpoint in a method, or in a function whose' + ' module is not yet imported.'.format(expected, stdout)) + + def test_issue_14808(self): + script = """ + def foo(): + pass + + def bar(): + pass + + foo() + bar() + """ + commands = """ + break 2 + break bar + continue + continue + break + quit + """ + expected = """ + > main.py(2)() + -> def foo(): + Breakpoint 1 at main.py:2 + Breakpoint 2 at main.py:5 + > main.py(3)foo() + -> pass + > main.py(6)bar() + -> pass + Num Type Disp Enb Where + 1 breakpoint keep yes at main.py:2 + breakpoint already hit 1 time + 2 breakpoint keep yes at main.py:5 + breakpoint already hit 1 time + """ + filename = 'main.py' + stdout, stderr = self.run_pdb(script, commands, filename) + stdout = normalize(stdout, filename) + expected = normalize(expected) + self.assertTrue(stdout in expected, + '\n\nExpected:\n{}\nGot:\n{}\n' + 'Fail to stop at breakpoint set at function definition line' + ' or at function name.'.format(expected, stdout)) + + def test_set_breakpoint_by_function_name(self): + script = """ + class C: + c_foo = 1 + + class D: + def d_foo(self): + pass + + def foo(): + pass + + not_a_function = 1 + foo() + """ + commands = """ + break C + break C.c_foo + break D.d_foo + break foo + break not_a_function + break len + break logging.handlers.SocketHandler.close + continue + break C + break C.c_foo + break D.d_foo + break foo + break not_a_function + quit + """ + expected = """ + > main.py(2)() + -> class C: + *** Bad name: "C". + *** Bad name: "C.c_foo". + Breakpoint 1 at main.py:6 + Breakpoint 2 at main.py:9 + *** Bad name: "not_a_function". + *** Not a function or a built-in: "len" + Breakpoint 3 at handlers.py: + > main.py(10)foo() + -> pass + *** Bad name: "C". + *** Not a function or a built-in: "C.c_foo" + Breakpoint 4 at main.py:6 + Breakpoint 5 at main.py:9 + *** Not a function or a built-in: "not_a_function" + """ + filename = 'main.py' + stdout, stderr = self.run_pdb(script, commands, filename) + stdout = normalize(normalize(stdout, 'handlers.py', strip_bp_lnum=True), filename) + expected = normalize(expected, 'handlers.py', strip_bp_lnum=True) + self.assertTrue(stdout in expected, + '\n\nExpected:\n{}\nGot:\n{}\n' + 'Fail to handle a breakpoint set by function name.'.format(expected, stdout)) + + def test_breakpoint_set_by_line_and_funcname(self): + script = """ + def foo(): + pass + + foo() + """ + commands = """ + break foo + break 2 + continue + break + quit + """ + expected = """ + > main.py(2)() + -> def foo(): + Breakpoint 1 at main.py:2 + Breakpoint 2 at main.py:2 + > main.py(3)foo() + -> pass + Num Type Disp Enb Where + 1 breakpoint keep yes at main.py:2 + breakpoint already hit 1 time + 2 breakpoint keep yes at main.py:2 + breakpoint already hit 1 time + """ + filename = 'main.py' + stdout, stderr = self.run_pdb(script, commands, filename) + stdout = normalize(stdout, filename) + expected = normalize(expected) + self.assertTrue(stdout in expected, + '\n\nExpected:\n{}\nGot:\n{}\n' + 'Failed hits for breakpoints set by line number and by function' + ' name.'.format(expected, stdout)) + def tearDown(self): support.unlink(support.TESTFN)