diff --git a/Lib/bdb.py b/Lib/bdb.py --- a/Lib/bdb.py +++ b/Lib/bdb.py @@ -3,12 +3,447 @@ import fnmatch import sys import os +import inspect +import linecache +import token +import tokenize +import imp +import importlib +import pprint +from operator import itemgetter, attrgetter __all__ = ["BdbQuit", "Bdb", "Breakpoint"] +# A dictionary mapping a filename to a ModuleSource instance. +_modules = {} +_fncache = {} + +def canonic(filename): + if filename == "<" + filename[1:-1] + ">": + return filename + canonic = _fncache.get(filename) + if not canonic: + canonic = os.path.abspath(filename) + canonic = os.path.normcase(canonic) + _fncache[filename] = canonic + return canonic + +def getfilename(module_name, path=None, inpackage=None): + """Return the file name of module_name.""" + if module_name in sys.modules: + return getattr(sys.modules[module_name], '__file__', None) + + if inpackage is not None: + fullmodule = '{}.{}'.format(inpackage, module_name) + else: + fullmodule = module_name + + i = module_name.rfind('.') + if i >= 0: + package = module_name[:i] + submodule = module_name[i+1:] + parent = getfilename(package, path, inpackage) + if parent is None: + return None + if inpackage is not None: + package = '{}.{}'.format(inpackage, package) + return getfilename(submodule, [os.path.dirname(parent)], package) + + if inpackage is not None: + search_path = path + else: + search_path = sys.path + try: + loader = importlib.find_loader(fullmodule, search_path) + return loader.get_filename(fullmodule) + except (AttributeError, ImportError): + return None + +def funcname_breakpoint(funcname, filename=None, frame=None): + """Return the filename and the line of the first statement of funcname. + + Fail with BdbError when filename and line cannot be found. + """ + if not filename and not frame: + raise BdbError('Error at funcname_breakpoint: invalid arguments.') + + if filename: + filename = canonic(filename) + if filename not in _modules: + module_src = ModuleSource(filename) + if module_src.lines: + _modules[filename] = module_src + else: + module_src = _modules[filename] + func_lno = module_src.function_lineno.get(funcname) + lineno = module_src.get_actual_bp(func_lno)[1] + if not lineno: + raise BdbError('Bad function name: "{}".'.format(funcname)) + return filename, lineno + + # frame is not None + try: + func = eval(funcname, frame.f_globals) + except: + # funcname is defined in a module not yet (fully) imported + module = inspect.getmodule(frame) + filename, lineno = FunctionQualifiedName(funcname, + module).get_fileline() + if not filename: + raise BdbError('Bad name: "{}".'.format(funcname)) + return filename, lineno + else: + try: + filename = inspect.getfile(func) + except TypeError: + raise BdbError('Cannot set a breakpoint at "{}"'.format(funcname)) + return funcname_breakpoint(funcname, filename) + + +class BdbError(Exception): + """Generic bdb exception.""" + class BdbQuit(Exception): """Exception to give up completely.""" +# A line_set type: a function definition possibly spanning multiple physical +# lines, a group of consecutive logical lines, a logical line that spans +# multiple physical lines. +LINE_DEF, LINE_GROUP, LINE_MULTI = tuple(range(3)) + +class ModuleSource: + """A parsed module source. + + The parsed source is defined by two attributes: function_lineno, a + dictionary mapping function and method names to the first line of their + definition and lines, a list of line_set tuples (start, end, ltype, + func_lno). Where start is the first line of the line_set, end is the last + line, ltype is its type and func_lno is the first line of the function + definition that the line_set belongs to. + """ + + def __init__(self, filename): + self.filename = filename + self.function_lineno = {} + self.lines = [] + self.last_line = 0 + self.stat = None + try: + self.stat = os.stat(self.filename) + except os.error: + pass + self.parse(filename) + + def reset(self): + """Update ModuleSource after the file has been modified.""" + stat = None + try: + stat = os.stat(self.filename) + except os.error: + pass + if not self.stat or not stat: + return + if (self.stat.st_size == stat.st_size and + self.stat.st_mtime == stat.st_mtime): + return + + # The file has been modified, attempt to reload its module. + self.stat = stat + for module in sys.modules.values(): + fname = getattr(module, '__file__', None) + if fname and canonic(fname) == self.filename: + try: + imp.reload(module) + except: + pass + break + + self.function_lineno = {} + self.lines = [] + self.last_line = 0 + self.parse(self.filename) + + def get_actual_bp(self, lineno): + """Return the actual breakpoint at lineno as (func_lno, actual_lno). + + func_lno: first line of the corresponding function definition + actual_lno: line where the debugger stops + """ + if (not lineno or lineno < 0 or + not self.last_line or lineno > self.last_line): + return None, None + + i = 0 + length = j = len(self.lines) + # find the index of the first line_set whose start is the first to be + # strictly greater than lineno + while i < j: + m = (i + j) // 2 + if lineno < self.lines[m][0]: + j = m + else: + i = m + 1 + + if i != 0: + ltype = self.lines[i-1][2] + # lineno inside previous LINE_GROUP or LINE_MULTI + if lineno <= self.lines[i-1][1]: + if ltype == LINE_GROUP: + return self.lines[i-1][3], lineno + elif ltype == LINE_MULTI: + return self.lines[i-1][3], self.lines[i-1][0] + if i != length: + ltype = self.lines[i][2] + # first statement of next group if not LINE_DEF + if ltype != LINE_DEF: + return self.lines[i][3], self.lines[i][0] + # breakpoints set on those lines are outside function definitions + return None, lineno + + def parse(self, filename): + source_lines = linecache.getlines(filename) + if not source_lines: + return + + def pop_nested(stack, indent): + # Pop nested classes or pop the def marker. + while stack and stack[-1][1] >= indent: + del stack[-1] + + # Stack of (class, indent) pairs or the marker of a function + # definition as (None, indent). + stack = [] + start = func_lno = 0 + got_newline = False + prev_tokentype = tokenize.NEWLINE + g = tokenize.generate_tokens(iter(source_lines).__next__) + try: + for tokentype, tok, srowcol, _end, _line in g: + if tokentype == token.DEDENT: + indent = srowcol[1] + # End of function definition. + if (stack and stack[-1][0] is None and + indent <= stack[-1][1]): + # The following condition is False when the lines have + # already been processed at NL or when this is an + # empty line. + if start and (ltype != LINE_MULTI or got_newline): + self.lines.append((start, last, ltype, func_lno)) + start = func_lno = 0 + pop_nested(stack, indent) + elif tokentype == tokenize.NL: + if start and ltype != LINE_DEF: + if got_newline: + self.lines.append((start, last, ltype, func_lno)) + start = 0 + # The start of a logical line with multiple physical + # lines + if (ltype != LINE_MULTI and + prev_tokentype != tokenize.NEWLINE and + prev_tokentype != tokenize.COMMENT): + start = srowcol[0] + got_newline = False + ltype = LINE_MULTI + elif tokentype == tokenize.NEWLINE: + last = srowcol[0] + got_newline = True + if start: + if ltype == LINE_DEF or ltype == LINE_MULTI: + self.lines.append((start, last, ltype, func_lno)) + start = 0 + elif tok == 'def': + lineno, indent = srowcol + pop_nested(stack, indent) + tokentype, meth_name = next(g)[0:2] + if tokentype != token.NAME: + continue # syntax error + if stack: + class_name = stack[-1][0] + if not class_name: + continue # a nested def + meth_name = '{}.{}'.format(class_name, meth_name) + ltype = LINE_DEF + self.function_lineno[meth_name] = lineno + start = func_lno = lineno + stack.append((None, indent)) # function definition marker + elif tok == 'class': + indent = srowcol[1] + pop_nested(stack, indent) + tokentype, class_name = next(g)[0:2] + if tokentype != token.NAME: + continue # syntax error + if stack: + cls = stack[-1][0] + if not cls: + continue # a class nested in a function + class_name = '{}.{}'.format(cls, class_name) + stack.append((class_name, indent)) + # Start a group of line(s). + elif (not start and stack and stack[-1][0] is None and + tokentype != tokenize.COMMENT and + tokentype != token.INDENT and + not (tokentype == token.STRING and + prev_tokentype == token.INDENT)): + start = srowcol[0] + got_newline = False + ltype = LINE_GROUP + + prev_tokentype = tokentype + except StopIteration: + pass + + # ENDMARKER + self.last_line = srowcol[0] - 1 + + def __str__(self): + def lines_per_func(): + # Group lines per function + func_lno = 0 + group = [] + for line_set in self.lines: + if line_set[3] != func_lno: + if group: + yield pprint.pformat(group) + func_lno = line_set[3] + group = [line_set] + else: + group.append(line_set) + if group: + yield pprint.pformat(group) + + return ('Functions:\n{}\nLines:\n{}\n'.format( + pprint.pformat(sorted( + self.function_lineno.items(), key=itemgetter(1))), + '\n'.join(lines_per_func()))) + +class ModuleBreakpoints: + """The breakpoints of a module. + + The breakpts attribute is a dictionary that maps the first line of a + function definition to a function_bps dictionary that maps each line of the + function, where one or more breakpoints are set, to the list of + corresponding Breakpoint instances. + + Important note: + A line in function_bps is the actual line of the breakpoint(s) (the line + where the debugger stops), this line may differ from the line attribute of + the Breakpoint instance(s) as set by the user. + """ + + def __init__(self, filename): + assert filename, ('Attempt to instantiate ModuleBreakpoints' + ' with {}'.format(filename)) + if filename not in _modules: + self.module_src = ModuleSource(filename) + if self.module_src.lines: + _modules[filename] = self.module_src + else: + self.module_src = _modules[filename] + self.stat = self.module_src.stat + self.breakpts = {} + + def reset(self): + self.module_src.reset() + stat = self.module_src.stat + if not self.stat or not stat: + return + if (self.stat.st_size == stat.st_size and + self.stat.st_mtime == stat.st_mtime): + return + + # The file has been modified + self.stat = stat + bplist = self.all_breakpoints() + self.breakpts = {} + for bp in bplist: + self.add_breakpoint(bp) + + def add_breakpoint(self, bp): + func_lno, actual_lno = self.module_src.get_actual_bp(bp.line) + # May happen after the file has been modified and a reset. + if not func_lno or not actual_lno: + return + if func_lno not in self.breakpts: + self.breakpts[func_lno] = {} + function_bps = self.breakpts[func_lno] + if actual_lno not in function_bps: + function_bps[actual_lno] = [] + function_bps[actual_lno].append(bp) + + def delete_breakpoint(self, bp): + func_lno, actual_lno = self.module_src.get_actual_bp(bp.line) + # May happen after the file has been modified and a reset. + if not func_lno or not actual_lno: + return + try: + function_bps = self.breakpts[func_lno] + bplist = function_bps[actual_lno] + bplist.remove(bp) + except (KeyError, ValueError): + assert False, ('Internal error: bpbynumber and breakpts' + ' are inconsistent') + if not bplist: + del function_bps[actual_lno] + if not function_bps: + del self.breakpts[func_lno] + + def get_breakpoints(self, lineno): + """Return the list of breakpoints set at lineno.""" + func_lno, actual_lno = self.module_src.get_actual_bp(lineno) + if func_lno not in self.breakpts: + return [] + function_bps = self.breakpts[func_lno] + if actual_lno not in function_bps: + return [] + return [bp for bp in sorted(function_bps[actual_lno], + key=attrgetter('number')) if bp.line == lineno] + + def all_breakpoints(self): + bpts = [] + for function_bps in self.breakpts.values(): + for bplist in function_bps.values(): + bpts.extend(bplist) + return [bp for bp in sorted(bpts, key=attrgetter('number'))] + +class FunctionQualifiedName: + """Utility to find where is defined a function from its qualified name.""" + + def __init__(self, name, module=None): + self.name = name + self.cur_fname = None + if module: + self.cur_fname = getfilename(module.__name__) + self.fileline = None, None + + def get_fileline(self): + i = self.name.rfind('.') + if i >= 0: + funcname = self.name[i+1:] + prefix = self.name[:i] + j = prefix.rfind('.') + # Try first the current module for a class method, then a function + # in prefix and last a method in prefix[:j]. + if (not (prefix and j < 0 and + self.lookup(self.cur_fname, self.name)) and + not self.lookup(getfilename(prefix), funcname) and + j > 0): + method = '{}.{}'.format(prefix[j+1:], funcname) + prefix = prefix[:j] + self.lookup(getfilename(prefix), method) + else: + # a function in the current module + self.lookup(self.cur_fname, self.name) + + return self.fileline + + def lookup(self, filename, funcname): + if filename: + try: + self.fileline = funcname_breakpoint(funcname, filename) + return True + except BdbError: + pass + return False class Bdb: """Generic Python debugger base class. @@ -20,25 +455,20 @@ def __init__(self, skip=None): self.skip = set(skip) if skip else None - self.breaks = {} - self.fncache = {} self.frame_returning = None + # A dictionary mapping a filename to a ModuleBreakpoints instance. + self.breakpoints = {} + # Backward compatibility def canonic(self, filename): - if filename == "<" + filename[1:-1] + ">": - return filename - canonic = self.fncache.get(filename) - if not canonic: - canonic = os.path.abspath(filename) - canonic = os.path.normcase(canonic) - self.fncache[filename] = canonic - return canonic + return canonic(filename) def reset(self): - import linecache linecache.checkcache() self.botframe = None self._set_stopinfo(None, None) + for module_bpts in self.breakpoints.values(): + module_bpts.reset() def trace_dispatch(self, frame, event, arg): if self.quitting: @@ -72,7 +502,7 @@ # First call of dispatch since reset() self.botframe = frame.f_back # (CT) Note that this may also be None! return self.trace_dispatch - if not (self.stop_here(frame) or self.break_anywhere(frame)): + if not (self.stop_here(frame) or self.break_at_function(frame)): # No need to trace this function return # None self.user_call(frame, arg) @@ -122,32 +552,33 @@ return False def break_here(self, frame): - filename = self.canonic(frame.f_code.co_filename) - if filename not in self.breaks: + filename = canonic(frame.f_code.co_filename) + if filename not in self.breakpoints: return False - lineno = frame.f_lineno - if lineno not in self.breaks[filename]: - # The line itself has no breakpoint, but maybe the line is the - # first line of a function with breakpoint set by function name. - lineno = frame.f_code.co_firstlineno - if lineno not in self.breaks[filename]: - return False + func_lno = frame.f_code.co_firstlineno + module_bpts = self.breakpoints[filename] + if (func_lno not in module_bpts.breakpts or + frame.f_lineno not in module_bpts.breakpts[func_lno]): + return False - # flag says ok to delete temp. bp - (bp, flag) = effective(filename, lineno, frame) - if bp: - self.currentbp = bp.number - if (flag and bp.temporary): - self.do_clear(str(bp.number)) - return True - else: - return False + # Handle multiple breakpoints on the same line (issue 14789) + self.effective_bp_list = [] + for bp in module_bpts.breakpts[func_lno][frame.f_lineno]: + stop, delete = bp.process_hit_event(frame) + if stop: + self.effective_bp_list.append(bp.number) + if bp.temporary and delete: + self.do_clear(str(bp.number)) + return len(self.effective_bp_list) != 0 def do_clear(self, arg): raise NotImplementedError("subclass of bdb must implement do_clear()") - def break_anywhere(self, frame): - return self.canonic(frame.f_code.co_filename) in self.breaks + def break_at_function(self, frame): + filename = canonic(frame.f_code.co_filename) + return (filename in self.breakpoints and + frame.f_code.co_firstlineno in + self.breakpoints[filename].breakpts) # Derived classes should override the user_* methods # to gain control. @@ -227,7 +658,7 @@ def set_continue(self): # Don't stop except at breakpoints or when finished self._set_stopinfo(self.botframe, None, -1) - if not self.breaks: + if not self.has_breaks(): # no breakpoints; run without debugger overhead sys.settrace(None) frame = sys._getframe().f_back @@ -244,39 +675,34 @@ # Derived classes and clients can call the following methods # to manipulate breakpoints. These methods return an # error message is something went wrong, None if all is well. - # Set_break prints out the breakpoint line and file:lineno. # Call self.get_*break*() to see the breakpoints or better # for bp in Breakpoint.bpbynumber: if bp: bp.bpprint(). - def set_break(self, filename, lineno, temporary=False, cond=None, - funcname=None): - filename = self.canonic(filename) - import linecache # Import as late as possible - line = linecache.getline(filename, lineno) - if not line: - return 'Line %s:%d does not exist' % (filename, lineno) - list = self.breaks.setdefault(filename, []) - if lineno not in list: - list.append(lineno) - bp = Breakpoint(filename, lineno, temporary, cond, funcname) - - def _prune_breaks(self, filename, lineno): - if (filename, lineno) not in Breakpoint.bplist: - self.breaks[filename].remove(lineno) - if not self.breaks[filename]: - del self.breaks[filename] + def set_break(self, fname, lineno, temporary=False, cond=None, + funcname=None): + # funcname is not used anymore and kept for backward compatibility + filename = canonic(fname) + if filename not in self.breakpoints: + module_bps = ModuleBreakpoints(filename) + if not module_bps.module_src.lines: + return 'No function and no method in {}'.format(fname) + self.breakpoints[filename] = module_bps + else: + module_bps = self.breakpoints[filename] + func_lno, actual_lno = module_bps.module_src.get_actual_bp(lineno) + if not func_lno: + if not actual_lno: + return 'Line {}:{} does not exist'.format(fname, lineno) + return ('Line {}:{} not within a function definition' + .format(fname, lineno)) + bp = Breakpoint(filename, lineno, module_bps, temporary, cond) def clear_break(self, filename, lineno): - filename = self.canonic(filename) - if filename not in self.breaks: - return 'There are no breakpoints in %s' % filename - if lineno not in self.breaks[filename]: + bplist = self.get_breaks(filename, lineno) + if not bplist: return 'There is no breakpoint at %s:%d' % (filename, lineno) - # If there's only one bp in the list for that file,line - # pair, then remove the breaks entry - for bp in Breakpoint.bplist[filename, lineno][:]: + for bp in bplist: bp.deleteMe() - self._prune_breaks(filename, lineno) def clear_bpbynumber(self, arg): try: @@ -284,25 +710,21 @@ except ValueError as err: return str(err) bp.deleteMe() - self._prune_breaks(bp.file, bp.line) - def clear_all_file_breaks(self, filename): - filename = self.canonic(filename) - if filename not in self.breaks: - return 'There are no breakpoints in %s' % filename - for line in self.breaks[filename]: - blist = Breakpoint.bplist[filename, line] - for bp in blist: - bp.deleteMe() - del self.breaks[filename] + def clear_all_file_breaks(self, fname): + filename = canonic(fname) + if (filename not in self.breakpoints or not + self.breakpoints[filename].breakpts.keys()): + return 'There are no breakpoints in %s' % fname + for bp in self.breakpoints[filename].all_breakpoints(): + bp.deleteMe() def clear_all_breaks(self): - if not self.breaks: + if not self.has_breaks(): return 'There are no breakpoints' for bp in Breakpoint.bpbynumber: if bp: bp.deleteMe() - self.breaks = {} def get_bpbynumber(self, arg): if not arg: @@ -320,25 +742,31 @@ return bp def get_break(self, filename, lineno): - filename = self.canonic(filename) - return filename in self.breaks and \ - lineno in self.breaks[filename] + return len(self.get_breaks(filename, lineno)) != 0 def get_breaks(self, filename, lineno): - filename = self.canonic(filename) - return filename in self.breaks and \ - lineno in self.breaks[filename] and \ - Breakpoint.bplist[filename, lineno] or [] + filename = canonic(filename) + if filename in self.breakpoints: + return self.breakpoints[filename].get_breakpoints(lineno) + return [] def get_file_breaks(self, filename): - filename = self.canonic(filename) - if filename in self.breaks: - return self.breaks[filename] - else: + filename = canonic(filename) + if filename not in self.breakpoints: return [] + return [bp.line for bp in self.breakpoints[filename].all_breakpoints()] def get_all_breaks(self): - return self.breaks + breaks = {} + for filename in self.breakpoints: + linebp_list = self.get_file_breaks(filename) + if linebp_list: + breaks[filename] = self.get_file_breaks(filename) + return breaks + + def has_breaks(self): + return any(self.breakpoints[f].breakpts.keys() + for f in self.breakpoints) # Derived classes and clients can call the following method # to get a data structure representing a stack trace. @@ -362,9 +790,9 @@ return stack, i def format_stack_entry(self, frame_lineno, lprefix=': '): - import linecache, reprlib + import reprlib frame, lineno = frame_lineno - filename = self.canonic(frame.f_code.co_filename) + filename = canonic(frame.f_code.co_filename) s = '%s(%r)' % (filename, lineno) if frame.f_code.co_name: s += frame.f_code.co_name @@ -455,29 +883,18 @@ Implements temporary breakpoints, ignore counts, disabling and (re)-enabling, and conditionals. - Breakpoints are indexed by number through bpbynumber and by - the file,line tuple using bplist. The former points to a - single instance of class Breakpoint. The latter points to a - list of such instances since there may be more than one - breakpoint per line. + Breakpoints are indexed by number through bpbynumber. """ - # XXX Keeping state in the class is a mistake -- this means - # you cannot have more than one active Bdb instance. + next = 1 # Next bp to be assigned + bpbynumber = [None] # Each entry is None or an instance of Bpt - next = 1 # Next bp to be assigned - bplist = {} # indexed by (file, lineno) tuple - bpbynumber = [None] # Each entry is None or an instance of Bpt - # index 0 is unused, except for marking an - # effective break .... see effective() - - def __init__(self, file, line, temporary=False, cond=None, funcname=None): - self.funcname = funcname - # Needed if funcname is not None. - self.func_first_executable_line = None + def __init__(self, file, line, module, temporary=False, + cond=None): self.file = file # This better be in canonical form! self.line = line + self.module = module self.temporary = temporary self.cond = cond self.enabled = True @@ -485,20 +902,13 @@ self.hits = 0 self.number = Breakpoint.next Breakpoint.next += 1 - # Build the two lists self.bpbynumber.append(self) - if (file, line) in self.bplist: - self.bplist[file, line].append(self) - else: - self.bplist[file, line] = [self] + self.module.add_breakpoint(self) def deleteMe(self): - index = (self.file, self.line) - self.bpbynumber[self.number] = None # No longer in list - self.bplist[index].remove(self) - if not self.bplist[index]: - # No more bp for this f:l combo - del self.bplist[index] + if self.bpbynumber[self.number]: + self.bpbynumber[self.number] = None # No longer in list + self.module.delete_breakpoint(self) def enable(self): self.enabled = True @@ -506,6 +916,27 @@ def disable(self): self.enabled = False + def process_hit_event(self, frame): + """Return (stop_state, delete_temporary) at a breakpoint hit event.""" + if not self.enabled: + return False, False + # Count every hit when breakpoint is enabled. + self.hits += 1 + # A conditional breakpoint. + if self.cond: + try: + if not eval(self.cond, frame.f_globals, frame.f_locals): + return False, False + except: + # If the breakpoint condition evaluation fails, the most + # conservative thing is to stop on the breakpoint. Don't + # delete temporary, as another hint to the user. + return True, False + if self.ignore > 0: + self.ignore -= 1 + return False, False + return True, True + def bpprint(self, out=None): if out is None: out = sys.stdout @@ -537,81 +968,6 @@ def __str__(self): return 'breakpoint %s at %s:%s' % (self.number, self.file, self.line) -# -----------end of Breakpoint class---------- - -def checkfuncname(b, frame): - """Check whether we should break here because of `b.funcname`.""" - if not b.funcname: - # Breakpoint was set via line number. - if b.line != frame.f_lineno: - # Breakpoint was set at a line with a def statement and the function - # defined is called: don't break. - return False - return True - - # Breakpoint set via function name. - - if frame.f_code.co_name != b.funcname: - # It's not a function call, but rather execution of def statement. - return False - - # We are in the right frame. - if not b.func_first_executable_line: - # The function is entered for the 1st time. - b.func_first_executable_line = frame.f_lineno - - if b.func_first_executable_line != frame.f_lineno: - # But we are not at the first line number: don't break. - return False - return True - -# Determines if there is an effective (active) breakpoint at this -# line of code. Returns breakpoint number or 0 if none -def effective(file, line, frame): - """Determine which breakpoint for this file:line is to be acted upon. - - Called only if we know there is a bpt at this - location. Returns breakpoint that was triggered and a flag - that indicates if it is ok to delete a temporary bp. - - """ - possibles = Breakpoint.bplist[file, line] - for b in possibles: - if not b.enabled: - continue - if not checkfuncname(b, frame): - continue - # Count every hit when bp is enabled - b.hits += 1 - if not b.cond: - # If unconditional, and ignoring go on to next, else break - if b.ignore > 0: - b.ignore -= 1 - continue - else: - # breakpoint and marker that it's ok to delete if temporary - return (b, True) - else: - # Conditional bp. - # Ignore count applies only to those bpt hits where the - # condition evaluates to true. - try: - val = eval(b.cond, frame.f_globals, frame.f_locals) - if val: - if b.ignore > 0: - b.ignore -= 1 - # continue - else: - return (b, True) - # else: - # continue - except: - # if eval fails, most conservative thing is to stop on - # breakpoint regardless of ignore count. Don't delete - # temporary, as another hint to user. - return (b, False) - return (None, None) - # -------------------- testing -------------------- @@ -621,10 +977,9 @@ if not name: name = '???' print('+++ call', name, args) def user_line(self, frame): - import linecache name = frame.f_code.co_name if not name: name = '???' - fn = self.canonic(frame.f_code.co_filename) + fn = canonic(frame.f_code.co_filename) line = linecache.getline(fn, frame.f_lineno, frame.f_globals) print('+++', fn, frame.f_lineno, name, ':', line.strip()) def user_return(self, frame, retval): diff --git a/Lib/pdb.py b/Lib/pdb.py --- a/Lib/pdb.py +++ b/Lib/pdb.py @@ -258,7 +258,7 @@ def user_line(self, frame): """This function is called when we stop or break at this line.""" if self._wait_for_mainpyfile: - if (self.mainpyfile != self.canonic(frame.f_code.co_filename) + if (self.mainpyfile != bdb.canonic(frame.f_code.co_filename) or frame.f_lineno <= 0): return self._wait_for_mainpyfile = False @@ -266,27 +266,39 @@ self.interaction(frame, None) def bp_commands(self, frame): - """Call every command that was set for the current active breakpoint - (if there is one). + """Call every command that was set for the current active breakpoints. Returns True if the normal interaction function must be called, False otherwise.""" - # self.currentbp is set in bdb in Bdb.break_here if a breakpoint was hit - if getattr(self, "currentbp", False) and \ - self.currentbp in self.commands: - currentbp = self.currentbp - self.currentbp = 0 - lastcmd_back = self.lastcmd - self.setup(frame, None) - for line in self.commands[currentbp]: - self.onecmd(line) - self.lastcmd = lastcmd_back - if not self.commands_silent[currentbp]: - self.print_stack_entry(self.stack[self.curindex]) - if self.commands_doprompt[currentbp]: - self._cmdloop() - self.forget() - return + # Handle multiple breakpoints on the same line (issue 14789) + # self.effective_bp_list is set in bdb in Bdb.break_here if breakpoints + # were hit + if (getattr(self, "effective_bp_list", False) and + self.effective_bp_list): + silent = True + doprompt = False + atleast_one_cmd = False + for bp in self.effective_bp_list: + if bp in self.commands: + atleast_one_cmd = True + lastcmd_back = self.lastcmd + self.setup(frame, None) + for line in self.commands[bp]: + self.onecmd(line) + self.lastcmd = lastcmd_back + if not self.commands_silent[bp]: + silent = False + if self.commands_doprompt[bp]: + doprompt = True + + self.effective_bp_list = [] + if atleast_one_cmd: + if not silent: + self.print_stack_entry(self.stack[self.curindex]) + if doprompt: + self._cmdloop() + self.forget() + return return 1 def user_return(self, frame, return_value): @@ -602,81 +614,69 @@ sys.path; the .py suffix may be omitted. """ if not arg: - if self.breaks: # There's at least one + if self.has_breaks(): # There's at least one. self.message("Num Type Disp Enb Where") for bp in bdb.Breakpoint.bpbynumber: if bp: self.message(bp.bpformat()) return - # parse arguments; comma has lowest precedence - # and cannot occur in filename - filename = None - lineno = None + # Parse arguments, comma has lowest precedence and cannot occur in + # filename. cond = None comma = arg.find(',') if comma > 0: # parse stuff after comma: "condition" cond = arg[comma+1:].lstrip() arg = arg[:comma].rstrip() + # parse stuff before comma: [filename:]lineno | function - colon = arg.rfind(':') - funcname = None - if colon >= 0: - filename = arg[:colon].rstrip() - f = self.lookupmodule(filename) - if not f: - self.error('%r not found from sys.path' % filename) - return - else: - filename = f - arg = arg[colon+1:].lstrip() + i = arg.rfind(':') + if i >= 0: + filename = arg[:i].strip() + if not filename: + filename = self.curframe.f_code.co_filename try: - lineno = int(arg) + lineno = int(arg[i+1:]) except ValueError: - self.error('Bad lineno: %s' % arg) + self.error('Bad lineno: "{}".'.format(arg)) return else: - # no colon; can be lineno or function try: + # A line number lineno = int(arg) + filename = self.curframe.f_code.co_filename except ValueError: + # A function or method name try: - func = eval(arg, - self.curframe.f_globals, - self.curframe_locals) - except: - func = arg - try: - if hasattr(func, '__func__'): - func = func.__func__ - code = func.__code__ - #use co_name to identify the bkpt (function names - #could be aliased, but co_name is invariant) - funcname = code.co_name - lineno = code.co_firstlineno - filename = code.co_filename - except: - # last thing to try - (ok, filename, ln) = self.lineinfo(arg) - if not ok: - self.error('The specified object %r is not a function ' - 'or was not found along sys.path.' % arg) - return - funcname = ok # ok contains a function name - lineno = int(ln) - if not filename: - filename = self.defaultFile() - # Check for reasonable breakpoint - line = self.checkline(filename, lineno) - if line: - # now set the break point - err = self.set_break(filename, line, temporary, cond, funcname) - if err: - self.error(err, file=self.stdout) - else: - bp = self.get_breaks(filename, line)[-1] - self.message("Breakpoint %d at %s:%d" % - (bp.number, bp.file, bp.line)) + filename, lineno = bdb.funcname_breakpoint(arg.strip(), + frame=self.curframe) + except bdb.BdbError as e: + self.error(e.args[0]) + return + + filename = bdb.canonic(filename) + if filename.startswith('<') and filename.endswith('>'): + if filename == '' and self.mainpyfile: + filename = self.mainpyfile + # else + # doctest installs a patch at linecache.getlines to allow to be linecached and readable. + else: + root, ext = os.path.splitext(filename) + if ext == '': + filename = filename + '.py' + if not os.path.exists(filename): + self.error('Bad filename: "{}".'.format(arg)) + return + + # Now set the break point + error = self.set_break(filename, lineno, temporary, cond) + if error: + self.error(error) + else: + bp = self.get_breaks(filename, lineno)[-1] + self.message('Breakpoint {:d} at {}:{:d}'.format( + bp.number, bp.file, bp.line)) # To be overridden in derived debuggers def defaultFile(self): @@ -700,60 +700,6 @@ complete_tbreak = _complete_location - def lineinfo(self, identifier): - failed = (None, None, None) - # Input is identifier, may be in single quotes - idstring = identifier.split("'") - if len(idstring) == 1: - # not in single quotes - id = idstring[0].strip() - elif len(idstring) == 3: - # quoted - id = idstring[1].strip() - else: - return failed - if id == '': return failed - parts = id.split('.') - # Protection for derived debuggers - if parts[0] == 'self': - del parts[0] - if len(parts) == 0: - return failed - # Best first guess at file to look at - fname = self.defaultFile() - if len(parts) == 1: - item = parts[0] - else: - # More than one part. - # First is module, second is method/class - f = self.lookupmodule(parts[0]) - if f: - fname = f - item = parts[1] - answer = find_function(item, fname) - return answer or failed - - def checkline(self, filename, lineno): - """Check whether specified line seems to be executable. - - Return `lineno` if it is, 0 if not (e.g. a docstring, comment, blank - line or EOF). Warning: testing is not comprehensive. - """ - # this method should be callable before starting debugging, so default - # to "no globals" if there is no current frame - globs = self.curframe.f_globals if hasattr(self, 'curframe') else None - line = linecache.getline(filename, lineno, globs) - if not line: - self.message('End of file') - return 0 - line = line.strip() - # Don't allow setting breakpoint at a blank line - if (not line or (line[0] == '#') or - (line[:3] == '"""') or line[:3] == "'''"): - self.error('Blank or comment') - return 0 - return lineno - def do_enable(self, arg): """enable bpnumber [bpnumber ...] Enables the breakpoints given as a space separated list of @@ -1213,7 +1159,7 @@ first = self.lineno + 1 if last is None: last = first + 10 - filename = self.curframe.f_code.co_filename + filename = bdb.canonic(self.curframe.f_code.co_filename) breaklist = self.get_file_breaks(filename) try: lines = linecache.getlines(filename, self.curframe.f_globals) @@ -1481,30 +1427,6 @@ # other helper functions - def lookupmodule(self, filename): - """Helper function for break/clear parsing -- may be overridden. - - lookupmodule() translates (possibly incomplete) file or module name - into an absolute file name. - """ - if os.path.isabs(filename) and os.path.exists(filename): - return filename - f = os.path.join(sys.path[0], filename) - if os.path.exists(f) and self.canonic(f) == self.mainpyfile: - return f - root, ext = os.path.splitext(filename) - if ext == '': - filename = filename + '.py' - if os.path.isabs(filename): - return filename - for dirname in sys.path: - while os.path.islink(dirname): - dirname = os.readlink(dirname) - fullname = os.path.join(dirname, filename) - if os.path.exists(fullname): - return fullname - return None - def _runscript(self, filename): # The script has to run in __main__ namespace (or imports from # __main__ will break). @@ -1524,7 +1446,7 @@ # avoid stopping before we reach the main script (see user_line and # user_call for details). self._wait_for_mainpyfile = True - self.mainpyfile = self.canonic(filename) + self.mainpyfile = bdb.canonic(filename) self._user_requested_quit = False with open(filename, "rb") as fp: statement = "exec(compile(%r, %r, 'exec'))" % \ diff --git a/Lib/test/test_pdb.py b/Lib/test/test_pdb.py --- a/Lib/test/test_pdb.py +++ b/Lib/test/test_pdb.py @@ -1,11 +1,17 @@ # A test suite for pdb; not very comprehensive at the moment. import imp +import bdb import pdb import sys import unittest import subprocess import textwrap +import time +try: + import threading +except ImportError: + threading = None from test import support # This little helper class is essential for testing pdb under doctest. @@ -275,6 +281,38 @@ """ +def test_issue_14792(): + """ + >>> def foo(): + ... x = 1 + ... x = 2 + + >>> def test_function(): + ... import pdb; pdb.Pdb(nosigint=True).set_trace() + ... foo() + + >>> with PdbTestInput([ + ... 'step', + ... 'step', + ... 'break foo', + ... 'continue', + ... ]): + ... test_function() + > (3)test_function() + -> foo() + (Pdb) step + --Call-- + > (1)foo() + -> def foo(): + (Pdb) step + > (2)foo() + -> x = 1 + (Pdb) break foo + Breakpoint 1 at :2 + (Pdb) continue + """ + + def do_nothing(): pass @@ -597,11 +635,36 @@ """ +def normalize(result, filename='', strip_bp_lnum=False): + """Normalize a test result.""" + lines = [] + for line in result.splitlines(): + while line.startswith('(Pdb) ') or line.startswith('(com) '): + line = line[6:] + words = line.split() + line = [] + # Replace tabs with spaces + for word in words: + if filename: + idx = word.find(filename) + # Remove the filename prefix + if idx > 0: + word = word[idx:] + if idx >=0 and strip_bp_lnum: + idx = word.find(':') + # Remove the ':' separator and breakpoint line number + if idx > 0: + word = word[:idx] + line.append(word) + line = ' '.join(line) + lines.append(line.strip()) + return '\n'.join(lines) + + class PdbTestCase(unittest.TestCase): - def run_pdb(self, script, commands): + def run_pdb(self, script, commands, filename): """Run 'script' lines with pdb and the pdb 'commands'.""" - filename = 'main.py' with open(filename, 'w') as f: f.write(textwrap.dedent(script)) self.addCleanup(support.unlink, filename) @@ -609,7 +672,7 @@ stdout = stderr = None with subprocess.Popen(cmd, stdout=subprocess.PIPE, stdin=subprocess.PIPE, - stderr=subprocess.STDOUT, + stderr=subprocess.PIPE, ) as proc: stdout, stderr = proc.communicate(str.encode(commands)) stdout = stdout and bytes.decode(stdout) @@ -662,11 +725,439 @@ with open('bar.py', 'w') as f: f.write(textwrap.dedent(bar)) self.addCleanup(support.unlink, 'bar.py') - stdout, stderr = self.run_pdb(script, commands) + stdout, stderr = self.run_pdb(script, commands, 'main.py') self.assertTrue( any('main.py(5)foo()->None' in l for l in stdout.splitlines()), 'Fail to step into the caller after a return') + def test_bdb_module_parser(self): + script = """ + x = 1 + + def foo(x, + y): + ''' + documentation + ''' + def nested_def(): + pass + + bar(a, + b) + # comment + # comment + pass # comment + bar( + a, b # comment + ) + class NestedClass: + def n_foo(self): # comment + pass + pass + + return bar( + a,b + ) + + def bar(a, b): + # comment + pass # comment + pass + # comment + + class C: + class D: + def d_foo(self): + pass + class E: + def e_foo(self): + pass + def c_bar(self): + pass + + """ + expected = """ + Functions: + [('foo', 4), + ('bar', 29), + ('C.D.d_foo', 37), + ('C.D.E.e_foo', 40), + ('C.c_bar', 42)] + Lines: + [(4, 5, 0, 4), + (9, 10, 1, 4), + (12, 13, 2, 4), + (16, 16, 1, 4), + (17, 19, 2, 4), + (20, 23, 1, 4), + (25, 27, 2, 4)] + [(29, 29, 0, 29), (31, 32, 1, 29)] + [(37, 37, 0, 37), (38, 38, 1, 37)] + [(40, 40, 0, 40), (41, 41, 1, 40)] + [(42, 42, 0, 42), (43, 43, 1, 42)] + """ + filename = 'main.py' + with open(filename, 'w') as f: + f.write(textwrap.dedent(script)) + self.addCleanup(support.unlink, filename) + result = str(bdb.ModuleSource(filename)) + expected = textwrap.dedent(expected) + self.assertTrue(result in expected, + '\n\nExpected:\n{}\nGot:\n{}\n' + 'Fail to parse correctly a module.'.format(expected, result)) + + def test_breakpoints_set_on_non_statement(self): + script = """ + def foo(): + + # comment + x = 1 + + x = len( + 'foo' + ) + + foo() + """ + commands = """ + break 3 + break 4 + break 8 + continue + break + continue + quit + """ + expected = """ + > main.py(2)() + -> def foo(): + Breakpoint 1 at main.py:3 + Breakpoint 2 at main.py:4 + Breakpoint 3 at main.py:8 + > main.py(5)foo() + -> x = 1 + Num Type Disp Enb Where + 1 breakpoint keep yes at main.py:3 + breakpoint already hit 1 time + 2 breakpoint keep yes at main.py:4 + breakpoint already hit 1 time + 3 breakpoint keep yes at main.py:8 + > main.py(7)foo() + -> x = len( + """ + filename = 'main.py' + stdout, stderr = self.run_pdb(script, commands, filename) + stdout = normalize(stdout, filename) + expected = normalize(expected) + self.assertTrue(stdout in expected, + '\n\nExpected:\n{}\nGot:\n{}\n' + 'Fail to stop at breakpoint set at empty line, comment line or' + ' multi-line statement.'.format(expected, stdout)) + + def test_issue14789(self): + script = """ + def bar(a): + x = 1 + + bar(10) + bar(20) + """ + commands = """ + break bar + commands 1 + print a + end + ignore 1 1 + break bar + commands 2 + print a + 1 + end + ignore 2 1 + continue + break + quit + """ + expected = """ + > main.py(2)() + -> def bar(a): + Breakpoint 1 at main.py:3 + Will ignore next 1 crossing of breakpoint 1. + Breakpoint 2 at main.py:3 + Will ignore next 1 crossing of breakpoint 2. + 20 + 21 + > main.py(3)bar() + -> x = 1 + Num Type Disp Enb Where + 1 breakpoint keep yes at main.py:3 + breakpoint already hit 2 times + 2 breakpoint keep yes at main.py:3 + breakpoint already hit 2 times + """ + filename = 'main.py' + stdout, stderr = self.run_pdb(script, commands, filename) + stdout = normalize(stdout, filename) + expected = normalize(expected) + self.assertTrue(stdout in expected, + '\n\nExpected:\n{}\nGot:\n{}\n' + 'Fail to handle two breakpoints set on the same line.'.format( + expected, stdout)) + + def test_issue14795(self): + script = """ + class C: + def foo(self): + pass + """ + commands = """ + break C.foo + break bar.bar + quit + """ + bar = """ + def bar(): + pass + """ + expected = """ + > main.py(2)() + -> class C: + Breakpoint 1 at main.py:4 + Breakpoint 2 at bar.py:3 + """ + with open('bar.py', 'w') as f: + f.write(textwrap.dedent(bar)) + self.addCleanup(support.unlink, 'bar.py') + filename = 'main.py' + stdout, stderr = self.run_pdb(script, commands, filename) + stdout = normalize(normalize(stdout, filename), 'bar.py') + expected = normalize(expected) + self.assertTrue(stdout in expected, + '\n\nExpected:\n{}\nGot:\n{}\n' + 'Fail to set a breakpoint in a method, or in a function whose' + ' module is not yet imported.'.format(expected, stdout)) + + def test_issue_14808(self): + script = """ + def foo(): + pass + + def bar(): + pass + + foo() + bar() + """ + commands = """ + break 2 + break bar + continue + continue + break + quit + """ + expected = """ + > main.py(2)() + -> def foo(): + Breakpoint 1 at main.py:2 + Breakpoint 2 at main.py:6 + > main.py(3)foo() + -> pass + > main.py(6)bar() + -> pass + Num Type Disp Enb Where + 1 breakpoint keep yes at main.py:2 + breakpoint already hit 1 time + 2 breakpoint keep yes at main.py:6 + breakpoint already hit 1 time + """ + filename = 'main.py' + stdout, stderr = self.run_pdb(script, commands, filename) + stdout = normalize(stdout, filename) + expected = normalize(expected) + self.assertTrue(stdout in expected, + '\n\nExpected:\n{}\nGot:\n{}\n' + 'Fail to stop at breakpoint set at function definition line' + ' or at function name.'.format(expected, stdout)) + + @unittest.skipIf(not threading, 'the source file is changed in a thread') + def test_issue_14912(self): + script = """ + import bar + + def foo(): + bar.bar() + + foo() + """ + commands = """ + break bar.bar + continue + list + import time; time.sleep(2) + restart + continue + list + quit + """ + bar = """ + def bar(): + x = 1 + x = 2 + """ + bar_2 = """ + def bar(): + # comment + # comment + x = 1 + x = 2 + """ + expected = """ + > main.py(2)() + -> import bar + Breakpoint 1 at bar.py:3 + > bar.py(3)bar() + -> x = 1 + 1 + 2 def bar(): + 3 B-> x = 1 + 4 x = 2 + [EOF] + Restarting main.py with arguments: + main.py + > main.py(2)() + -> import bar + > bar.py(5)bar() + -> x = 1 + 1 + 2 def bar(): + 3 B # comment + 4 # comment + 5 -> x = 1 + 6 x = 2 + [EOF] + """ + with open('bar.py', 'w') as f: + f.write(textwrap.dedent(bar)) + self.addCleanup(support.unlink, 'bar.py') + filename = 'main.py' + with open(filename, 'w') as f: + f.write(textwrap.dedent(script)) + self.addCleanup(support.unlink, filename) + cmd = [sys.executable, '-m', 'pdb', filename] + stdout = stderr = None + with subprocess.Popen(cmd, stdout=subprocess.PIPE, + stdin=subprocess.PIPE, + stderr=subprocess.PIPE, + ) as proc: + def update_source(): + # Wait for the Pdb subprocess to be fully started. + time.sleep(1) + with open('bar.py', 'w') as f: + f.write(textwrap.dedent(bar_2)) + threading.Thread(target=update_source).start() + stdout, stderr = proc.communicate(str.encode(commands)) + stdout = stdout and bytes.decode(stdout) + stderr = stderr and bytes.decode(stderr) + stdout = normalize(normalize(stdout, filename), 'bar.py') + expected = normalize(expected, 'bar.py') + self.assertTrue(stdout in expected, + '\n\nExpected:\n{}\nGot:\n{}\n' + 'Fail to restart a new session after the source of an imported' + ' module has been modified.'.format(expected, stdout)) + + def test_set_breakpoint_by_function_name(self): + script = """ + class C: + c_foo = 1 + + class D: + def d_foo(self): + pass + + def foo(): + pass + + not_a_function = 1 + foo() + """ + commands = """ + break C + break C.c_foo + break D.d_foo + break foo + break not_a_function + break len + break logging.handlers.SocketHandler.close + continue + break C + break C.c_foo + break D.d_foo + break foo + break not_a_function + quit + """ + expected = """ + > main.py(2)() + -> class C: + *** Bad name: "C". + *** Bad name: "C.c_foo". + Breakpoint 1 at main.py:7 + Breakpoint 2 at main.py:10 + *** Bad name: "not_a_function". + *** Cannot set a breakpoint at "len" + Breakpoint 3 at handlers.py: + > main.py(10)foo() + -> pass + *** Bad function name: "C". + *** Cannot set a breakpoint at "C.c_foo" + Breakpoint 4 at main.py:7 + Breakpoint 5 at main.py:10 + *** Cannot set a breakpoint at "not_a_function" + """ + filename = 'main.py' + stdout, stderr = self.run_pdb(script, commands, filename) + stdout = normalize(normalize(stdout, 'handlers.py', strip_bp_lnum=True), filename) + expected = normalize(expected, 'handlers.py', strip_bp_lnum=True) + self.assertTrue(stdout in expected, + '\n\nExpected:\n{}\nGot:\n{}\n' + 'Fail to handle a breakpoint set by function name.'.format(expected, stdout)) + + def test_breakpoint_set_by_line_and_funcname(self): + script = """ + def foo(): + pass + + foo() + """ + commands = """ + break foo + break 2 + continue + break + quit + """ + expected = """ + > main.py(2)() + -> def foo(): + Breakpoint 1 at main.py:3 + Breakpoint 2 at main.py:2 + > main.py(3)foo() + -> pass + Num Type Disp Enb Where + 1 breakpoint keep yes at main.py:3 + breakpoint already hit 1 time + 2 breakpoint keep yes at main.py:2 + breakpoint already hit 1 time + """ + filename = 'main.py' + stdout, stderr = self.run_pdb(script, commands, filename) + stdout = normalize(stdout, filename) + expected = normalize(expected) + self.assertTrue(stdout in expected, + '\n\nExpected:\n{}\nGot:\n{}\n' + 'Failed hits for breakpoints set by line number and by function' + ' name.'.format(expected, stdout)) + def tearDown(self): support.unlink(support.TESTFN)