Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(r2): backtrace and control flow #1213

Draft
wants to merge 10 commits into
base: dev
Choose a base branch
from
1 change: 1 addition & 0 deletions examples/extensions/r2/hello_r2.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ def my_sandbox(path, rootfs):
ql.hook_address(func, r2.functions['main'].offset)
# enable trace powered by r2 symsmap
# r2.enable_trace()
r2.set_backtrace(0x401906)
ql.run()

if __name__ == "__main__":
Expand Down
88 changes: 88 additions & 0 deletions examples/extensions/r2/mem_r2.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
import sys
from types import FrameType
sys.path.append('..')

from tests.test_elf import ELFTest
from qiling import Qiling
from qiling.const import QL_VERBOSE
from qiling.extensions.r2 import R2

def test_elf_linux_arm():
def my_puts(ql: Qiling):
params = ql.os.resolve_fcall_params(ELFTest.PARAMS_PUTS)
print(f'puts("{params["s"]}")')
# all_mem = ql.mem.save()
# for lbound, ubound, perm, _, _, _data in ql.mem.map_info:
# print(f"{lbound:#x} - {ubound:#x} {ubound - lbound:#x} {len(_data):#x} {perm:#x}")
# print()
# ql.mem.restore(all_mem)

ql = Qiling(["../examples/rootfs/arm_linux/bin/arm_stat64"], "../examples/rootfs/arm_linux", verbose=QL_VERBOSE.DEBUG)
ql.os.set_api('puts', my_puts)
ql.run()
del ql

def fn(frame: FrameType, msg, arg):
if msg == 'return':
print("Return: ", arg)
return
if msg != 'call': return
# Filter as appropriate
if 'memory' not in frame.f_code.co_filename: return
if '<' in frame.f_code.co_name: return
caller = frame.f_back.f_code.co_name
print("Called ", frame.f_code.co_name, "from ", caller)
for i in range(frame.f_code.co_argcount):
name = frame.f_code.co_varnames[i]
var = frame.f_locals[name]
if isinstance(var, (bytes, bytearray)):
var = f'{type(var)} len {len(var)}'
print(" Argument", name, "is", var)

sys.settrace(fn)

def unmap_hook(ql: "Qiling", access: int, addr: int, size: int, value: int):
print(f"Unmapped memory access at {addr:#x} - {addr + size:#x} with {value:#x} in type {access}")

def mem_cmp_hook(ql: "Qiling", addr: int, size: int):
mapinfo = ql.mem.map_info
for i, mem_region in enumerate(ql.uc.mem_regions()):
assert (mapinfo[i][0], mapinfo[i][1] - 1, mapinfo[i][2]) == mem_region
uc_mem = ql.mem.read(mem_region[0], mem_region[1] - mem_region[0] + 1)
data = ql.mem.map_info[i][5]
if uc_mem == data: continue
print(f"Memory region {i} {mem_region[0]:#x} - {mem_region[1]:#x} not equal to map_info from {addr:#x}")
for line in ql.mem.get_formatted_mapinfo():
print(line)
with open("mem.bin", "wb") as f:
f.write(uc_mem)
with open("map.bin", "wb") as f:
f.write(data)
assert False

def addr_hook(ql: "Qiling"):
mapinfo = ql.mem.map_info
for i, mem_region in enumerate(ql.uc.mem_regions()):
if i != 8: continue
uc_mem = ql.mem.read(mem_region[0], mem_region[1] - mem_region[0] + 1)
with open('right.bin', 'wb') as f:
f.write(uc_mem)

if __name__ == '__main__':
# from tests.test_shellcode import X8664_LIN
env = {'LD_DEBUG': 'all'}
# ql = Qiling(rootfs="rootfs/x8664_linux", code=X8664_LIN, archtype="x8664", ostype="linux", verbose=QL_VERBOSE.DEBUG)
# ql = Qiling(["rootfs/x86_windows/bin/x86_hello.exe"], "rootfs/x86_windows")
# ql = Qiling(["rootfs/arm_linux/bin/arm_hello_static"], "rootfs/arm_linux", verbose=QL_VERBOSE.DISASM)
# ql = Qiling(["rootfs/arm_linux/bin/arm_hello"], "rootfs/arm_linux", env=env, verbose=QL_VERBOSE.DEBUG)
ql = Qiling(["rootfs/x86_linux/bin/x86_hello"], "rootfs/x86_linux", verbose=QL_VERBOSE.DEBUG)
# ql.hook_mem_unmapped(unmap_hook)
# ql.hook_code(mem_cmp_hook)
# mprot_addr = 0x047d4824
# ql.hook_address(addr_hook, mprot_addr)
# ql.debugger = 'qdb'
# ql = Qiling(["rootfs/x8664_linux/bin/testcwd"], "rootfs/x8664_linux", verbose=QL_VERBOSE.DEBUG)
for line in ql.mem.get_formatted_mapinfo():
print(line)
ql.run()
# test_elf_linux_arm()
2 changes: 1 addition & 1 deletion qiling/arch/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def __init__(self, ql: Qiling):

@lru_cache(maxsize=64)
def get_base_and_name(self, addr: int) -> Tuple[int, str]:
for begin, end, _, name, _ in self.ql.mem.map_info:
for begin, end, _, name, _, _ in self.ql.mem.map_info:
if begin <= addr < end:
return begin, basename(name)

Expand Down
75 changes: 75 additions & 0 deletions qiling/extensions/r2/callstack.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
from dataclasses import dataclass
from typing import Iterator, Optional


@dataclass
class CallStack:
"""Linked Frames
See https://github.com/angr/angr/blob/master/angr/state_plugins/callstack.py
"""
addr: int
sp: int
bp: int
name: str = None # 'name + offset'
next: Optional['CallStack'] = None

def __iter__(self) -> Iterator['CallStack']:
"""
Iterate through the callstack, from top to bottom
(most recent first).
"""
i = self
while i is not None:
yield i
i = i.next

def __getitem__(self, k):
"""
Returns the CallStack at index k, indexing from the top of the stack.
"""
orig_k = k
for i in self:
if k == 0:
return i
k -= 1
raise IndexError(orig_k)

def __len__(self):
"""
Get how many frames there are in the current call stack.

:return: Number of frames
:rtype: int
"""

o = 0
for _ in self:
o += 1
return o

def __repr__(self):
"""
Get a string representation.

:return: A printable representation of the CallStack object
:rtype: str
"""
return "<CallStack (depth %d)>" % len(self)

def __str__(self):
return "Backtrace:\n" + "\n".join(f"Frame {i}: [{f.name}] {f.addr:#x} sp={f.sp:#x}, bp={f.bp:#x}" for i, f in enumerate(self))

def __eq__(self, other):
if not isinstance(other, CallStack):
return False

if self.addr != other.addr or self.sp != other.sp or self.bp != other.bp:
return False

return self.next == other.next

def __ne__(self, other):
return not (self == other)

def __hash__(self):
return hash(tuple((c.addr, c.sp, c.bp) for c in self))
81 changes: 63 additions & 18 deletions qiling/extensions/r2/r2.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from qiling.const import QL_ARCH
from qiling.extensions import trace
from unicorn import UC_PROT_NONE, UC_PROT_READ, UC_PROT_WRITE, UC_PROT_EXEC, UC_PROT_ALL
from .callstack import CallStack

if TYPE_CHECKING:
from qiling.core import Qiling
Expand Down Expand Up @@ -141,10 +142,8 @@ def __init__(self, ql: "Qiling", baseaddr=(1 << 64) - 1, loadaddr=0):
self.loadaddr = loadaddr # r2 -m [addr] map file at given address
self.analyzed = False
self._r2c = libr.r_core.r_core_new()
if ql.code:
self._setup_code(ql.code)
else:
self._setup_file(ql.path)
self._r2i = ctypes.cast(self._r2c.contents.io, ctypes.POINTER(libr.r_io.struct_r_io_t))
self._setup_mem(ql)

def _qlarch2r(self, archtype: QL_ARCH) -> str:
return {
Expand All @@ -161,20 +160,23 @@ def _qlarch2r(self, archtype: QL_ARCH) -> str:
QL_ARCH.PPC: "ppc",
}[archtype]

def _setup_code(self, code: bytes):
path = f'malloc://{len(code)}'.encode()
fh = libr.r_core.r_core_file_open(self._r2c, path, UC_PROT_ALL, self.loadaddr)
libr.r_core.r_core_bin_load(self._r2c, path, self.baseaddr)
self._cmd(f'wx {code.hex()}')
def _rbuf_map(self, cbuf: ctypes.Array[ctypes.c_ubyte], perm: int = UC_PROT_ALL, addr: int = 0, delta: int = 0):
rbuf = libr.r_buf_new_with_pointers(cbuf, len(cbuf), False) # last arg `steal` = False
rbuf = ctypes.cast(rbuf, ctypes.POINTER(libr.r_io.struct_r_buf_t))
desc = libr.r_io_open_buffer(self._r2i, rbuf, perm, 0) # last arg `mode` is always 0 in r2 code
libr.r_io.r_io_map_add(self._r2i, desc.contents.fd, desc.contents.perm, delta, addr, len(cbuf))

def _setup_mem(self, ql: 'Qiling'):
if not hasattr(ql, '_mem'):
return
for start, _end, perms, _label, _mmio, _buf in ql.mem.map_info:
cbuf = ql.mem.cmap[start]
self._rbuf_map(cbuf, perms, start)
# set architecture and bits for r2 asm
arch = self._qlarch2r(self.ql.arch.type)
self._cmd(f"e,asm.arch={arch},asm.bits={self.ql.arch.bits}")

def _setup_file(self, path: str):
path = path.encode()
fh = libr.r_core.r_core_file_open(self._r2c, path, UC_PROT_READ | UC_PROT_EXEC, self.loadaddr)
libr.r_core.r_core_bin_load(self._r2c, path, self.baseaddr)

arch = self._qlarch2r(ql.arch.type)
self._cmd(f"e,asm.arch={arch},asm.bits={ql.arch.bits}")
self._cmd("oba") # load bininfo and update flags

def _cmd(self, cmd: str) -> str:
r = libr.r_core.r_core_cmd_str(
self._r2c, ctypes.create_string_buffer(cmd.encode("utf-8")))
Expand Down Expand Up @@ -268,6 +270,40 @@ def dis_nbytes(self, addr: int, size: int) -> List[Instruction]:
insts = [Instruction(**dic) for dic in self._cmdj(f"pDj {size} @ {addr}")]
return insts

def dis_ninsts(self, addr: int, n: int=1) -> List[Instruction]:
insts = [Instruction(**dic) for dic in self._cmdj(f"pdj {n} @ {addr}")]
return insts

def _backtrace_fuzzy(self, at: int = None, depth: int = 128) -> Optional[CallStack]:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

More arch support

'''Fuzzy backtrace, see https://github.com/radareorg/radare2/blob/master/libr/debug/p/native/bt/fuzzy_all.c#L38
Args:
at: address to start walking stack, default to current SP
depth: limit of stack walking
Returns:
List of Frame
'''
sp = at or self.ql.arch.regs.arch_sp
wordsize = self.ql.arch.bits // 8
frame = None
cursp = oldsp = sp
for i in range(depth):
addr = self.ql.stack_read(i * wordsize)
inst = self.dis_ninsts(addr)[0]
if inst.type.lower() == 'call':
newframe = CallStack(addr=addr, sp=cursp, bp=oldsp, name=self.at(addr), next=frame)
frame = newframe
oldsp = cursp
cursp += wordsize
return frame

def set_backtrace(self, target: Union[int, str]):
'''Set backtrace at target address before executing'''
if isinstance(target, str):
target = self.where(target)
def bt_hook(__ql: "Qiling", *args):
print(self._backtrace_fuzzy())
self.ql.hook_address(bt_hook, target)

def disassembler(self, ql: 'Qiling', addr: int, size: int, filt: Pattern[str]=None) -> int:
'''A human-friendly monkey patch of QlArchUtils.disassembler powered by r2, can be used for hook_code
:param ql: Qiling instance
Expand All @@ -279,7 +315,7 @@ def disassembler(self, ql: 'Qiling', addr: int, size: int, filt: Pattern[str]=No
anibbles = ql.arch.bits // 4
progress = 0
for inst in self.dis_nbytes(addr, size):
if inst.type.lower() == 'invalid':
if inst.type.lower() in ('invalid', 'ill'):
break # stop disasm
name, offset = self.at(inst.offset, parse=True)
if filt is None or filt.search(name):
Expand All @@ -301,5 +337,14 @@ def enable_trace(self, mode='full'):
elif mode == 'history':
trace.enable_history_trace(self.ql)

def shell(self):
while True:
offset = self._r2c.contents.offset
print(f"[{offset:#x}]> ", end="")
cmd = input()
if cmd.strip() == "q":
break
print(self._cmd(cmd))

def __del__(self):
libr.r_core.r_core_free(self._r2c)
Loading