Skip to content

Commit

Permalink
Add a utility class to simplify custom AFL fuzzing
Browse files Browse the repository at this point in the history
  • Loading branch information
elicn committed Mar 25, 2024
1 parent 287cc9f commit bdda8e8
Show file tree
Hide file tree
Showing 2 changed files with 244 additions and 0 deletions.
87 changes: 87 additions & 0 deletions examples/fuzzing/linux_x8664/qlfuzzer_x8664_linux.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
#!/usr/bin/env python3

"""Simple example of how to use QlFuzzer to easily create a custom fuzzer that
leverages Qiling and AFLplusplus.
Note: this example refers to linux_x8664/fuzz_x8664_linux.py
Steps:
o Clone and build AFL++
$ git clone https://github.com/AFLplusplus/AFLplusplus.git
$ make -C AFLplusplus
o Build Unicorn support
$ ( cd AFLplusplus/unicorn_mode ; ./build_unicorn_support.sh )
o Start fuzzing
$ AFL_AUTORESUME=1 AFL_PATH="$(realpath ./AFLplusplus)" PATH="$AFL_PATH:$PATH" afl-fuzz -i afl_inputs -o afl_outputs -U -- python3 ./qlfuzzer_x8664_linux.py @@
o Cleanup results
$ rm -fr afl_outputs/default/
"""

from __future__ import annotations

import os
import sys

from typing import TYPE_CHECKING, Collection, Optional, Sequence

# replace this if qiling is located elsewhere
QLHOME = os.path.realpath(r'../../..')

sys.path.append(QLHOME)
from qiling.extensions import pipe
from qiling.extensions.afl.qlfuzzer import QlFuzzer


if TYPE_CHECKING:
from qiling import Qiling


class MyFuzzer(QlFuzzer):
"""Custom fuzzer.
"""

def setup(self, infilename: str, entry: int, exits: Collection[int], crashes: Optional[Collection[int]] = None) -> None:
super().setup(infilename, entry, exits, crashes)

# redirect stdin to our mock to feed it with incoming fuzzed keystrokes
self.ql.os.stdin = pipe.SimpleInStream(sys.stdin.fileno())

def feed_input(self, ql: Qiling, stimuli: bytes, pround: int) -> bool:
# feed fuzzed input as-is to our mock stdin
ql.os.stdin.write(stimuli)

# signal afl to proceed with this input
return True


def main(argv: Sequence[str], rootfs: str, infilename: str):
# initialize our custom fuzzer
fuzzer = MyFuzzer(argv, rootfs)

# calculate fuzzing scope effective addresses
main_begins = fuzzer.ea(0x1275)
main_ends = fuzzer.ea(0x1293)

# make the process crash whenever __stack_chk_fail@plt is about to be called.
# this way afl will count stack protection violations as fuzzing crashes
stack_chk_fail = fuzzer.ea(0x126e)

# set up fuzzing parameters
fuzzer.setup(infilename, main_begins, [main_ends], [stack_chk_fail])

# start fuzzing.
#
# note that although the main function is being fuzzed, we start emulating the program from its
# default starting point to make sure 'main' has all the necessary data initialized and ready.
fuzzer.run()


if __name__ == '__main__':
main(
rf'{QLHOME}/examples/fuzzing/linux_x8664/x8664_fuzz'.split(),
rf'{QLHOME}/examples/rootfs/x8664_linux',
rf'{QLHOME}/examples/fuzzing/linux_x8664/afl_inputs/a'
)
157 changes: 157 additions & 0 deletions qiling/extensions/afl/qlfuzzer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
# @author: elicn

import os

from abc import ABC, abstractmethod
from typing import Any, Collection, Dict, Optional, Sequence

from qiling import Qiling
from qiling.const import QL_VERBOSE
from qiling.extensions import afl
from qiling.os.const import POINTER


class QlFuzzer(ABC):
"""Simplify Qiling-based fuzzing.
Subclass it to easily implement a custom Qiling-based fuzzer.
"""

@staticmethod
def __set_default(params: Dict[str, Any], name: str, value: Any) -> None:
"""Set a default value to an option whose value was not specified.
Args:
params: kwargs dictionary to modify
name: option name
value: default value to set
Returns: None. however `params` dictionary is modified
"""

if name not in params:
params[name] = value

def __init__(self, argv: Sequence[str], rootfs: str, **kwargs) -> None:
"""Initialize fuzzer instance.
Parameters are identical to Qiling init.
"""

# unless explicitly set otherwise, tune qiling for maximum performance
self.__set_default(kwargs, 'verbose', QL_VERBOSE.DISABLED)
self.__set_default(kwargs, 'log_devices', [])
self.__set_default(kwargs, 'console', False)

self.ql = Qiling(argv, rootfs, **kwargs)

def __install_crash_hooks(self, crashes: Collection[int]) -> None:
"""Hook certain locations in code and make them simulate a crash so AFL would recognize
them as meaningful targets.
Args:
crashes: executable addresses to hook
"""

def __crash(ql: Qiling) -> None:
os.abort()

for address in crashes:
self.ql.hook_address(__crash, address)

def __install_kickoff_hook(self, infilename: str, entry: int, exits: Collection[int]) -> None:
def __kickoff(ql: Qiling):
"""Have Unicorn forked and start instrumentation.
"""

# this is just a one-time hook; remove it
ko_hook.remove()

afl.ql_afl_fuzz(ql, infilename, self.feed_input, exits)

# set afl instrumentation [re]starting point
ko_hook = self.ql.hook_address(__kickoff, entry)

def stage_call_site(self, params: Sequence[int]) -> None:
"""Stage parameters for a function call.
This method provides a convinient way to set up parameters when fuzzing a function call.
Args:
params: a sequence of integer values to set as parameters
"""

self.ql.os.fcall.writeParams([(POINTER, p) for p in params])

@abstractmethod
def feed_input(self, ql: Qiling, stimuli: bytes, pround: int) -> bool:
"""A callback method invoked by AFL whenever a new fuzzing stimuli is generated.
The method may manipulate the stimuli to its needs or use it as-is, and ultimately
responsible to place it where the fuzzed program expects its input to be found, e.g.:
stdin, file, socket, memory, etc.
Args:
ql: qiling instance
stimuli: newly generated input to the fuzzed program
pround: iteration number within a persistent session. if persistency was not set,
round value is expected to be 0 every time
Returns: a boolean indicator of whether AFL should proceed with this fuzzing iteration
or not (i.e. in case the generated stimuli does not satisfy fuzzing logic criteria)
"""

def ea(self, offset: int, module: Optional[str] = None, *, casefold: bool = False) -> int:
"""Get the effective address of a file offset.
Args:
offset: file offset
module: module basename (the emulated binary, by default)
casefold: match module name case-insensitively. this becomes useful when windows
binaries load their libraries using arbitrary case names
Returns: the effective address of `offset` using `module` base address.
Raises: `KeyError` if the requested module was not loaded
"""

image = self.ql.loader.get_image_by_name(module or os.path.basename(self.ql.argv[0]), casefold=casefold)

if image is None:
raise KeyError(f'could not find a loaded module named "{module}"')

return image.base + offset

def setup(self, infilename: str, entry: int, exits: Collection[int], crashes: Optional[Collection[int]] = None) -> None:
"""Set up the fuzzing parameters.
Args:
infilename: path of a file that contains an initial fuzzing input which does not crash
entry: fuzzing entry point. this is where AFL will keep resetting to on each iteration
exits: fuzzing exit points. reaching either one of these addresses means the fuzzing
iteration has ended gracefully and AFL should start a new one
crashes: simulate a crash on these addresses to make AFL mark it as a successfull case.
this is useful to mark "fuzzing points of interest" that would be otherwise overlooked
by AFL since they do not crash the program
Notes:
- starting a fuzzing session without calling this method first will result in a dry-run
"""

# set up hooks to simulate crashes
if crashes is not None:
self.__install_crash_hooks(crashes)

# hook the fuzzing entry address to kick-off AFL
self.__install_kickoff_hook(infilename, entry, exits)

def run(self, begin: Optional[int] = None) -> None:
"""Start the fuzzing session.
Args:
begin: emulation starting point. this may or may not be the same as the fuzzing entry
point, depending on whether the fuzzed code reply on global resources or prior
initialization. For example, fuzzing a 'main' function would require prior code to
initialize argc and argv, as opposed to a stand-alone (pure) function that only needs
its arguments and does not need any prior initialization to happen first.
If not set, emulation will start from the default starting point.
"""

self.ql.run(begin)

0 comments on commit bdda8e8

Please sign in to comment.