#!/usr/bin/env python #---------------------------------------------------------------------- # Be sure to add the python path that points to the LLDB shared library. # On MacOSX csh, tcsh: # setenv PYTHONPATH /Applications/Xcode.app/Contents/SharedFrameworks/LLDB.framework/Resources/Python # On MacOSX sh, bash: # export PYTHONPATH=/Applications/Xcode.app/Contents/SharedFrameworks/LLDB.framework/Resources/Python #---------------------------------------------------------------------- import optparse import os import platform import re import resource import sys import subprocess import time import types #---------------------------------------------------------------------- # Code that auto imports LLDB #---------------------------------------------------------------------- try: # Just try for LLDB in case PYTHONPATH is already correctly setup import lldb except ImportError: lldb_python_dirs = list() # lldb is not in the PYTHONPATH, try some defaults for the current platform platform_system = platform.system() if platform_system == 'Darwin': # On Darwin, try the currently selected Xcode directory xcode_dir = subprocess.check_output("xcode-select --print-path", shell=True) if xcode_dir: lldb_python_dirs.append( os.path.realpath( xcode_dir + '/../SharedFrameworks/LLDB.framework/Resources/Python')) lldb_python_dirs.append( xcode_dir + '/Library/PrivateFrameworks/LLDB.framework/Resources/Python') lldb_python_dirs.append( '/System/Library/PrivateFrameworks/LLDB.framework/Resources/Python') success = False for lldb_python_dir in lldb_python_dirs: if os.path.exists(lldb_python_dir): if not (sys.path.__contains__(lldb_python_dir)): sys.path.append(lldb_python_dir) try: import lldb except ImportError: pass else: print('imported lldb from: "%s"' % (lldb_python_dir)) success = True break if not success: print("error: couldn't locate the 'lldb' module, please set PYTHONPATH correctly") sys.exit(1) class Timer: def __enter__(self): self.start = time.clock() return self def __exit__(self, *args): self.end = time.clock() self.interval = self.end - self.start class Action(object): """Class that encapsulates actions to take when a thread stops for a reason.""" def __init__(self, callback=None, callback_owner=None): self.callback = callback self.callback_owner = callback_owner def ThreadStopped(self, thread): assert False, "performance.Action.ThreadStopped(self, thread) must be overridden in a subclass" class PlanCompleteAction (Action): def __init__(self, callback=None, callback_owner=None): Action.__init__(self, callback, callback_owner) def ThreadStopped(self, thread): if thread.GetStopReason() == lldb.eStopReasonPlanComplete: if self.callback: if self.callback_owner: self.callback(self.callback_owner, thread) else: self.callback(thread) return True return False class BreakpointAction (Action): def __init__( self, callback=None, callback_owner=None, name=None, module=None, file=None, line=None, breakpoint=None): Action.__init__(self, callback, callback_owner) self.modules = lldb.SBFileSpecList() self.files = lldb.SBFileSpecList() self.breakpoints = list() # "module" can be a list or a string if breakpoint: self.breakpoints.append(breakpoint) else: if module: if isinstance(module, types.ListType): for module_path in module: self.modules.Append( lldb.SBFileSpec(module_path, False)) elif isinstance(module, types.StringTypes): self.modules.Append(lldb.SBFileSpec(module, False)) if name: # "file" can be a list or a string if file: if isinstance(file, types.ListType): self.files = lldb.SBFileSpecList() for f in file: self.files.Append(lldb.SBFileSpec(f, False)) elif isinstance(file, types.StringTypes): self.files.Append(lldb.SBFileSpec(file, False)) self.breakpoints.append( self.target.BreakpointCreateByName( name, self.modules, self.files)) elif file and line: self.breakpoints.append( self.target.BreakpointCreateByLocation( file, line)) def ThreadStopped(self, thread): if thread.GetStopReason() == lldb.eStopReasonBreakpoint: for bp in self.breakpoints: if bp.GetID() == thread.GetStopReasonDataAtIndex(0): if self.callback: if self.callback_owner: self.callback(self.callback_owner, thread) else: self.callback(thread) return True return False class TestCase: """Class that aids in running performance tests.""" def __init__(self): self.verbose = False self.debugger = lldb.SBDebugger.Create() self.target = None self.process = None self.thread = None self.launch_info = None self.done = False self.listener = self.debugger.GetListener() self.user_actions = list() self.builtin_actions = list() self.bp_id_to_dict = dict() def Setup(self, args): self.launch_info = lldb.SBLaunchInfo(args) def Run(self, args): assert False, "performance.TestCase.Run(self, args) must be subclassed" def Launch(self): if self.target: error = lldb.SBError() self.process = self.target.Launch(self.launch_info, error) if not error.Success(): print("error: %s" % error.GetCString()) if self.process: self.process.GetBroadcaster().AddListener(self.listener, lldb.SBProcess.eBroadcastBitStateChanged | lldb.SBProcess.eBroadcastBitInterrupt) return True return False def WaitForNextProcessEvent(self): event = None if self.process: while event is None: process_event = lldb.SBEvent() if self.listener.WaitForEvent(lldb.UINT32_MAX, process_event): state = lldb.SBProcess.GetStateFromEvent(process_event) if self.verbose: print("event = %s" % (lldb.SBDebugger.StateAsCString(state))) if lldb.SBProcess.GetRestartedFromEvent(process_event): continue if state == lldb.eStateInvalid or state == lldb.eStateDetached or state == lldb.eStateCrashed or state == lldb.eStateUnloaded or state == lldb.eStateExited: event = process_event self.done = True elif state == lldb.eStateConnected or state == lldb.eStateAttaching or state == lldb.eStateLaunching or state == lldb.eStateRunning or state == lldb.eStateStepping or state == lldb.eStateSuspended: continue elif state == lldb.eStateStopped: event = process_event call_test_step = True fatal = False selected_thread = False for thread in self.process: frame = thread.GetFrameAtIndex(0) select_thread = False stop_reason = thread.GetStopReason() if self.verbose: print("tid = %#x pc = %#x " % (thread.GetThreadID(), frame.GetPC()), end=' ') if stop_reason == lldb.eStopReasonNone: if self.verbose: print("none") elif stop_reason == lldb.eStopReasonTrace: select_thread = True if self.verbose: print("trace") elif stop_reason == lldb.eStopReasonPlanComplete: select_thread = True if self.verbose: print("plan complete") elif stop_reason == lldb.eStopReasonThreadExiting: if self.verbose: print("thread exiting") elif stop_reason == lldb.eStopReasonExec: if self.verbose: print("exec") elif stop_reason == lldb.eStopReasonInvalid: if self.verbose: print("invalid") elif stop_reason == lldb.eStopReasonException: select_thread = True if self.verbose: print("exception") fatal = True elif stop_reason == lldb.eStopReasonBreakpoint: select_thread = True bp_id = thread.GetStopReasonDataAtIndex(0) bp_loc_id = thread.GetStopReasonDataAtIndex(1) if self.verbose: print("breakpoint id = %d.%d" % (bp_id, bp_loc_id)) elif stop_reason == lldb.eStopReasonWatchpoint: select_thread = True if self.verbose: print("watchpoint id = %d" % (thread.GetStopReasonDataAtIndex(0))) elif stop_reason == lldb.eStopReasonSignal: select_thread = True if self.verbose: print("signal %d" % (thread.GetStopReasonDataAtIndex(0))) elif stop_reason == lldb.eStopReasonFork: if self.verbose: print("fork pid = %d" % (thread.GetStopReasonDataAtIndex(0))) elif stop_reason == lldb.eStopReasonVFork: if self.verbose: print("vfork pid = %d" % (thread.GetStopReasonDataAtIndex(0))) elif stop_reason == lldb.eStopReasonVForkDone: if self.verbose: print("vfork done") if select_thread and not selected_thread: self.thread = thread selected_thread = self.process.SetSelectedThread( thread) for action in self.user_actions: action.ThreadStopped(thread) if fatal: # if self.verbose: # Xcode.RunCommand(self.debugger,"bt all",true) sys.exit(1) return event class Measurement: '''A class that encapsulates a measurement''' def __init__(self): object.__init__(self) def Measure(self): assert False, "performance.Measurement.Measure() must be subclassed" class MemoryMeasurement(Measurement): '''A class that can measure memory statistics for a process.''' def __init__(self, pid): Measurement.__init__(self) self.pid = pid self.stats = [ "rprvt", "rshrd", "rsize", "vsize", "vprvt", "kprvt", "kshrd", "faults", "cow", "pageins"] self.command = "top -l 1 -pid %u -stats %s" % ( self.pid, ",".join(self.stats)) self.value = dict() def Measure(self): output = subprocess.getoutput(self.command).split("\n")[-1] values = re.split('[-+\s]+', output) for (idx, stat) in enumerate(values): multiplier = 1 if stat: if stat[-1] == 'K': multiplier = 1024 stat = stat[:-1] elif stat[-1] == 'M': multiplier = 1024 * 1024 stat = stat[:-1] elif stat[-1] == 'G': multiplier = 1024 * 1024 * 1024 elif stat[-1] == 'T': multiplier = 1024 * 1024 * 1024 * 1024 stat = stat[:-1] self.value[self.stats[idx]] = int(stat) * multiplier def __str__(self): '''Dump the MemoryMeasurement current value''' s = '' for key in self.value.keys(): if s: s += "\n" s += "%8s = %s" % (key, self.value[key]) return s class TesterTestCase(TestCase): def __init__(self): TestCase.__init__(self) self.verbose = True self.num_steps = 5 def BreakpointHit(self, thread): bp_id = thread.GetStopReasonDataAtIndex(0) loc_id = thread.GetStopReasonDataAtIndex(1) print("Breakpoint %i.%i hit: %s" % (bp_id, loc_id, thread.process.target.FindBreakpointByID(bp_id))) thread.StepOver() def PlanComplete(self, thread): if self.num_steps > 0: thread.StepOver() self.num_steps = self.num_steps - 1 else: thread.process.Kill() def Run(self, args): self.Setup(args) with Timer() as total_time: self.target = self.debugger.CreateTarget(args[0]) if self.target: with Timer() as breakpoint_timer: bp = self.target.BreakpointCreateByName("main") print( 'Breakpoint time = %.03f sec.' % breakpoint_timer.interval) self.user_actions.append( BreakpointAction( breakpoint=bp, callback=TesterTestCase.BreakpointHit, callback_owner=self)) self.user_actions.append( PlanCompleteAction( callback=TesterTestCase.PlanComplete, callback_owner=self)) if self.Launch(): while not self.done: self.WaitForNextProcessEvent() else: print("error: failed to launch process") else: print("error: failed to create target with '%s'" % (args[0])) print('Total time = %.03f sec.' % total_time.interval) if __name__ == '__main__': lldb.SBDebugger.Initialize() test = TesterTestCase() test.Run(sys.argv[1:]) mem = MemoryMeasurement(os.getpid()) mem.Measure() print(str(mem)) lldb.SBDebugger.Terminate() # print "sleeeping for 100 seconds" # time.sleep(100)