From bdda8e8c107007b6f6cabe06429baf5f6808f48c Mon Sep 17 00:00:00 2001 From: elicn Date: Wed, 13 Mar 2024 11:10:57 +0200 Subject: [PATCH] Add a utility class to simplify custom AFL fuzzing --- .../linux_x8664/qlfuzzer_x8664_linux.py | 87 ++++++++++ qiling/extensions/afl/qlfuzzer.py | 157 ++++++++++++++++++ 2 files changed, 244 insertions(+) create mode 100644 examples/fuzzing/linux_x8664/qlfuzzer_x8664_linux.py create mode 100644 qiling/extensions/afl/qlfuzzer.py diff --git a/examples/fuzzing/linux_x8664/qlfuzzer_x8664_linux.py b/examples/fuzzing/linux_x8664/qlfuzzer_x8664_linux.py new file mode 100644 index 000000000..9b23f20bf --- /dev/null +++ b/examples/fuzzing/linux_x8664/qlfuzzer_x8664_linux.py @@ -0,0 +1,87 @@ +#!/usr/bin/env python3 + +"""Simple example of how to use QlFuzzer to easily create a custom fuzzer that +leverages Qiling and AFLplusplus. + +Note: this example refers to linux_x8664/fuzz_x8664_linux.py + +Steps: + o Clone and build AFL++ + $ git clone https://github.com/AFLplusplus/AFLplusplus.git + $ make -C AFLplusplus + + o Build Unicorn support + $ ( cd AFLplusplus/unicorn_mode ; ./build_unicorn_support.sh ) + + o Start fuzzing + $ AFL_AUTORESUME=1 AFL_PATH="$(realpath ./AFLplusplus)" PATH="$AFL_PATH:$PATH" afl-fuzz -i afl_inputs -o afl_outputs -U -- python3 ./qlfuzzer_x8664_linux.py @@ + + o Cleanup results + $ rm -fr afl_outputs/default/ +""" + +from __future__ import annotations + +import os +import sys + +from typing import TYPE_CHECKING, Collection, Optional, Sequence + +# replace this if qiling is located elsewhere +QLHOME = os.path.realpath(r'../../..') + +sys.path.append(QLHOME) +from qiling.extensions import pipe +from qiling.extensions.afl.qlfuzzer import QlFuzzer + + +if TYPE_CHECKING: + from qiling import Qiling + + +class MyFuzzer(QlFuzzer): + """Custom fuzzer. + """ + + def setup(self, infilename: str, entry: int, exits: Collection[int], crashes: Optional[Collection[int]] = None) -> None: + super().setup(infilename, entry, exits, crashes) + + # redirect stdin to our mock to feed it with incoming fuzzed keystrokes + self.ql.os.stdin = pipe.SimpleInStream(sys.stdin.fileno()) + + def feed_input(self, ql: Qiling, stimuli: bytes, pround: int) -> bool: + # feed fuzzed input as-is to our mock stdin + ql.os.stdin.write(stimuli) + + # signal afl to proceed with this input + return True + + +def main(argv: Sequence[str], rootfs: str, infilename: str): + # initialize our custom fuzzer + fuzzer = MyFuzzer(argv, rootfs) + + # calculate fuzzing scope effective addresses + main_begins = fuzzer.ea(0x1275) + main_ends = fuzzer.ea(0x1293) + + # make the process crash whenever __stack_chk_fail@plt is about to be called. + # this way afl will count stack protection violations as fuzzing crashes + stack_chk_fail = fuzzer.ea(0x126e) + + # set up fuzzing parameters + fuzzer.setup(infilename, main_begins, [main_ends], [stack_chk_fail]) + + # start fuzzing. + # + # note that although the main function is being fuzzed, we start emulating the program from its + # default starting point to make sure 'main' has all the necessary data initialized and ready. + fuzzer.run() + + +if __name__ == '__main__': + main( + rf'{QLHOME}/examples/fuzzing/linux_x8664/x8664_fuzz'.split(), + rf'{QLHOME}/examples/rootfs/x8664_linux', + rf'{QLHOME}/examples/fuzzing/linux_x8664/afl_inputs/a' + ) diff --git a/qiling/extensions/afl/qlfuzzer.py b/qiling/extensions/afl/qlfuzzer.py new file mode 100644 index 000000000..c7f7c588c --- /dev/null +++ b/qiling/extensions/afl/qlfuzzer.py @@ -0,0 +1,157 @@ +# @author: elicn + +import os + +from abc import ABC, abstractmethod +from typing import Any, Collection, Dict, Optional, Sequence + +from qiling import Qiling +from qiling.const import QL_VERBOSE +from qiling.extensions import afl +from qiling.os.const import POINTER + + +class QlFuzzer(ABC): + """Simplify Qiling-based fuzzing. + + Subclass it to easily implement a custom Qiling-based fuzzer. + """ + + @staticmethod + def __set_default(params: Dict[str, Any], name: str, value: Any) -> None: + """Set a default value to an option whose value was not specified. + + Args: + params: kwargs dictionary to modify + name: option name + value: default value to set + + Returns: None. however `params` dictionary is modified + """ + + if name not in params: + params[name] = value + + def __init__(self, argv: Sequence[str], rootfs: str, **kwargs) -> None: + """Initialize fuzzer instance. + + Parameters are identical to Qiling init. + """ + + # unless explicitly set otherwise, tune qiling for maximum performance + self.__set_default(kwargs, 'verbose', QL_VERBOSE.DISABLED) + self.__set_default(kwargs, 'log_devices', []) + self.__set_default(kwargs, 'console', False) + + self.ql = Qiling(argv, rootfs, **kwargs) + + def __install_crash_hooks(self, crashes: Collection[int]) -> None: + """Hook certain locations in code and make them simulate a crash so AFL would recognize + them as meaningful targets. + + Args: + crashes: executable addresses to hook + """ + + def __crash(ql: Qiling) -> None: + os.abort() + + for address in crashes: + self.ql.hook_address(__crash, address) + + def __install_kickoff_hook(self, infilename: str, entry: int, exits: Collection[int]) -> None: + def __kickoff(ql: Qiling): + """Have Unicorn forked and start instrumentation. + """ + + # this is just a one-time hook; remove it + ko_hook.remove() + + afl.ql_afl_fuzz(ql, infilename, self.feed_input, exits) + + # set afl instrumentation [re]starting point + ko_hook = self.ql.hook_address(__kickoff, entry) + + def stage_call_site(self, params: Sequence[int]) -> None: + """Stage parameters for a function call. + This method provides a convinient way to set up parameters when fuzzing a function call. + + Args: + params: a sequence of integer values to set as parameters + """ + + self.ql.os.fcall.writeParams([(POINTER, p) for p in params]) + + @abstractmethod + def feed_input(self, ql: Qiling, stimuli: bytes, pround: int) -> bool: + """A callback method invoked by AFL whenever a new fuzzing stimuli is generated. + The method may manipulate the stimuli to its needs or use it as-is, and ultimately + responsible to place it where the fuzzed program expects its input to be found, e.g.: + stdin, file, socket, memory, etc. + + Args: + ql: qiling instance + stimuli: newly generated input to the fuzzed program + pround: iteration number within a persistent session. if persistency was not set, + round value is expected to be 0 every time + + Returns: a boolean indicator of whether AFL should proceed with this fuzzing iteration + or not (i.e. in case the generated stimuli does not satisfy fuzzing logic criteria) + """ + + def ea(self, offset: int, module: Optional[str] = None, *, casefold: bool = False) -> int: + """Get the effective address of a file offset. + + Args: + offset: file offset + module: module basename (the emulated binary, by default) + casefold: match module name case-insensitively. this becomes useful when windows + binaries load their libraries using arbitrary case names + + Returns: the effective address of `offset` using `module` base address. + Raises: `KeyError` if the requested module was not loaded + """ + + image = self.ql.loader.get_image_by_name(module or os.path.basename(self.ql.argv[0]), casefold=casefold) + + if image is None: + raise KeyError(f'could not find a loaded module named "{module}"') + + return image.base + offset + + def setup(self, infilename: str, entry: int, exits: Collection[int], crashes: Optional[Collection[int]] = None) -> None: + """Set up the fuzzing parameters. + + Args: + infilename: path of a file that contains an initial fuzzing input which does not crash + entry: fuzzing entry point. this is where AFL will keep resetting to on each iteration + exits: fuzzing exit points. reaching either one of these addresses means the fuzzing + iteration has ended gracefully and AFL should start a new one + crashes: simulate a crash on these addresses to make AFL mark it as a successfull case. + this is useful to mark "fuzzing points of interest" that would be otherwise overlooked + by AFL since they do not crash the program + + Notes: + - starting a fuzzing session without calling this method first will result in a dry-run + """ + + # set up hooks to simulate crashes + if crashes is not None: + self.__install_crash_hooks(crashes) + + # hook the fuzzing entry address to kick-off AFL + self.__install_kickoff_hook(infilename, entry, exits) + + def run(self, begin: Optional[int] = None) -> None: + """Start the fuzzing session. + + Args: + begin: emulation starting point. this may or may not be the same as the fuzzing entry + point, depending on whether the fuzzed code reply on global resources or prior + initialization. For example, fuzzing a 'main' function would require prior code to + initialize argc and argv, as opposed to a stand-alone (pure) function that only needs + its arguments and does not need any prior initialization to happen first. + If not set, emulation will start from the default starting point. + """ + + self.ql.run(begin)