forked from mandiant/capa
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Daniel Plohmann (jupiter)
committed
Oct 29, 2020
1 parent
eef8f2e
commit 3682292
Showing
9 changed files
with
776 additions
and
25 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,49 @@ | ||
import sys | ||
import types | ||
|
||
from smda.common.SmdaReport import SmdaReport | ||
from smda.common.SmdaInstruction import SmdaInstruction | ||
|
||
import capa.features.extractors.smda.file | ||
import capa.features.extractors.smda.insn | ||
import capa.features.extractors.smda.function | ||
import capa.features.extractors.smda.basicblock | ||
from capa.features.extractors import FeatureExtractor | ||
|
||
|
||
class SmdaFeatureExtractor(FeatureExtractor):
    """
    feature extractor that reads from an SMDA report.

    every method either forwards to the matching `capa.features.extractors.smda`
    module or walks the accessors of the SMDA report/function/basic block objects.
    """

    def __init__(self, smda_report: SmdaReport, path):
        """
        args:
          smda_report (SmdaReport): the report to extract features from.
          path: path of the analyzed file; handed to the file-level extractors.
        """
        super(SmdaFeatureExtractor, self).__init__()
        self.path = path
        self.smda_report = smda_report

    def get_base_address(self):
        """ return the `base_addr` stored in the SMDA report """
        report = self.smda_report
        return report.base_addr

    def extract_file_features(self):
        """ yield (feature, va) pairs produced by the SMDA file extractor """
        pairs = capa.features.extractors.smda.file.extract_features(self.smda_report, self.path)
        for feature, va in pairs:
            yield feature, va

    def get_functions(self):
        """ yield every function contained in the SMDA report """
        yield from self.smda_report.getFunctions()

    def extract_function_features(self, f):
        """ yield (feature, va) pairs produced by the SMDA function extractor for `f` """
        pairs = capa.features.extractors.smda.function.extract_features(f)
        for feature, va in pairs:
            yield feature, va

    def get_basic_blocks(self, f):
        """ yield every basic block of the function `f` """
        yield from f.getBlocks()

    def extract_basic_block_features(self, f, bb):
        """ yield (feature, va) pairs produced by the SMDA basic block extractor for `bb` """
        pairs = capa.features.extractors.smda.basicblock.extract_features(f, bb)
        for feature, va in pairs:
            yield feature, va

    def get_instructions(self, f, bb):
        """ yield every instruction of the basic block `bb` """
        yield from bb.getInstructions()

    def extract_insn_features(self, f, bb, insn):
        """ yield (feature, va) pairs produced by the SMDA instruction extractor for `insn` """
        pairs = capa.features.extractors.smda.insn.extract_features(f, bb, insn)
        for feature, va in pairs:
            yield feature, va
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,136 @@ | ||
import sys | ||
import string | ||
import struct | ||
|
||
from capa.features import Characteristic | ||
from capa.features.basicblock import BasicBlock | ||
from capa.features.extractors.helpers import MIN_STACKSTRING_LEN | ||
|
||
|
||
def _bb_has_tight_loop(f, bb): | ||
""" | ||
parse tight loops, true if last instruction in basic block branches to bb start | ||
""" | ||
return bb.offset in f.blockrefs[bb.offset] if bb.offset in f.blockrefs else False | ||
|
||
|
||
def extract_bb_tight_loop(f, bb):
    """ yield the "tight loop" characteristic at the block offset when the block loops onto itself """
    if not _bb_has_tight_loop(f, bb):
        return
    yield Characteristic("tight loop"), bb.offset
|
||
|
||
def _bb_has_stackstring(f, bb):
    """
    heuristic for stackstring construction: sum the printable lengths of all
    immediates moved onto the stack in this basic block and report True as soon
    as the running total exceeds MIN_STACKSTRING_LEN
    """
    printable_total = 0
    for smda_ins in bb.getInstructions():
        # only stack-bound immediate moves contribute to the total
        printable_total += get_printable_len(smda_ins.getDetailed()) if is_mov_imm_to_stack(smda_ins) else 0
        if printable_total > MIN_STACKSTRING_LEN:
            return True
    return False
|
||
|
||
def get_operands(smda_ins):
    """ split the instruction's comma-separated operand string into whitespace-trimmed operands """
    operands = []
    for raw_operand in smda_ins.operands.split(","):
        operands.append(raw_operand.strip())
    return operands
|
||
|
||
def extract_stackstring(f, bb):
    """ yield the "stack string" characteristic at the block offset when the stackstring heuristic fires """
    if not _bb_has_stackstring(f, bb):
        return
    yield Characteristic("stack string"), bb.offset
|
||
|
||
def is_mov_imm_to_stack(smda_ins):
    """
    Return True when all of the following hold for the instruction:
      - its mnemonic starts with "mov"
      - it has exactly two operands
      - the source operand parses as a base-16 integer
      - the destination operand mentions ebp, rbp, esp or rsp
    """
    if not smda_ins.mnemonic.startswith("mov"):
        return False

    operands = get_operands(smda_ins)
    if len(operands) != 2:
        # not two operands
        return False
    dst, src = operands

    try:
        int(src, 16)
    except ValueError:
        # source is not an immediate
        return False

    for regname in ("ebp", "rbp", "esp", "rsp"):
        if regname in dst:
            return True
    return False
|
||
|
||
def get_printable_len(instr):
    """
    Return string length if all operand bytes are ascii or utf16-le printable
    Works on a capstone instruction

    args:
      instr: detailed instruction; this function reads `operands`,
        `operands[1].value.imm` and `imm_size` from it.
    returns:
      int: `imm_size` when every immediate byte is printable ASCII,
        `imm_size // 2` when the immediate is printable UTF-16 LE
        (every second byte is zero), otherwise 0.
    raises:
      ValueError: when `imm_size` is not 1, 2, 4 or 8.
    """
    # should have exactly two operands for mov immediate
    if len(instr.operands) != 2:
        return 0

    op_value = instr.operands[1].value.imm

    # immediate size in bytes -> (little-endian struct format, truncation mask)
    layouts = {
        1: ("<B", 0xFF),
        2: ("<H", 0xFFFF),
        4: ("<I", 0xFFFFFFFF),
        8: ("<Q", 0xFFFFFFFFFFFFFFFF),
    }
    if instr.imm_size not in layouts:
        raise ValueError("Unhandled operand data type 0x%x." % instr.imm_size)
    fmt, mask = layouts[instr.imm_size]

    # a bytearray iterates as ints on both Python 2 and 3, so no version switch is needed
    chars = bytearray(struct.pack(fmt, op_value & mask))

    def is_printable_ascii(data):
        return all(c < 127 and chr(c) in string.printable for c in data)

    def is_printable_utf16le(data):
        # high byte of every code unit must be zero, low bytes must be printable
        return all(c == 0x00 for c in data[1::2]) and is_printable_ascii(data[::2])

    if is_printable_ascii(chars):
        return instr.imm_size
    if is_printable_utf16le(chars):
        # floor division: a character count must stay an int on Python 3
        return instr.imm_size // 2

    return 0
|
||
|
||
def extract_features(f, bb):
    """
    extract features from the given basic block.
    the BasicBlock feature is emitted first, followed by whatever each handler
    in BASIC_BLOCK_HANDLERS produces.
    args:
      f (smda.common.SmdaFunction): the function from which to extract features
      bb (smda.common.SmdaBasicBlock): the basic block to process.
    yields:
      (Feature, VA): a feature and the location reported for it.
    """
    yield BasicBlock(), bb.offset
    for handler in BASIC_BLOCK_HANDLERS:
        produced = handler(f, bb)
        for feature, va in produced:
            yield feature, va
|
||
|
||
# basic block handlers, run in this order by extract_features()
BASIC_BLOCK_HANDLERS = (extract_bb_tight_loop, extract_stackstring)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,139 @@ | ||
import struct | ||
|
||
# if we have SMDA we definitely have lief | ||
import lief | ||
|
||
import capa.features.extractors.helpers | ||
import capa.features.extractors.strings | ||
from capa.features import String, Characteristic | ||
from capa.features.file import Export, Import, Section | ||
|
||
|
||
def carve(pbytes, offset=0):
    """
    Search a byte buffer for embedded PE files, including single-byte XOR encoded ones.
    Based on the version from vivisect:
      https://github.com/vivisect/vivisect/blob/7be4037b1cecc4551b397f840405a1fc606f9b53/PE/carve.py#L19
    And its IDA adaptation:
      capa/features/extractors/ida/file.py
    args:
      pbytes: the buffer to search.
      offset: position at which the initial search for each "MZ" marker starts.
    yields:
      (offset, xor key): position of a candidate MZ header whose e_lfanew
        field points at a matching "PE" marker, and the XOR key used.
    """
    xor_static = capa.features.extractors.helpers.xor_static
    total = len(pbytes)

    # one work item per XOR key whose encoded "MZ" marker occurs in the buffer
    pending = []
    for key in range(256):
        mz_marker = xor_static(b"MZ", key)
        pe_marker = xor_static(b"PE", key)
        first_hit = pbytes.find(mz_marker, offset)
        if first_hit != -1:
            pending.append((first_hit, mz_marker, pe_marker, key))

    while pending:
        mz_at, mz_marker, pe_marker, key = pending.pop()

        # The MZ header has one field we will check
        # e_lfanew is at 0x3c
        lfanew_at = mz_at + 0x3C
        if total < lfanew_at + 4:
            continue

        encoded_lfanew = pbytes[lfanew_at : lfanew_at + 4]
        (pe_delta,) = struct.unpack("<I", xor_static(encoded_lfanew, key))

        # queue the next occurrence of the same marker before judging this one
        following = pbytes.find(mz_marker, mz_at + 1)
        if following != -1:
            pending.append((following, mz_marker, pe_marker, key))

        pe_at = mz_at + pe_delta
        if total < pe_at + 2:
            continue

        if pbytes[pe_at : pe_at + 2] == pe_marker:
            yield (mz_at, key)
|
||
|
||
def extract_file_embedded_pe(smda_report, file_path):
    """
    yield an "embedded pe" characteristic for every PE that carve() finds in the file.
    args:
      smda_report: unused here; present so all file handlers share one signature.
      file_path: path to the input file, read in binary mode.
    """
    with open(file_path, "rb") as handle:
        contents = handle.read()

    # the search starts at offset 1 (presumably to skip the file's own MZ header - confirm)
    for pe_offset, _xor_key in carve(contents, 1):
        yield Characteristic("embedded pe"), pe_offset
|
||
|
||
def extract_file_export_names(smda_report, file_path):
    """
    extract the exported function names of the file via LIEF.
    args:
      smda_report: unused here; present so all file handlers share one signature.
      file_path: path to the input file.
    yields:
      (Export, VA): an Export feature for each exported function and the address LIEF reports for it.
    """
    lief_binary = lief.parse(file_path)
    if lief_binary is None:
        # LIEF could not parse the file, so there is nothing to report
        return
    for function in lief_binary.exported_functions:
        # wrap the name in an Export feature, like every other file handler yields Feature objects
        yield Export(function.name), function.address
|
||
|
||
def extract_file_import_names(smda_report, file_path):
    """
    extract the imported symbols of a PE file via LIEF.
    args:
      smda_report: SMDA report; its `base_addr` is added to each IAT address.
      file_path: path to the input file.
    yields:
      (Import, VA): an Import feature for every symbol name variant and the
        address of the import's IAT slot. files LIEF does not parse as PE yield nothing.
    """
    # extract import table info via LIEF
    lief_binary = lief.parse(file_path)
    if not isinstance(lief_binary, lief.PE.Binary):
        return
    for imported_library in lief_binary.imports:
        for func in imported_library.entries:
            # compute the address up front so that ordinal-only imports get their own
            # address rather than an unbound or stale value from a previous entry
            va = func.iat_address + smda_report.base_addr
            if func.name:
                for name in capa.features.extractors.helpers.generate_symbols(imported_library.name, func.name):
                    yield Import(name), va
            elif func.is_ordinal:
                for name in capa.features.extractors.helpers.generate_symbols(
                    imported_library.name, "#%s" % func.ordinal
                ):
                    yield Import(name), va
|
||
|
||
def extract_file_section_names(smda_report, file_path):
    """
    extract the section names of a PE file via LIEF.
    args:
      smda_report: unused here; present so all file handlers share one signature.
      file_path: path to the input file.
    yields:
      (Section, VA): a Section feature per section, located at image base + section virtual address.
    """
    parsed = lief.parse(file_path)
    if not isinstance(parsed, lief.PE.Binary):
        return
    if not (parsed and parsed.sections):
        return

    image_base = parsed.optional_header.imagebase
    for section in parsed.sections:
        yield Section(section.name), image_base + section.virtual_address
|
||
|
||
def extract_file_strings(smda_report, file_path):
    """
    extract ASCII and UTF-16 LE strings from file
    args:
      smda_report: unused here; present so all file handlers share one signature.
      file_path: path to the input file, read in binary mode.
    yields:
      (String, offset): all ASCII strings first, then all unicode strings.
    """
    with open(file_path, "rb") as handle:
        data = handle.read()

    string_helpers = capa.features.extractors.strings
    for extract in (string_helpers.extract_ascii_strings, string_helpers.extract_unicode_strings):
        for found in extract(data):
            yield String(found.s), found.offset
|
||
|
||
def extract_features(smda_report, file_path):
    """
    extract file features by running every handler in FILE_HANDLERS
    args:
      smda_report (smda.common.SmdaReport): a SmdaReport
      file_path: path to the input file
    yields:
      Tuple[Feature, VA]: a feature and its location.
    """

    for file_handler in FILE_HANDLERS:
        # each handler is invoked exactly once; its results are streamed through
        for feature, va in file_handler(smda_report, file_path):
            yield feature, va
|
||
|
||
# file-level handlers, run in this order by extract_features();
# each one takes (smda_report, file_path) and yields (feature, va) pairs
FILE_HANDLERS = (
    # carved from the raw file bytes
    extract_file_embedded_pe,
    # parsed via LIEF
    extract_file_export_names,
    extract_file_import_names,
    extract_file_section_names,
    # scanned from the raw file bytes
    extract_file_strings,
)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,51 @@ | ||
from capa.features import Characteristic | ||
from capa.features.extractors import loops | ||
|
||
|
||
def interface_extract_function_XXX(f):
    """
    template showing the shape of a function feature handler.
    args:
      f: the function to process.
    yields:
      (Feature, int): the feature and the address at which its found;
        this template yields a single pair of NotImplementedError placeholders.
    """
    placeholder_feature = NotImplementedError("feature")
    placeholder_va = NotImplementedError("virtual address")
    yield placeholder_feature, placeholder_va
|
||
|
||
def extract_function_calls_to(f):
    """ yield one "calls to" characteristic per entry in the function's `inrefs`, located at that entry """
    for referencing_va in f.inrefs:
        feature = Characteristic("calls to")
        yield feature, referencing_va
|
||
|
||
def extract_function_loop(f):
    """
    yield the "loop" characteristic at the function offset when the
    block-to-block edges recorded in `f.blockrefs` contain a loop
    """
    # flatten {source block: [target blocks]} into a list of (source, target) edges
    edges = [(source, target) for source, targets in f.blockrefs.items() for target in targets]

    if not edges:
        return
    if loops.has_loop(edges):
        yield Characteristic("loop"), f.offset
|
||
|
||
def extract_features(f):
    """
    extract features from the given function by running every handler in FUNCTION_HANDLERS.
    args:
      f: the function from which to extract features
    yields:
      (Feature, VA): a feature and the location reported for it.
    """
    for handler in FUNCTION_HANDLERS:
        produced = handler(f)
        for feature, va in produced:
            yield feature, va
|
||
|
||
# function-level handlers, run in this order by extract_features()
FUNCTION_HANDLERS = (
    extract_function_calls_to,
    extract_function_loop,
)
Oops, something went wrong.