Python灰帽子--黑客与逆向工程师的Python编程之道 笔记,过程问题解决
python,黑客,编程,逆向2016-08-15
跟着学习进度不断更新中。。。。
powered by 《python灰帽子--黑客与逆向工程师的Python编程之道》
本文链接:http://blog.csdn.net/u012763794/article/details/52174275
自从上次读了python黑帽子(http://blog.csdn.net/u012763794/article/details/50612756),感觉作者写的书还不错,现在来读读python灰帽子吧(感谢翻译书的人,让我们有这么好的学习教材)
同理,我会根据学习进度不断更新, 也欢迎大家像python黑帽子一样,在下面评论给我以鼓励,我会带更多我的学习成果给大家
同样给书中全部代码链接(代码除了常量定义,都是手敲的,所以根据学习进度更新)(github):https://github.com/giantbranch/gray-hat-python-src
# -*- coding: utf-8 -*-
# @Date : 2016-08-10 20:30:23
# @Author : giantbranch (giantbranch@gmail.com)
# @Link : http://blog.csdn.net/u012763794?viewmode=contents
from ctypes import *
# Construct a few ctypes values; typed into an interactive interpreter, each
# bare expression echoes the repr of the object it creates.
c_int()
c_char_p('Hello world!')
c_ushort(65531)
c_short(-5)
# Keep a C string pointer, then print the wrapper object and the string it holds.
seitz = c_char_p("loves the python")
print seitz
print seitz.value
exit()
// C: a structure holding two int members
struct beer_recipe{
int amt_barley;
int amt_water;
};
#python
class beer_recipe(Structure):
    """ctypes counterpart of the C ``struct beer_recipe``: two ``int`` members."""

    _fields_ = [
        ("amt_barley", c_int),
        ("amt_water", c_int),
    ]
//C语言
/* All three members share the same storage. */
union{
    long barley_long;
    int barley_int;
    char barley_char[8];
}barley_amount;
#python
class barley_amount(Union):
    """ctypes counterpart of the C ``barley_amount`` union.

    The three members overlay the same storage, so the union is as large as
    its largest member.
    """

    _fields_ = [
        ("barley_long", c_long),
        ("barley_int", c_int),
        ("barley_char", c_char * 8),
    ]
# -*- coding: utf-8 -*-
# @Date : 2016-08-11 16:07:38
# @Author : giantbranch (giantbranch@gmail.com)
# @Link : http://blog.csdn.net/u012763794?viewmode=contents
#把所有的结构体,联合体,常量等放这,方便以后维护
from ctypes import *
# Give the ctypes types the names used by the Win32 headers.
WORD = c_ushort
DWORD = c_ulong
LPBYTE = POINTER(c_ubyte)
LPTSTR = POINTER(c_char)
HANDLE = c_void_p

# Constants
DEBUG_PROCESS = 0x00000001
CREATE_NEW_CONSOLE = 0x00000010


class STARTUPINFO(Structure):
    """Structure handed to CreateProcessA() to set the child process attributes.

    The three standard-stream members are declared as HANDLE (pointer sized)
    rather than DWORD so the layout, and therefore ``sizeof()`` stored in
    ``cb``, is also right when the interpreter is a 64-bit build.
    """

    _fields_ = [
        ("cb", DWORD),
        ("lpReserved", LPTSTR),
        ("lpDesktop", LPTSTR),
        ("lpTitle", LPTSTR),
        ("dwX", DWORD),
        ("dwY", DWORD),
        ("dwXSize", DWORD),
        ("dwYSize", DWORD),
        ("dwXCountChars", DWORD),
        ("dwYCountChars", DWORD),
        ("dwFillAttribute", DWORD),
        ("dwFlags", DWORD),
        ("wShowWindow", WORD),
        ("cbReserved2", WORD),
        ("lpReserved2", LPBYTE),
        ("hStdInput", HANDLE),
        ("hStdOutput", HANDLE),
        ("hStdError", HANDLE),
    ]


class PROCESS_INFORMATION(Structure):
    """Process information: handles and IDs of the new process and its thread."""

    _fields_ = [
        ("hProcess", HANDLE),
        ("hThread", HANDLE),
        ("dwProcessId", DWORD),
        ("dwThreadId", DWORD),
    ]
# -*- coding: utf-8 -*-
# @Date : 2016-08-11 16:48:16
# @Author : giantbranch (giantbranch@gmail.com)
# @Link : http://blog.csdn.net/u012763794?viewmode=contents
from ctypes import *
from my_debugger_defines import *

kernel32 = windll.kernel32


class debugger():
    """Minimal Win32 debugger driven through kernel32 via ctypes."""

    def __init__(self):
        pass

    def load(self, path_to_exe):
        """Launch ``path_to_exe`` with CreateProcessA() under DEBUG_PROCESS.

        Prints the new process ID on success, or the GetLastError() code
        when the process could not be created. Returns nothing.
        """
        creation_flags = DEBUG_PROCESS

        startupinfo = STARTUPINFO()
        process_information = PROCESS_INFORMATION()

        # dwFlags 0x1 is STARTF_USESHOWWINDOW, which makes CreateProcessA()
        # honour wShowWindow; 0x0 there is SW_HIDE.
        startupinfo.dwFlags = 0x1
        startupinfo.wShowWindow = 0x0
        # The API requires cb to hold the size of the structure itself.
        startupinfo.cb = sizeof(startupinfo)

        if kernel32.CreateProcessA(path_to_exe,
                                   None,
                                   None,
                                   None,
                                   None,
                                   creation_flags,
                                   None,
                                   None,
                                   byref(startupinfo),
                                   byref(process_information)):
            print("[*] we have successfully launched the process!")
            print("[*] PID:%d" % process_information.dwProcessId)
        else:
            print("[*] Error:0x%08x." % kernel32.GetLastError())
# 启动代码 (launcher code)
# -*- coding: utf-8 -*-
# @Date : 2016-08-12 14:18:10
# @Author : giantbranch (giantbranch@gmail.com)
# @Link : http://blog.csdn.net/u012763794?viewmode=contents
import my_debugger
# Create the debugger object and start calc.exe through its load() method.
debugger = my_debugger.debugger()
debugger.load("C:\\WINDOWS\\system32\\calc.exe")
# Get a handle to the process; debugging naturally needs full access rights.
def open_process(self, pid):
    """Return the handle kernel32.OpenProcess() yields for process ``pid``.

    Arguments are passed in the Win32 order (dwDesiredAccess,
    bInheritHandle, dwProcessId): PROCESS_ALL_ACCESS, not inheritable, and
    the target PID last.
    """
    h_process = kernel32.OpenProcess(PROCESS_ALL_ACCESS, False, pid)
    return h_process
def attach(self, pid):
    """Attach the debugger to the running process ``pid``.

    The process handle is stored in ``self.h_process``. When
    DebugActiveProcess() succeeds the debugger is marked active, the PID is
    remembered and the run() loop is entered; otherwise a message is printed.
    """
    self.h_process = self.open_process(pid)
    # Try to attach to the program with this PID.
    if kernel32.DebugActiveProcess(pid):
        self.debugger_active = True
        self.pid = pid
        self.run()
    else:
        print("[*] Unable to attach to the process.")
# Once attached, keep waiting for debug events.
def run(self):
    """Call get_debug_event() repeatedly while ``self.debugger_active`` is set."""
    while self.debugger_active:
        self.get_debug_event()
# Wait for a debug event and fetch it.
def get_debug_event(self):
    """Wait for the next debug event and let the debuggee continue.

    When WaitForDebugEvent() delivers an event, the user is prompted, the
    debug loop is switched off via ``self.debugger_active`` and the event is
    acknowledged with ContinueDebugEvent() using DBG_CONTINUE.
    """
    debug_event = DEBUG_EVENT()
    continue_status = DBG_CONTINUE
    # INFINITE means: wait without a timeout.
    if kernel32.WaitForDebugEvent(byref(debug_event), INFINITE):
        # Events are not handled yet; for now simply resume the process.
        raw_input("Press a key to continue...")
        self.debugger_active = False
        kernel32.ContinueDebugEvent(debug_event.dwProcessId,
                                    debug_event.dwThreadId,
                                    continue_status)
def detach(self):
    """Stop debugging ``self.pid``.

    Returns True when DebugActiveProcessStop() succeeds and False otherwise,
    so the result can be tested directly by the caller.
    """
    if kernel32.DebugActiveProcessStop(self.pid):
        print("[*] Finished debugging. Exiting...")
        return True
    print("There was an error")
    return False
import my_debugger
debugger = my_debugger.debugger()
# debugger.load("C:\\WINDOWS\\system32\\calc.exe")
# Ask for the PID of a running process and attach to it instead of launching one.
pid = raw_input("Enter the PID of the process to attach to:")
debugger.attach(int(pid))
debugger.detach()


def open_thread(self, thread_id):
    """Return a handle to thread ``thread_id``, or False if it cannot be opened."""
    h_thread = kernel32.OpenThread(THREAD_ALL_ACCESS, None, thread_id)
    # OpenThread() signals failure with a NULL handle, which ctypes hands
    # back as 0 (or None for a pointer restype), so test truthiness.
    if h_thread:
        return h_thread
    print("[*] Could not obtain a valid thread handle.")
    return False


def enumerate_threads(self):
    """Return the IDs of every thread owned by ``self.pid``, or False on failure."""
    thread_entry = THREADENTRY32()
    thread_list = []
    snapshot = kernel32.CreateToolhelp32Snapshot(TH32CS_SNAPTHREAD, self.pid)
    # A failed snapshot is reported as INVALID_HANDLE_VALUE (-1, or the
    # all-ones pointer value when the restype is unsigned), not as None.
    if snapshot is None or snapshot in (-1, c_void_p(-1).value):
        print("enumerate_threads fail.")
        return False
    # dwSize has to be filled in before the first Thread32First() call.
    thread_entry.dwSize = sizeof(thread_entry)
    success = kernel32.Thread32First(snapshot, byref(thread_entry))
    while success:
        # The snapshot lists threads of other processes too; keep only ours.
        if thread_entry.th32OwnerProcessID == self.pid:
            thread_list.append(thread_entry.th32ThreadID)
        success = kernel32.Thread32Next(snapshot, byref(thread_entry))
    kernel32.CloseHandle(snapshot)
    return thread_list


def get_thread_context(self, thread_id):
    """Return the CONTEXT (full + debug registers) of ``thread_id``, or False."""
    context = CONTEXT()
    context.ContextFlags = CONTEXT_FULL | CONTEXT_DEBUG_REGISTERS
    h_thread = self.open_thread(thread_id)
    if not h_thread:
        print("get_thread_context fail.")
        return False
    try:
        fetched = kernel32.GetThreadContext(h_thread, byref(context))
    finally:
        # Release the thread handle on the failure path as well.
        kernel32.CloseHandle(h_thread)
    if fetched:
        return context
    print("get_thread_context fail.")
    return False
# my_test
# -*- coding: utf-8 -*-
# @Date : 2016-08-12 14:18:10
# @Author : giantbranch (giantbranch@gmail.com)
# @Link : http://blog.csdn.net/u012763794?viewmode=contents
import my_debugger

debugger = my_debugger.debugger()
# debugger.load("C:\\WINDOWS\\system32\\calc.exe")
pid = raw_input("Enter the PID of the process to attach to:")
debugger.attach(int(pid))

threadList = debugger.enumerate_threads()
print(threadList)
# enumerate_threads() returns False on failure; iterate over nothing then.
for thread in threadList or []:
    thread_context = debugger.get_thread_context(thread)
    if not thread_context:
        # get_thread_context() returned False: no registers to show.
        continue
    # %08x prints eight hexadecimal digits, padded with zeros.
    print("[*] Dumping registers for thread ID:0x%08x" % thread)
    print("[**] EIP:0x%08x" % thread_context.Eip)
    print("[**] ESP:0x%08x" % thread_context.Esp)
    print("[**] EBP:0x%08x" % thread_context.Ebp)
    print("[**] EAX:0x%08x" % thread_context.Eax)
    print("[**] EBX:0x%08x" % thread_context.Ebx)
    print("[**] ECX:0x%08x" % thread_context.Ecx)
    print("[**] EDX:0x%08x" % thread_context.Edx)
    print("[*] END DUMP")
debugger.detach()
# raw_input("Press a key to continue...")
# self.debugger_active = False
# 还要把run注释掉 (the call to run() has to be commented out as well)
# Debug event constants
EXCEPTION_DEBUG_EVENT = 0x1
CREATE_THREAD_DEBUG_EVENT = 0x2
CREATE_PROCESS_DEBUG_EVENT = 0x3
EXIT_THREAD_DEBUG_EVENT = 0x4
EXIT_PROCESS_DEBUG_EVENT = 0x5
LOAD_DLL_DEBUG_EVENT = 0x6
UNLOAD_DLL_DEBUG_EVENT = 0x7
OUTPUT_DEBUG_STRING_EVENT = 0x8
RIP_EVENT = 0x9
def read_process_memory(self, address, length):
    """Return ``length`` bytes read at ``address`` in the debuggee, or False on failure."""
    read_buf = create_string_buffer(length)
    count = c_ulong(0)
    if not kernel32.ReadProcessMemory(self.h_process, address, read_buf, length, byref(count)):
        return False
    return read_buf.raw


def write_process_memory(self, address, data):
    """Write ``data`` at ``address`` in the debuggee; return True on success, else False."""
    count = c_ulong(0)
    length = len(data)
    c_data = c_char_p(data)
    return bool(kernel32.WriteProcessMemory(self.h_process, address, c_data, length, byref(count)))


# Set a breakpoint
def bp_set(self, address):
    """Plant an INT3 (0xCC) breakpoint at ``address``.

    The byte being replaced is remembered in ``self.breakpoints`` so it can
    be put back later. Returns True when the breakpoint is in place (or was
    already registered) and False when the memory could not be read or
    written.
    """
    # Nothing to do if the breakpoint dictionary already has this address.
    if address in self.breakpoints:
        return True
    try:
        # Read and keep the original byte first, then write 0xCC over it.
        original_byte = self.read_process_memory(address, 1)
        if original_byte is False:
            return False
        if not self.write_process_memory(address, '\xCC'):
            return False
    except Exception:
        # Keep the best-effort contract: any failure is reported as False.
        return False
    self.breakpoints[address] = (address, original_byte)
    return True


# Get the address of a function inside a module (usually a DLL)
def func_resolve(self, dll, function):
    """Return the address GetProcAddress() reports for ``function`` in module ``dll``."""
    handle = kernel32.GetModuleHandleA(dll)
    # The module handle is deliberately not passed to CloseHandle():
    # GetModuleHandleA() does not hand out a reference that has to be released.
    address = kernel32.GetProcAddress(handle, function)
    return address
# -*- coding: utf-8 -*-
# @Date : 2016-08-14 10:04:29
# @Author : giantbranch (giantbranch@gmail.com)
# @Link : http://blog.csdn.net/u012763794?viewmode=contents
from pydbg import *
from pydbg.defines import *
import struct
import random
# This is the callback function we define.
def printf_randomizer(dbg):
    """Breakpoint callback: overwrite the counter argument with a random value.

    Reads the 4-byte value stored at ESP + 0x8, prints it, replaces it with a
    random integer between 1 and 100 and returns DBG_CONTINUE.
    """
    # Use ESP to index the value of the count variable.
    parameter_addr = dbg.context.Esp + 0x8
    counter = dbg.read_process_memory(parameter_addr, 4)
    print(repr(counter))
    # "L" means unsigned long.
    # print struct.unpack("L", counter)
    counter = struct.unpack("L", counter)[0]
    print("Counter:%d" % int(counter))
    # Generate a random number from 1 to 100 and convert it to binary form.
    random_counter = random.randint(1, 100)
    # Write back every packed byte, so the high bytes of the stored counter
    # are replaced too and not just its lowest byte.
    random_counter = struct.pack("L", random_counter)
    print(repr(random_counter))
    dbg.write_process_memory(parameter_addr, random_counter)
    print(GetLastError())
    return DBG_CONTINUE
dbg = pydbg()
pid = raw_input("Please Enter the printf_loop.py PID:")
# Attach to the process.
dbg.attach(int(pid))
printf_address = dbg.func_resolve("msvcrt", "printf")
# description gives the breakpoint a name, handler sets the callback function.
dbg.bp_set(printf_address, description="printf_address", handler=printf_randomizer)
# Start the debug loop.
dbg.run()
# -*- coding: utf-8 -*-
# @Date : 2016-08-14 20:53:53
# @Author : giantbranch (giantbranch@gmail.com)
# @Link : http://blog.csdn.net/u012763794?viewmode=contents
from ctypes import *
msvcrt = cdll.msvcrt
raw_input("Once the debugger is attached, press any key.")
# Define a buffer.
buffer = c_char_p("AAAAA")
# The string used for the overflow.
overflow = 'A' * 100
# Overflow: copy 100 characters into the buffer created from 5.
msvcrt.strcpy(buffer, overflow)
# -*- coding: utf-8 -*-
# @Date : 2016-08-14 20:57:34
# @Author : giantbranch (giantbranch@gmail.com)
# @Link : http://blog.csdn.net/u012763794?viewmode=contents
from pydbg import *
from pydbg.defines import *
import utils
def check_accessv(dbg):
    """Access-violation callback: record the crash, show it and end the process.

    First-chance exceptions are passed on untouched. Otherwise the crash is
    recorded with ``utils.crash_binning``, its synopsis is printed and the
    debuggee is terminated. Always returns DBG_EXCEPTION_NOT_HANDLED.
    """
    if dbg.dbg.u.Exception.dwFirstChance:
        return DBG_EXCEPTION_NOT_HANDLED
    crash_bin = utils.crash_binning.crash_binning()
    crash_bin.record_crash(dbg)
    # Print the recorded crash; without this the record is simply discarded.
    print(crash_bin.crash_synopsis())
    dbg.terminate_process()
    return DBG_EXCEPTION_NOT_HANDLED
pid = raw_input("Enter the Process ID:")
dbg = pydbg()
dbg.attach(int(pid))
# Register check_accessv as the callback for access violations, then start the debug loop.
dbg.set_callback(EXCEPTION_ACCESS_VIOLATION, check_accessv)
dbg.run()
# -*- coding: utf-8 -*-
# @Date : 2016-08-14 22:31:08
# @Author : giantbranch (giantbranch@gmail.com)
# @Link : http://blog.csdn.net/u012763794?viewmode=contents
from pydbg import *
from pydbg.defines import *
import utils
# Number of instructions to monitor, i.e. how many instructions are printed
# after an access violation.
MAX_INSTRUCTIONS = 10

# A few dangerous functions, each mapped to the DLL that provides it.
dangerous_functions = dict.fromkeys(
    ("strcpy", "strncpy", "sprintf", "vsprintf"),
    "msvcrt.dll",
)

# Shared state of the handlers: addresses resolved so far, whether a crash has
# been seen, and how many instructions have been printed.
dangerous_functions_resolved = dict()
instruction_count = 0
crash_encountered = False
def danger_handler(dbg):
    """Breakpoint callback for the dangerous functions.

    Prints which function was hit and the dereferenced stack slots from
    ESP to ESP + 20, then takes a process snapshot with all threads
    suspended. Returns DBG_CONTINUE.
    """
    print("[*] Hit %s" % dangerous_functions_resolved[dbg.context.Eip])
    print("================================================================================")
    # Six stack slots, four bytes apart: offsets 0, 4, ..., 20.
    for esp_offset in range(0, 24, 4):
        parameter = dbg.smart_dereference(dbg.context.Esp + esp_offset)
        print("[ESP + %d] => %s" % (esp_offset, parameter))
    print("================================================================================")
    dbg.suspend_all_threads()
    dbg.process_snapshot()
    dbg.resume_all_threads()
    return DBG_CONTINUE
def access_violation_handler(dbg):
    """Access-violation callback that rewinds the process once, then gives up.

    First-chance exceptions are passed on. On the first real crash the
    synopsis is printed, the process is restored from its snapshot and every
    thread is switched to single stepping (returns DBG_CONTINUE). On a later
    crash the process is terminated (returns DBG_EXCEPTION_NOT_HANDLED).
    """
    global crash_encountered
    if dbg.dbg.u.Exception.dwFirstChance:
        return DBG_EXCEPTION_NOT_HANDLED
    crash_bin = utils.crash_binning.crash_binning()
    crash_bin.record_crash(dbg)
    print(crash_bin.crash_synopsis())
    if not crash_encountered:
        dbg.suspend_all_threads()
        dbg.process_restore()
        crash_encountered = True
        # Flag every thread for single stepping.
        for thread_id in dbg.enumerate_threads():
            print("[*] Setting single step for thread:0x%08x" % thread_id)
            h_thread = dbg.open_thread(thread_id)
            dbg.single_step(True, h_thread)
            dbg.close_handle(h_thread)
        dbg.resume_all_threads()
        return DBG_CONTINUE
    dbg.terminate_process()
    return DBG_EXCEPTION_NOT_HANDLED
def single_step_handler(dbg):
    """Single-step callback: print up to MAX_INSTRUCTIONS instructions after a crash.

    Does nothing until a crash has been encountered. After that, each step
    disassembles and prints the instruction at EIP and keeps stepping; once
    MAX_INSTRUCTIONS have been printed single stepping is switched off.
    Always returns DBG_CONTINUE.
    """
    global instruction_count
    global crash_encountered
    if crash_encountered:
        if instruction_count == MAX_INSTRUCTIONS:
            dbg.single_step(False)
            return DBG_CONTINUE
        instruction = dbg.disasm(dbg.context.Eip)
        print("#%d\t0x%08x : %s" % (instruction_count, dbg.context.Eip, instruction))
        instruction_count += 1
        dbg.single_step(True)
    return DBG_CONTINUE
dbg = pydbg()
pid = int(raw_input("Enter the PID you wish to monitor:"))
dbg.attach(pid)
# Resolve every dangerous function, set a breakpoint on it and remember which
# address belongs to which function.
for func in dangerous_functions.keys():
    func_address = dbg.func_resolve(dangerous_functions[func], func)
    print("[*] Resolved breakpoint:%s -> 0x%08x" % (func, func_address))
    dbg.bp_set(func_address, handler=danger_handler)
    dangerous_functions_resolved[func_address] = func
dbg.set_callback(EXCEPTION_ACCESS_VIOLATION, access_violation_handler)
dbg.set_callback(EXCEPTION_SINGLE_STEP, single_step_handler)
dbg.run()
from immlib import *


def main(args):
    """PyCommand entry point: create a Debugger instance and return a status string."""
    imm = Debugger()
    return "[*] PyCommand Executed!"
# 有两个必备条件: (there are two prerequisites:)