Python灰帽子--黑客与逆向工程师的Python编程之道 笔记,过程问题解决
python,黑客,编程,逆向2016-08-15
随学习进度持续更新中……
Powered by 《Python灰帽子--黑客与逆向工程师的Python编程之道》
本文链接:http://blog.csdn.net/u012763794/article/details/52174275
自从上次读了python黑帽子(http://blog.csdn.net/u012763794/article/details/50612756),感觉作者写的书还不错,现在来读读python灰帽子吧(感谢翻译书的人,让我们有这么好的学习教材)
同理,我会根据学习进度不断更新, 也欢迎大家像python黑帽子一样,在下面评论给我以鼓励,我会带更多我的学习成果给大家
同样给书中全部代码链接(代码除了常量定义,都是手敲的,所以根据学习进度更新)(github):https://github.com/giantbranch/gray-hat-python-src
# -*- coding: utf-8 -*-
# @Date    : 2016-08-10 20:30:23
# @Author  : giantbranch (giantbranch@gmail.com)
# @Link    : http://blog.csdn.net/u012763794?viewmode=contents
"""Small tour of ctypes scalar types (Python 2 script)."""
from ctypes import *

# Bare constructor calls: they only show a result when typed into an
# interactive interpreter; in a script they are evaluated and discarded.
c_int()
c_char_p('Hello world!')
c_ushort(65531)
c_short(-5)

# A c_char_p wraps a C string pointer; .value reads the string back.
text_ptr = c_char_p("loves the python")
print(text_ptr)
print(text_ptr.value)

exit()
# C declaration this class mirrors:
#
#     struct beer_recipe {
#         int amt_barley;
#         int amt_water;
#     };
#
# Python (ctypes) equivalent:
class beer_recipe(Structure):
    """ctypes structure holding two C ints: amt_barley and amt_water."""

    _fields_ = [
        ("amt_barley", c_int),
        ("amt_water", c_int),
    ]
# C declaration this class mirrors:
#
#     union {
#         long barley_long;
#         int  barley_int;
#         char barley_char[8];
#     } barley_amount;
#
# Python (ctypes) equivalent:
class barley_amount(Union):
    """ctypes union: the three members share the same storage."""

    _fields_ = [
        ("barley_long", c_long),
        ("barley_int", c_int),
        ("barley_char", c_char * 8),
    ]
# -*- coding: utf-8 -*-
# @Date    : 2016-08-11 16:07:38
# @Author  : giantbranch (giantbranch@gmail.com)
# @Link    : http://blog.csdn.net/u012763794?viewmode=contents
#
# All structures, unions and constants used by the debugger live in this
# module so they are easy to maintain in one place.
from ctypes import *

# Alias the ctypes types to the names used by the Windows API.
WORD = c_ushort
DWORD = c_ulong
LPBYTE = POINTER(c_ubyte)
LPTSTR = POINTER(c_char)
HANDLE = c_void_p

# Process creation flags passed to CreateProcessA().
DEBUG_PROCESS = 0x00000001
CREATE_NEW_CONSOLE = 0x00000010


class STARTUPINFO(Structure):
    """Startup attributes handed to CreateProcessA() for the child process.

    The three standard-stream members are handles and lpReserved2 is a byte
    pointer in the Windows definition.  Declaring them as HANDLE / LPBYTE
    keeps the layout correct on 64-bit builds; on 32-bit builds the size is
    the same as the previous DWORD / LPTSTR declarations.
    """

    _fields_ = [
        ("cb", DWORD),
        ("lpReserved", LPTSTR),
        ("lpDesktop", LPTSTR),
        ("lpTitle", LPTSTR),
        ("dwX", DWORD),
        ("dwY", DWORD),
        ("dwXSize", DWORD),
        ("dwYSize", DWORD),
        ("dwXCountChars", DWORD),
        ("dwYCountChars", DWORD),
        ("dwFillAttribute", DWORD),
        ("dwFlags", DWORD),
        ("wShowWindow", WORD),
        ("cbReserved2", WORD),
        ("lpReserved2", LPBYTE),
        ("hStdInput", HANDLE),
        ("hStdOutput", HANDLE),
        ("hStdError", HANDLE),
    ]


class PROCESS_INFORMATION(Structure):
    """Filled in by CreateProcessA(): process/thread handles and their IDs."""

    _fields_ = [
        ("hProcess", HANDLE),
        ("hThread", HANDLE),
        ("dwProcessId", DWORD),
        ("dwThreadId", DWORD),
    ]
# -*- coding: utf-8 -*-
# @Date    : 2016-08-11 16:48:16
# @Author  : giantbranch (giantbranch@gmail.com)
# @Link    : http://blog.csdn.net/u012763794?viewmode=contents
from ctypes import *
from my_debugger_defines import *

kernel32 = windll.kernel32


class debugger():
    """Minimal Windows debugger built on kernel32 through ctypes."""

    def __init__(self):
        # Give every attribute a defined value up front so that a method
        # called before load()/attach() does not fail with AttributeError.
        self.h_process = None
        self.pid = None
        self.debugger_active = False
        self.breakpoints = {}

    def load(self, path_to_exe):
        """Launch ``path_to_exe`` under the debugger.

        Prints the new PID on success, or the GetLastError() code on
        failure.  On success the PID and process handle are also stored on
        the instance as ``self.pid`` and ``self.h_process``.
        """
        # DEBUG_PROCESS makes the calling process the debugger of the child.
        creation_flags = DEBUG_PROCESS

        startupinfo = STARTUPINFO()
        process_information = PROCESS_INFORMATION()

        # 0x1 is STARTF_USESHOWWINDOW: CreateProcess then honours
        # wShowWindow, which is set to 0x0 here.
        startupinfo.dwFlags = 0x1
        startupinfo.wShowWindow = 0x0
        # cb must hold the size of the structure itself.
        startupinfo.cb = sizeof(startupinfo)

        if kernel32.CreateProcessA(path_to_exe,
                                   None,
                                   None,
                                   None,
                                   None,
                                   creation_flags,
                                   None,
                                   None,
                                   byref(startupinfo),
                                   byref(process_information)):
            print("[*] we have successfully launched the process!")
            print("[*] PID:%d" % process_information.dwProcessId)
            # Keep what CreateProcessA handed back instead of dropping it.
            self.pid = process_information.dwProcessId
            self.h_process = process_information.hProcess
        else:
            print("[*] Error:0x%08x." % kernel32.GetLastError())

# --- Next snippet in the article: launcher code ---
# -*- coding: utf-8 -*-
# @Date    : 2016-08-12 14:18:10
# @Author  : giantbranch (giantbranch@gmail.com)
# @Link    : http://blog.csdn.net/u012763794?viewmode=contents
"""Launcher: start calc.exe under the home-made debugger."""
import my_debugger

TARGET_EXE = "C:\\WINDOWS\\system32\\calc.exe"

debugger = my_debugger.debugger()
debugger.load(TARGET_EXE)
# The functions below are methods of the ``debugger`` class in
# my_debugger.py; paste them into the class body.

def open_process(self, pid):
    """Return a handle to process ``pid`` opened with PROCESS_ALL_ACCESS.

    Debugging needs full access rights.  The OpenProcess signature is
    (dwDesiredAccess, bInheritHandle, dwProcessId), so the inherit flag
    must come before the PID.
    """
    h_process = kernel32.OpenProcess(PROCESS_ALL_ACCESS, False, pid)
    return h_process


def attach(self, pid):
    """Attach to the running process ``pid`` and enter the debug loop."""
    self.h_process = self.open_process(pid)

    # Try to attach to the process with the given PID.
    if kernel32.DebugActiveProcess(pid):
        self.debugger_active = True
        self.pid = pid
        self.run()
    else:
        print("[*] Unable to attach to the process.")


def run(self):
    """Poll for debug events until ``debugger_active`` is cleared."""
    while self.debugger_active == True:
        self.get_debug_event()


def get_debug_event(self):
    """Wait for one debug event and then resume the debuggee."""
    debug_event = DEBUG_EVENT()
    continue_status = DBG_CONTINUE

    # INFINITE: block until an event arrives.
    if kernel32.WaitForDebugEvent(byref(debug_event), INFINITE):
        # No event handling yet: pause for the user, stop the loop and
        # let the process continue.
        raw_input("Press a key to continue...")
        self.debugger_active = False
        kernel32.ContinueDebugEvent(debug_event.dwProcessId,
                                    debug_event.dwThreadId,
                                    continue_status)


def detach(self):
    """Stop debugging ``self.pid``.

    Returns True on success and False on failure.
    """
    if kernel32.DebugActiveProcessStop(self.pid):
        print("[*] Finished debugging. Exiting...")
        return True
    else:
        print("There was an error")
        return False
"""Launcher: attach the home-made debugger to a running process, then detach."""
import my_debugger

debugger = my_debugger.debugger()

# Ask the user which process to debug.
pid = raw_input("Enter the PID of the process to attach to:")
target_pid = int(pid)

debugger.attach(target_pid)
debugger.detach()
# The functions below are methods of the ``debugger`` class in
# my_debugger.py; paste them into the class body.

def open_thread(self, thread_id):
    """Return a handle to thread ``thread_id``, or False on failure."""
    h_thread = kernel32.OpenThread(THREAD_ALL_ACCESS, None, thread_id)
    # OpenThread reports failure with a NULL handle, which ctypes returns
    # as 0 (or None for pointer restypes), so test truthiness rather than
    # ``is not None``.
    if h_thread:
        return h_thread
    else:
        print("[*] Could not obtain a valid thread handle.")
        return False


def enumerate_threads(self):
    """Return the IDs of all threads owned by ``self.pid``.

    Returns False if the thread snapshot cannot be created.
    """
    thread_entry = THREADENTRY32()
    snapshot = kernel32.CreateToolhelp32Snapshot(TH32CS_SNAPTHREAD, self.pid)

    # CreateToolhelp32Snapshot fails with INVALID_HANDLE_VALUE (-1), which
    # is never None; also reject a NULL handle to be safe.
    if snapshot in (None, 0, -1):
        print("enumerate_threads fail.")
        return False

    thread_list = []
    try:
        # dwSize has to be set before the first Thread32First call.
        thread_entry.dwSize = sizeof(thread_entry)
        success = kernel32.Thread32First(snapshot, byref(thread_entry))
        while success:
            # Keep only the threads that belong to the debuggee.
            if thread_entry.th32OwnerProcessID == self.pid:
                thread_list.append(thread_entry.th32ThreadID)
            success = kernel32.Thread32Next(snapshot, byref(thread_entry))
    finally:
        kernel32.CloseHandle(snapshot)
    return thread_list


def get_thread_context(self, thread_id):
    """Return the CONTEXT (register state) of ``thread_id``, or False."""
    context = CONTEXT()
    context.ContextFlags = CONTEXT_FULL | CONTEXT_DEBUG_REGISTERS

    h_thread = self.open_thread(thread_id)
    if not h_thread:
        print("get_thread_context fail.")
        return False

    try:
        if kernel32.GetThreadContext(h_thread, byref(context)):
            return context
        print("get_thread_context fail.")
        return False
    finally:
        # Close the handle on both the success and the failure path.
        kernel32.CloseHandle(h_thread)

# --- Next snippet in the article: my_test ---
# -*- coding: utf-8 -*-
# @Date    : 2016-08-12 14:18:10
# @Author  : giantbranch (giantbranch@gmail.com)
# @Link    : http://blog.csdn.net/u012763794?viewmode=contents
"""Attach to a process and dump the registers of each of its threads."""
import my_debugger

debugger = my_debugger.debugger()

pid = raw_input("Enter the PID of the process to attach to:")
debugger.attach(int(pid))

threadList = debugger.enumerate_threads()
print(threadList)

# enumerate_threads() returns False on failure; iterate nothing then.
for thread in threadList or []:
    thread_context = debugger.get_thread_context(thread)
    # get_thread_context() returns False when the context is unavailable.
    if not thread_context:
        continue

    # %08x: eight hex digits, zero-padded.
    print("[*] Dumping registers for thread ID:0x%08x" % thread)
    print("[**] EIP:0x%08x" % thread_context.Eip)
    print("[**] ESP:0x%08x" % thread_context.Esp)
    print("[**] EBP:0x%08x" % thread_context.Ebp)
    print("[**] EAX:0x%08x" % thread_context.Eax)
    print("[**] EBX:0x%08x" % thread_context.Ebx)
    print("[**] ECX:0x%08x" % thread_context.Ecx)
    print("[**] EDX:0x%08x" % thread_context.Edx)
    print("[*] END DUMP")

debugger.detach()
# raw_input("Press a key to continue...") # self.debugger_active = False (另外,还要把对 run 的调用注释掉)
# Debug event codes compared against the event code that
# WaitForDebugEvent() reports.
EXCEPTION_DEBUG_EVENT      = 1
CREATE_THREAD_DEBUG_EVENT  = 2
CREATE_PROCESS_DEBUG_EVENT = 3
EXIT_THREAD_DEBUG_EVENT    = 4
EXIT_PROCESS_DEBUG_EVENT   = 5
LOAD_DLL_DEBUG_EVENT       = 6
UNLOAD_DLL_DEBUG_EVENT     = 7
OUTPUT_DEBUG_STRING_EVENT  = 8
RIP_EVENT                  = 9
# The functions below are methods of the ``debugger`` class in
# my_debugger.py; paste them into the class body.

def read_process_memory(self, address, length):
    """Read ``length`` bytes at ``address`` in the debuggee.

    Returns the raw bytes, or False if ReadProcessMemory fails.
    """
    read_buf = create_string_buffer(length)
    count = c_ulong(0)

    if not kernel32.ReadProcessMemory(self.h_process,
                                      address,
                                      read_buf,
                                      length,
                                      byref(count)):
        return False
    return read_buf.raw


def write_process_memory(self, address, data):
    """Write ``data`` at ``address`` in the debuggee; return True/False."""
    count = c_ulong(0)
    length = len(data)
    c_data = c_char_p(data)

    if not kernel32.WriteProcessMemory(self.h_process,
                                       address,
                                       c_data,
                                       length,
                                       byref(count)):
        return False
    return True


def bp_set(self, address):
    """Install a software breakpoint (0xCC) at ``address``.

    Returns True if the breakpoint is installed or was already present,
    False if the original byte could not be saved or the 0xCC byte could
    not be written.
    """
    # Nothing to do when this address already carries a breakpoint.
    if address in self.breakpoints:
        return True

    try:
        # Save the original byte first so it can be restored later.
        original_byte = self.read_process_memory(address, 1)
        if original_byte is False:
            return False
        if not self.write_process_memory(address, '\xCC'):
            return False
    except Exception:
        # Same contract as before (any failure -> False), but without
        # swallowing KeyboardInterrupt/SystemExit like a bare except.
        return False

    # Only record the breakpoint once both steps have succeeded.
    self.breakpoints[address] = (address, original_byte)
    return True


def func_resolve(self, dll, function):
    """Return the address of ``function`` exported by the loaded ``dll``."""
    handle = kernel32.GetModuleHandleA(dll)
    address = kernel32.GetProcAddress(handle, function)
    # GetModuleHandle does not add a reference to the module, and module
    # handles are not kernel object handles, so CloseHandle must not be
    # called on the result.
    return address
# -*- coding: utf-8 -*-
# @Date    : 2016-08-14 10:04:29
# @Author  : giantbranch (giantbranch@gmail.com)
# @Link    : http://blog.csdn.net/u012763794?viewmode=contents
from pydbg import *
from pydbg.defines import *

import struct
import random


def printf_randomizer(dbg):
    """Breakpoint callback: replace the counter printf is about to print.

    Reads the 4-byte value at ESP + 0x8, prints it, overwrites it with a
    random number between 1 and 100, and lets the debuggee continue.
    """
    # Use ESP to locate the counter value.
    parameter_addr = dbg.context.Esp + 0x8
    counter = dbg.read_process_memory(parameter_addr, 4)
    print(repr(counter))

    # "<L": 4-byte little-endian unsigned integer, matching the 4 bytes
    # read above.
    counter = struct.unpack("<L", counter)[0]
    print("Counter:%d" % int(counter))

    # Pack the random value and write all four bytes back.  Writing only
    # the first packed byte would leave the upper three bytes of the old
    # counter in place once the counter exceeds 255.
    random_counter = struct.pack("<L", random.randint(1, 100))
    print(repr(random_counter))
    dbg.write_process_memory(parameter_addr, random_counter)
    print(GetLastError())

    return DBG_CONTINUE


dbg = pydbg()
pid = raw_input("Please Enter the printf_loop.py PID:")

# Attach to the target process.
dbg.attach(int(pid))

printf_address = dbg.func_resolve("msvcrt", "printf")
# description names the breakpoint; handler is the callback to invoke.
dbg.bp_set(printf_address, description="printf_address", handler=printf_randomizer)

# Start the debug event loop.
dbg.run()
# -*- coding: utf-8 -*-
# @Date    : 2016-08-14 20:53:53
# @Author  : giantbranch (giantbranch@gmail.com)
# @Link    : http://blog.csdn.net/u012763794?viewmode=contents
"""Deliberately overflow a small buffer so an attached debugger sees a crash."""
from ctypes import *

msvcrt = cdll.msvcrt

# Pause so the debugger can be attached before the crash is triggered.
raw_input("Once the debugger is attached, press any key.")

# Five-character destination buffer.
buffer = c_char_p("AAAAA")

# Source string far longer than the destination.
overflow = 100 * 'A'

# Intentional overflow: strcpy copies all 100 characters into the buffer.
msvcrt.strcpy(buffer, overflow)
# -*- coding: utf-8 -*-
# @Date    : 2016-08-14 20:57:34
# @Author  : giantbranch (giantbranch@gmail.com)
# @Link    : http://blog.csdn.net/u012763794?viewmode=contents
from pydbg import *
from pydbg.defines import *

import utils


def check_accessv(dbg):
    """Access-violation callback: report the crash and kill the debuggee."""
    # Skip first-chance exceptions; they are passed on unhandled.
    if dbg.dbg.u.Exception.dwFirstChance:
        return DBG_EXCEPTION_NOT_HANDLED

    crash_bin = utils.crash_binning.crash_binning()
    crash_bin.record_crash(dbg)
    # Print the recorded crash; without this the record is discarded
    # when the handler returns.
    print(crash_bin.crash_synopsis())

    dbg.terminate_process()
    return DBG_EXCEPTION_NOT_HANDLED


pid = raw_input("Enter the Process ID:")

dbg = pydbg()
dbg.attach(int(pid))
dbg.set_callback(EXCEPTION_ACCESS_VIOLATION, check_accessv)
dbg.run()
# -*- coding: utf-8 -*-
# @Date    : 2016-08-14 22:31:08
# @Author  : giantbranch (giantbranch@gmail.com)
# @Link    : http://blog.csdn.net/u012763794?viewmode=contents
from pydbg import *
from pydbg.defines import *

import utils

# How many instructions to print after the access violation.
MAX_INSTRUCTIONS = 10

# Functions to watch, mapped to the DLL that exports them.
dangerous_functions = {
    "strcpy": "msvcrt.dll",
    "strncpy": "msvcrt.dll",
    "sprintf": "msvcrt.dll",
    "vsprintf": "msvcrt.dll"
}

# Resolved address -> function name, filled in at start-up below.
dangerous_functions_resolved = {}
crash_encountered = False
instruction_count = 0


def danger_handler(dbg):
    """Breakpoint callback: dump the stack and snapshot the process."""
    rule = "================================================================================"

    print("[*] Hit %s" % dangerous_functions_resolved[dbg.context.Eip])
    print(rule)

    # Show the six stack slots ESP+0 .. ESP+20.
    for esp_offset in range(0, 24, 4):
        parameter = dbg.smart_dereference(dbg.context.Esp + esp_offset)
        print("[ESP + %d] => %s" % (esp_offset, parameter))

    print(rule)

    dbg.suspend_all_threads()
    dbg.process_snapshot()
    dbg.resume_all_threads()

    return DBG_CONTINUE


def access_violation_handler(dbg):
    """Access-violation callback.

    The first crash restores the last snapshot and switches every thread
    to single-step mode; a second crash terminates the process.
    """
    global crash_encountered

    # Skip first-chance exceptions; they are passed on unhandled.
    if dbg.dbg.u.Exception.dwFirstChance:
        return DBG_EXCEPTION_NOT_HANDLED

    crash_bin = utils.crash_binning.crash_binning()
    crash_bin.record_crash(dbg)
    print(crash_bin.crash_synopsis())

    if crash_encountered:
        # Already replayed once: give up.
        dbg.terminate_process()
        return DBG_EXCEPTION_NOT_HANDLED

    dbg.suspend_all_threads()
    dbg.process_restore()
    crash_encountered = True

    for thread_id in dbg.enumerate_threads():
        print("[*] Setting single step for thread:0x%08x" % thread_id)
        h_thread = dbg.open_thread(thread_id)
        dbg.single_step(True, h_thread)
        dbg.close_handle(h_thread)

    dbg.resume_all_threads()
    return DBG_CONTINUE


def single_step_handler(dbg):
    """Single-step callback: print up to MAX_INSTRUCTIONS instructions."""
    global instruction_count
    global crash_encountered

    if crash_encountered:
        if instruction_count == MAX_INSTRUCTIONS:
            # Enough output: stop single-stepping.
            dbg.single_step(False)
        else:
            instruction = dbg.disasm(dbg.context.Eip)
            print("#%d\t0x%08x : %s" % (instruction_count, dbg.context.Eip, instruction))
            instruction_count += 1
            dbg.single_step(True)

    return DBG_CONTINUE


dbg = pydbg()

pid = int(raw_input("Enter the PID you wish to monitor:"))
dbg.attach(pid)

# Resolve every watched function and put a breakpoint on its entry.
for func, dll_name in dangerous_functions.items():
    func_address = dbg.func_resolve(dll_name, func)
    print("[*] Resolved breakpoint:%s -> 0x%08x" % (func, func_address))
    dbg.bp_set(func_address, handler=danger_handler)
    dangerous_functions_resolved[func_address] = func

dbg.set_callback(EXCEPTION_ACCESS_VIOLATION, access_violation_handler)
dbg.set_callback(EXCEPTION_SINGLE_STEP, single_step_handler)
dbg.run()
from immlib import *


def main(args):
    """PyCommand entry point: create a Debugger and return a status line."""
    imm = Debugger()

    status = "[*] PyCommand Executed!"
    return status

# Article text continues: "There are two prerequisites:"