Skip to content

Commit

Permalink
refactor!: ql own mem as bytearray, ref passed to uc and r2
Browse files Browse the repository at this point in the history
BREAKING CHANGE: MapInfoEntry now has 6 elements instead of 5
BREAKING CHANGE: mem is managed in Python instead of uc
BREAKING CHANGE: r2 map io from ql.mem, no full binary, now missing symbols
  • Loading branch information
chinggg committed Sep 2, 2022
1 parent 023c3a3 commit d003c2e
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 43 deletions.
2 changes: 1 addition & 1 deletion examples/extensions/r2/hello_r2.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ def my_sandbox(path, rootfs):
ql.hook_address(func, r2.functions['main'].offset)
# enable trace powered by r2 symsmap
# r2.enable_trace()
r2.bt(0x401906)
r2.set_backtrace(0x401906)
ql.run()

if __name__ == "__main__":
Expand Down
2 changes: 1 addition & 1 deletion qiling/arch/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def __init__(self, ql: Qiling):

@lru_cache(maxsize=64)
def get_base_and_name(self, addr: int) -> Tuple[int, str]:
for begin, end, _, name, _ in self.ql.mem.map_info:
for begin, end, _, name, _, _ in self.ql.mem.map_info:
if begin <= addr < end:
return begin, basename(name)

Expand Down
33 changes: 16 additions & 17 deletions qiling/extensions/r2/r2.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,10 +142,8 @@ def __init__(self, ql: "Qiling", baseaddr=(1 << 64) - 1, loadaddr=0):
self.loadaddr = loadaddr # r2 -m [addr] map file at given address
self.analyzed = False
self._r2c = libr.r_core.r_core_new()
if ql.code:
self._setup_code(ql.code)
else:
self._setup_file(ql.path)
self._r2i = ctypes.cast(self._r2c.contents.io, ctypes.POINTER(libr.r_io.struct_r_io_t))
self._setup_mem(ql)

def _qlarch2r(self, archtype: QL_ARCH) -> str:
return {
Expand All @@ -162,20 +160,21 @@ def _qlarch2r(self, archtype: QL_ARCH) -> str:
QL_ARCH.PPC: "ppc",
}[archtype]

def _setup_code(self, code: bytes):
path = f'malloc://{len(code)}'.encode()
fh = libr.r_core.r_core_file_open(self._r2c, path, UC_PROT_ALL, self.loadaddr)
libr.r_core.r_core_bin_load(self._r2c, path, self.baseaddr)
self._cmd(f'wx {code.hex()}')
def _rbuf_map(self, buf: bytearray, perm: int = UC_PROT_ALL, addr: int = 0, delta: int = 0):
rbuf = libr.r_buf_new_with_pointers(ctypes.c_ubyte.from_buffer(buf), len(buf), False)
rbuf = ctypes.cast(rbuf, ctypes.POINTER(libr.r_io.struct_r_buf_t))
desc = libr.r_io_open_buffer(self._r2i, rbuf, perm, 0) # last arg `mode` is always 0 in r2 code
libr.r_io.r_io_map_add(self._r2i, desc.contents.fd, desc.contents.perm, delta, addr, len(buf))

def _setup_mem(self, ql: 'Qiling'):
if not hasattr(ql, '_mem'):
return
for start, end, perms, _label, _mmio, buf in ql.mem.map_info:
self._rbuf_map(buf, perms, start)
# set architecture and bits for r2 asm
arch = self._qlarch2r(self.ql.arch.type)
self._cmd(f"e,asm.arch={arch},asm.bits={self.ql.arch.bits}")

def _setup_file(self, path: str):
path = path.encode()
fh = libr.r_core.r_core_file_open(self._r2c, path, UC_PROT_READ | UC_PROT_EXEC, self.loadaddr)
libr.r_core.r_core_bin_load(self._r2c, path, self.baseaddr)

arch = self._qlarch2r(ql.arch.type)
self._cmd(f"e,asm.arch={arch},asm.bits={ql.arch.bits}")

def _cmd(self, cmd: str) -> str:
r = libr.r_core.r_core_cmd_str(
self._r2c, ctypes.create_string_buffer(cmd.encode("utf-8")))
Expand Down
49 changes: 25 additions & 24 deletions qiling/os/memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
# Cross Platform and Multi Architecture Advanced Binary Emulation Framework
#

import ctypes
import os, re
from typing import Any, Callable, Iterator, List, Mapping, MutableSequence, Optional, Pattern, Sequence, Tuple, Union

Expand All @@ -11,8 +12,8 @@
from qiling import Qiling
from qiling.exception import *

# tuple: range start, range end, permissions mask, range label, is mmio?
MapInfoEntry = Tuple[int, int, int, str, bool]
# tuple: range start, range end, permissions mask, range label, is mmio?, bytearray
MapInfoEntry = Tuple[int, int, int, str, bool, bytearray]

MmioReadCallback = Callable[[Qiling, int, int], int]
MmioWriteCallback = Callable[[Qiling, int, int, int], None]
Expand Down Expand Up @@ -80,7 +81,7 @@ def string(self, addr: int, value=None, encoding='utf-8') -> Optional[str]:

self.__write_string(addr, value, encoding)

def add_mapinfo(self, mem_s: int, mem_e: int, mem_p: int, mem_info: str, is_mmio: bool = False):
def add_mapinfo(self, mem_s: int, mem_e: int, mem_p: int, mem_info: str, is_mmio: bool = False, data : bytearray = None):
"""Add a new memory range to map.
Args:
Expand All @@ -90,9 +91,8 @@ def add_mapinfo(self, mem_s: int, mem_e: int, mem_p: int, mem_info: str, is_mmio
mem_info: map entry label
is_mmio: memory range is mmio
"""

self.map_info.append((mem_s, mem_e, mem_p, mem_info, is_mmio))
self.map_info = sorted(self.map_info, key=lambda tp: tp[0])
self.map_info.append((mem_s, mem_e, mem_p, mem_info, is_mmio, data))
self.map_info.sort(key=lambda tp: tp[0])

def del_mapinfo(self, mem_s: int, mem_e: int):
"""Subtract a memory range from map.
Expand All @@ -104,30 +104,30 @@ def del_mapinfo(self, mem_s: int, mem_e: int):

tmp_map_info: MutableSequence[MapInfoEntry] = []

for s, e, p, info, mmio in self.map_info:
for s, e, p, info, mmio, data in self.map_info:
if e <= mem_s:
tmp_map_info.append((s, e, p, info, mmio))
tmp_map_info.append((s, e, p, info, mmio, data))
continue

if s >= mem_e:
tmp_map_info.append((s, e, p, info, mmio))
tmp_map_info.append((s, e, p, info, mmio, data))
continue

if s < mem_s:
tmp_map_info.append((s, mem_s, p, info, mmio))
tmp_map_info.append((s, mem_s, p, info, mmio, data))

if s == mem_s:
pass

if e > mem_e:
tmp_map_info.append((mem_e, e, p, info, mmio))
tmp_map_info.append((mem_e, e, p, info, mmio, data))

if e == mem_e:
pass

self.map_info = tmp_map_info

def change_mapinfo(self, mem_s: int, mem_e: int, mem_p: Optional[int] = None, mem_info: Optional[str] = None):
def change_mapinfo(self, mem_s: int, mem_e: int, mem_p: Optional[int] = None, mem_info: Optional[str] = None, data: Optional[bytearray] = None):
tmp_map_info: Optional[MapInfoEntry] = None
info_idx: int = None

Expand All @@ -147,7 +147,7 @@ def change_mapinfo(self, mem_s: int, mem_e: int, mem_p: Optional[int] = None, me
return

if mem_info is not None:
self.map_info[info_idx] = (tmp_map_info[0], tmp_map_info[1], tmp_map_info[2], mem_info, tmp_map_info[4])
self.map_info[info_idx] = (tmp_map_info[0], tmp_map_info[1], tmp_map_info[2], mem_info, tmp_map_info[4], tmp_map_info[5])

def get_mapinfo(self) -> Sequence[Tuple[int, int, str, str, str]]:
"""Get memory map info.
Expand All @@ -166,7 +166,7 @@ def __perms_mapping(ps: int) -> str:

return ''.join(val if idx & ps else '-' for idx, val in perms_d.items())

def __process(lbound: int, ubound: int, perms: int, label: str, is_mmio: bool) -> Tuple[int, int, str, str, str]:
def __process(lbound: int, ubound: int, perms: int, label: str, is_mmio: bool, _data: bytearray) -> Tuple[int, int, str, str, str]:
perms_str = __perms_mapping(perms)

if hasattr(self.ql, 'loader'):
Expand Down Expand Up @@ -211,7 +211,7 @@ def get_lib_base(self, filename: str) -> Optional[int]:

# some info labels may be prefixed by boxed label which breaks the search by basename.
# iterate through all info labels and remove all boxed prefixes, if any
stripped = ((lbound, p.sub('', info)) for lbound, _, _, info, _ in self.map_info)
stripped = ((lbound, p.sub('', info)) for lbound, _, _, info, _, _ in self.map_info)

return next((lbound for lbound, info in stripped if os.path.basename(info) == filename), None)

Expand Down Expand Up @@ -268,11 +268,10 @@ def save(self):
"mmio" : []
}

for lbound, ubound, perm, label, is_mmio in self.map_info:
for lbound, ubound, perm, label, is_mmio, data in self.map_info:
if is_mmio:
mem_dict['mmio'].append((lbound, ubound, perm, label, *self.mmio_cbs[(lbound, ubound)]))
else:
data = self.read(lbound, ubound - lbound)
mem_dict['ram'].append((lbound, ubound, perm, label, bytes(data)))

return mem_dict
Expand Down Expand Up @@ -393,7 +392,7 @@ def search(self, needle: Union[bytes, Pattern[bytes]], begin: Optional[int] = No
assert begin < end, 'search arguments do not make sense'

# narrow the search down to relevant ranges; mmio ranges are excluded due to potential read size effects
ranges = [(max(begin, lbound), min(ubound, end)) for lbound, ubound, _, _, is_mmio in self.map_info if not (end < lbound or ubound < begin or is_mmio)]
ranges = [(max(begin, lbound), min(ubound, end)) for lbound, ubound, _, _, is_mmio, _data in self.map_info if not (end < lbound or ubound < begin or is_mmio)]
results = []

# if needle is a bytes sequence use it verbatim, not as a pattern
Expand Down Expand Up @@ -439,10 +438,10 @@ def __mapped_regions(self) -> Iterator[Tuple[int, int]]:

iter_memmap = iter(self.map_info)

p_lbound, p_ubound, _, _, _ = next(iter_memmap)
p_lbound, p_ubound, _, _, _, _ = next(iter_memmap)

# map_info is assumed to contain non-overlapping regions sorted by lbound
for lbound, ubound, _, _, _ in iter_memmap:
for lbound, ubound, _, _, _, _ in iter_memmap:
if lbound == p_ubound:
p_ubound = ubound
else:
Expand Down Expand Up @@ -514,8 +513,8 @@ def find_free_space(self, size: int, minaddr: Optional[int] = None, maxaddr: Opt
assert minaddr < maxaddr

# get gap ranges between mapped ones and memory bounds
gaps_ubounds = tuple(lbound for lbound, _, _, _, _ in self.map_info) + (mem_ubound,)
gaps_lbounds = (mem_lbound,) + tuple(ubound for _, ubound, _, _, _ in self.map_info)
gaps_ubounds = tuple(lbound for lbound, _, _, _, _, _ in self.map_info) + (mem_ubound,)
gaps_lbounds = (mem_lbound,) + tuple(ubound for _, ubound, _, _, _, _ in self.map_info)
gaps = zip(gaps_lbounds, gaps_ubounds)

for lbound, ubound in gaps:
Expand Down Expand Up @@ -582,8 +581,10 @@ def map(self, addr: int, size: int, perms: int = UC_PROT_ALL, info: Optional[str
if not self.is_available(addr, size):
raise QlMemoryMappedError('Requested memory is unavailable')

self.ql.uc.mem_map(addr, size, perms)
self.add_mapinfo(addr, addr + size, perms, info or '[mapped]', is_mmio=False)
buf = bytearray(size)
buf_type = ctypes.c_byte * size
self.ql.uc.mem_map_ptr(addr, size, perms, buf_type.from_buffer(buf))
self.add_mapinfo(addr, addr + size, perms, info or '[mapped]', is_mmio=False, data=buf)

def map_mmio(self, addr: int, size: int, read_cb: Optional[MmioReadCallback], write_cb: Optional[MmioWriteCallback], info: str = '[mmio]'):
# TODO: mmio memory overlap with ram? Is that possible?
Expand Down

0 comments on commit d003c2e

Please sign in to comment.