class SmdaFeatureExtractor(FeatureExtractor):
    """
    feature extractor that serves capa features from an SMDA report.

    every method simply forwards to the matching helper module in
    `capa.features.extractors.smda` or to an accessor on the SMDA objects.
    """

    def __init__(self, smda_report: SmdaReport, path):
        super().__init__()
        # the report produced by the SMDA disassembler for the sample
        self.smda_report = smda_report
        # path to the sample; handed to the file-scope extractor
        self.path = path

    def get_base_address(self):
        """return the base address stored in the SMDA report."""
        return self.smda_report.base_addr

    def extract_file_features(self):
        """yield (feature, va) pairs found at file scope."""
        yield from capa.features.extractors.smda.file.extract_features(self.smda_report, self.path)

    def get_functions(self):
        """yield each function contained in the SMDA report."""
        yield from self.smda_report.getFunctions()

    def extract_function_features(self, f):
        """yield (feature, va) pairs found at function scope for `f`."""
        yield from capa.features.extractors.smda.function.extract_features(f)

    def get_basic_blocks(self, f):
        """yield each basic block of function `f`."""
        yield from f.getBlocks()

    def extract_basic_block_features(self, f, bb):
        """yield (feature, va) pairs found in basic block `bb` of function `f`."""
        yield from capa.features.extractors.smda.basicblock.extract_features(f, bb)

    def get_instructions(self, f, bb):
        """yield each instruction of basic block `bb`."""
        yield from bb.getInstructions()

    def extract_insn_features(self, f, bb, insn):
        """yield (feature, va) pairs found in instruction `insn`."""
        yield from capa.features.extractors.smda.insn.extract_features(f, bb, insn)
def _bb_has_tight_loop(f, bb):
    """
    parse tight loops, true if the basic block references its own start offset

    args:
      f: the function; `f.blockrefs` is indexed by block offset and holds the referenced offsets
      bb: the basic block; only `bb.offset` is read
    """
    if bb.offset not in f.blockrefs:
        return False
    return bb.offset in f.blockrefs[bb.offset]


def extract_bb_tight_loop(f, bb):
    """ check basic block for tight loop indicators """
    if _bb_has_tight_loop(f, bb):
        yield Characteristic("tight loop"), bb.offset


def _bb_has_stackstring(f, bb):
    """
    extract potential stackstring creation, using the following heuristics:
      - basic block contains enough moves of constant bytes to the stack
    """
    count = 0
    for instr in bb.getInstructions():
        if is_mov_imm_to_stack(instr):
            # getDetailed() hands back the instruction form get_printable_len() works on
            count += get_printable_len(instr.getDetailed())
        if count > MIN_STACKSTRING_LEN:
            return True
    return False


def get_operands(smda_ins):
    """split the instruction's operand string on commas and strip each part"""
    return [o.strip() for o in smda_ins.operands.split(",")]


def extract_stackstring(f, bb):
    """ check basic block for stackstring indicators """
    if _bb_has_stackstring(f, bb):
        yield Characteristic("stack string"), bb.offset


def is_mov_imm_to_stack(smda_ins):
    """
    Return if instruction moves immediate onto stack

    the check is purely textual: the mnemonic starts with "mov", there are exactly
    two operands, the source parses as a base-16 integer and the destination
    mentions one of the stack/frame pointer registers.
    """
    if not smda_ins.mnemonic.startswith("mov"):
        return False

    try:
        dst, src = get_operands(smda_ins)
    except ValueError:
        # not two operands
        return False

    try:
        int(src, 16)
    except ValueError:
        # source is not an immediate
        return False

    return any(regname in dst for regname in ("ebp", "rbp", "esp", "rsp"))


# struct format per immediate width in bytes; little-endian to match the utf16-le check below
_IMM_PACK_FORMATS = {1: "<B", 2: "<H", 4: "<I", 8: "<Q"}


def get_printable_len(instr):
    """
    Return string length if all operand bytes are ascii or utf16-le printable

    Works on a capstone instruction

    returns:
      int: `instr.imm_size` for printable ascii, `instr.imm_size // 2` for printable
        utf16-le, otherwise 0. the result is always an int so that callers can
        accumulate it as a character count.
    """
    # should have exactly two operands for mov immediate
    if len(instr.operands) != 2:
        return 0

    width = instr.imm_size
    # NOTE(review): the original packing code was partially lost in the reviewed source;
    # the 1/2/4/8 byte little-endian table is a reconstruction - confirm against upstream.
    fmt = _IMM_PACK_FORMATS.get(width)
    if fmt is None:
        # NOTE(review): unsupported widths are treated as "not printable" here;
        # verify whether upstream raised instead.
        return 0

    op_value = instr.operands[1].value.imm
    # mask to the operand width so negative/sign-extended immediates still pack;
    # bytearray yields ints on both py2 and py3, so one code path serves both.
    chars = bytearray(struct.pack(fmt, op_value & ((1 << (8 * width)) - 1)))

    def is_printable_ascii(data):
        return all(c < 127 and chr(c) in string.printable for c in data)

    def is_printable_utf16le(data):
        # every second byte must be NUL and the remaining bytes printable ascii
        return all(c == 0x00 for c in data[1::2]) and is_printable_ascii(data[::2])

    if is_printable_ascii(chars):
        return width
    if is_printable_utf16le(chars):
        # floor division: plain `/` produces a float on py3
        return width // 2

    return 0


def extract_features(f, bb):
    """
    extract features from the given basic block.

    args:
      f (smda.common.SmdaFunction): the function from which to extract features
      bb (smda.common.SmdaBasicBlock): the basic block to process.

    yields:
      Feature, set[VA]: the features and their location found in this basic block.
    """
    yield BasicBlock(), bb.offset
    for bb_handler in BASIC_BLOCK_HANDLERS:
        for feature, va in bb_handler(f, bb):
            yield feature, va


BASIC_BLOCK_HANDLERS = (
    extract_bb_tight_loop,
    extract_stackstring,
)
def extract_insn_bytes_features(f, bb, insn):
    """
    parse byte sequence features from the given instruction.
    example:
        #     push    offset iid_004118d4_IShellLinkA ; riid
    """
    for data_ref in insn.getDataRefs():
        # read_bytes() is defined earlier in this module
        bytes_read = read_bytes(f.smda_report, data_ref)
        if not bytes_read:
            # nothing readable (None or an empty slice): no feature to report
            continue
        if capa.features.extractors.helpers.all_zeros(bytes_read):
            continue
        yield Bytes(bytes_read), insn.offset


def detectAsciiLen(smda_report, offset):
    """
    return the length of the NUL-terminated printable ascii string at `offset`.

    args:
      smda_report: object exposing `buffer` (indexable, yielding ints) and `base_addr`
      offset: address of the candidate string; `base_addr` is subtracted to index `buffer`

    returns:
      int: number of printable characters before the terminating NUL, or 0 when
        there is no buffer, the address falls outside the buffer, or the run of
        printable characters is not followed by a NUL inside the buffer.
    """
    buf = smda_report.buffer
    if buf is None:
        return 0
    end = len(buf)
    start = offset - smda_report.base_addr
    # a negative index would silently wrap around to the end of the buffer
    if not 0 <= start < end:
        return 0
    pos = start
    while pos < end and buf[pos] < 127 and chr(buf[pos]) in string.printable:
        pos += 1
    if pos < end and buf[pos] == 0:
        return pos - start
    return 0


def detectUnicodeLen(smda_report, offset):
    """
    return the byte length of the NUL-terminated printable utf16-le string at `offset`.

    each character must be a printable ascii byte followed by a zero byte, and the
    string must end with two zero bytes that lie inside the buffer.

    returns:
      int: length in bytes (two per character, terminator excluded), or 0 when no
        such string is found or the address falls outside the buffer.
    """
    buf = smda_report.buffer
    if buf is None:
        return 0
    end = len(buf)
    start = offset - smda_report.base_addr
    if not 0 <= start < end:
        return 0
    pos = start
    while pos + 1 < end and buf[pos] < 127 and chr(buf[pos]) in string.printable and buf[pos + 1] == 0:
        pos += 2
    if pos + 1 < end and buf[pos] == 0 and buf[pos + 1] == 0:
        return pos - start
    return 0


def read_string(smda_report, offset):
    """
    read the ascii or utf16-le string located at `offset`.

    returns:
      str or None: the decoded string; None when neither detector finds a string
        longer than one character.
    """
    alen = detectAsciiLen(smda_report, offset)
    if alen > 1:
        return read_bytes(smda_report, offset, alen).decode("utf-8")
    ulen = detectUnicodeLen(smda_report, offset)
    if ulen > 2:
        # detectUnicodeLen() validates little-endian code units, so decode them
        # explicitly as such instead of relying on BOM / platform byte order
        return read_bytes(smda_report, offset, ulen).decode("utf-16-le")
    return None


def extract_insn_string_features(f, bb, insn):
    """parse string features from the given instruction."""
    # example:
    #
    #     push    offset aAcr     ; "ACR  > "
    for data_ref in insn.getDataRefs():
        string_read = read_string(f.smda_report, data_ref)
        if string_read:
            yield String(string_read.rstrip("\x00")), insn.offset


def extract_insn_offset_features(f, bb, insn):
    """parse structure offset features from the given instruction."""
    # examples:
    #
    #     mov eax, [esi + 4]
    #     mov eax, [esi + ecx + 16384]
    operands = [o.strip() for o in insn.operands.split(",")]
    for operand in operands:
        number = None
        number_hex = re.search(r"[+\-] (?P<num>0x[a-fA-F0-9]+)", operand)
        # capture the whole decimal literal, not only its first digit
        number_int = re.search(r"[+\-] (?P<num>[0-9]+)", operand)
        if number_hex:
            number = int(number_hex.group("num"), 16)
            number = -1 * number if number_hex.group().startswith("-") else number
        elif number_int:
            number = int(number_int.group("num"))
            number = -1 * number if number_int.group().startswith("-") else number
        # operands that are themselves a plain number/address are not structure offsets
        if not operand.startswith("0") and number is not None:
            yield Offset(number), insn.offset


def is_security_cookie(f, bb, insn):
    """
    check if an instruction is related to security cookie checks

    returns:
      bool: True when the xor uses a stack/frame pointer as its source operand and
        sits within SECURITY_COOKIE_BYTES_DELTA bytes of the start of the first
        basic block or of a block-ending `ret`.
    """
    # security cookie check should use SP or BP
    operands = [o.strip() for o in insn.operands.split(",")]
    # the stack/frame pointer is the *source* operand, e.g. `xor eax, ebp`
    # NOTE(review): this previously tested operands[0]; confirm against real samples
    if len(operands) < 2 or operands[1] not in ["esp", "ebp", "rsp", "rbp"]:
        return False
    for index, block in enumerate(f.getBlocks()):
        # use getInstructions() like the other handlers rather than subscripting the block
        instructions = list(block.getInstructions())
        if not instructions:
            continue
        first, last = instructions[0], instructions[-1]
        # expect security cookie init in first basic block within first bytes (instructions)
        if index == 0 and insn.offset < (first.offset + SECURITY_COOKIE_BYTES_DELTA):
            return True
        # ... or within last bytes (instructions) before a return
        if last.mnemonic.startswith("ret") and insn.offset > (last.offset - SECURITY_COOKIE_BYTES_DELTA):
            return True
    return False


def extract_insn_nzxor_characteristic_features(f, bb, insn):
    """
    parse non-zeroing XOR instruction from the given instruction.
    ignore expected non-zeroing XORs, e.g. security cookies.
    """

    if insn.mnemonic != "xor":
        return

    operands = [o.strip() for o in insn.operands.split(",")]
    # xor of an operand with itself just zeroes it
    if operands[0] == operands[1]:
        return

    if is_security_cookie(f, bb, insn):
        return

    yield Characteristic("nzxor"), insn.offset


def extract_insn_mnemonic_features(f, bb, insn):
    """parse mnemonic features from the given instruction."""
    yield Mnemonic(insn.mnemonic), insn.offset


def extract_insn_peb_access_characteristic_features(f, bb, insn):
    """
    parse peb access from the given function. fs:[0x30] on x86, gs:[0x60] on x64
    """

    if insn.mnemonic not in ["push", "mov"]:
        return

    operands = [o.strip() for o in insn.operands.split(",")]
    for operand in operands:
        if "fs:" in operand and "0x30" in operand:
            yield Characteristic("peb access"), insn.offset
        elif "gs:" in operand and "0x60" in operand:
            yield Characteristic("peb access"), insn.offset


def extract_insn_segment_access_features(f, bb, insn):
    """ parse the instruction for access to fs or gs """
    operands = [o.strip() for o in insn.operands.split(",")]
    for operand in operands:
        # any segment-relative operand counts; the 0x30/0x60 displacements are
        # specific to the peb access handler and must not be required here
        if "fs:" in operand:
            yield Characteristic("fs access"), insn.offset
        elif "gs:" in operand:
            yield Characteristic("gs access"), insn.offset


def get_section(vw, va):
    """
    return the start address of the memory map that contains `va`.

    raises:
      KeyError: when no map returned by `vw.getMemoryMaps()` contains `va`.
    """
    # NOTE(review): not called by any handler visible in this chunk - confirm it is still needed
    for start, length, _, __ in vw.getMemoryMaps():
        if start <= va < start + length:
            return start

    raise KeyError(va)


def extract_insn_cross_section_cflow(f, bb, insn):
    """
    inspect the instruction for a CALL or JMP that crosses section boundaries.
    """
    if insn.mnemonic not in ["call", "jmp"]:
        return

    # instructions recorded as API references are skipped
    if insn.offset in f.apirefs:
        return

    if insn.offset in f.outrefs:
        for target in f.outrefs[insn.offset]:
            if not insn.smda_function.smda_report.isAddrWithinMemoryImage(target):
                yield Characteristic("cross section flow"), insn.offset
    elif insn.operands.startswith("0x"):
        # no recorded reference: fall back to the literal target in the operand text
        target = int(insn.operands, 16)
        if not insn.smda_function.smda_report.isAddrWithinMemoryImage(target):
            yield Characteristic("cross section flow"), insn.offset


# this is a feature that's most relevant at the function scope,
# however, its most efficient to extract at the instruction scope.
def extract_function_calls_from(f, bb, insn):
    """yield a "calls from" characteristic for every target of a call instruction."""
    if insn.mnemonic != "call":
        return

    if insn.offset in f.outrefs:
        for outref in f.outrefs[insn.offset]:
            yield Characteristic("calls from"), outref

            if outref == f.offset:
                # if we found a jump target and it's the function address
                # mark as recursive
                yield Characteristic("recursive call"), outref


# this is a feature that's most relevant at the function or basic block scope,
# however, its most efficient to extract at the instruction scope.
def extract_function_indirect_call_characteristic_features(f, bb, insn):
    """
    extract indirect function call characteristic (e.g., call eax or call dword ptr [edx+4])
    does not include calls like => call ds:dword_ABD4974
    """
    if insn.mnemonic != "call":
        return
    # direct call to a literal address
    if insn.operands.startswith("0x"):
        return
    # rip-relative memory operand
    if "qword ptr" in insn.operands and "rip" in insn.operands:
        return
    # memory operand at a literal address
    if insn.operands.startswith("dword ptr [0x"):
        return
    # call edx
    # call dword ptr [eax+50h]
    # call qword ptr [rsp+78h]
    yield Characteristic("indirect call"), insn.offset
+ insn (smda.common.SmdaInstruction): the instruction to process. + + yields: + Feature, set[VA]: the features and their location found in this insn. + """ + for insn_handler in INSTRUCTION_HANDLERS: + for feature, va in insn_handler(f, bb, insn): + yield feature, va + + +INSTRUCTION_HANDLERS = ( + extract_insn_api_features, + extract_insn_number_features, + extract_insn_string_features, + extract_insn_bytes_features, + extract_insn_offset_features, + extract_insn_nzxor_characteristic_features, + extract_insn_mnemonic_features, + extract_insn_peb_access_characteristic_features, + extract_insn_cross_section_cflow, + extract_insn_segment_access_features, + extract_function_calls_from, + extract_function_indirect_call_characteristic_features, +) diff --git a/capa/main.py b/capa/main.py index f1ca77609..2be2802d4 100644 --- a/capa/main.py +++ b/capa/main.py @@ -295,7 +295,19 @@ class UnsupportedRuntimeError(RuntimeError): def get_extractor_py3(path, format, disable_progress=False): - raise UnsupportedRuntimeError() + from smda.SmdaConfig import SmdaConfig + from smda.Disassembler import Disassembler + + import capa.features.extractors.smda + + smda_report = None + with halo.Halo(text="analyzing program", spinner="simpleDots", stream=sys.stderr, enabled=not disable_progress): + config = SmdaConfig() + config.STORE_BUFFER = True + smda_disasm = Disassembler(config) + smda_report = smda_disasm.disassembleFile(path) + + return capa.features.extractors.smda.SmdaFeatureExtractor(smda_report, path) def get_extractor(path, format, disable_progress=False): @@ -446,14 +458,25 @@ def main(argv=None): parser = argparse.ArgumentParser( description=desc, epilog=epilog, formatter_class=argparse.RawDescriptionHelpFormatter ) - parser.add_argument( - # in #328 we noticed that the sample path is not handled correctly if it contains non-ASCII characters - # https://stackoverflow.com/a/22947334/ offers a solution and decoding using getfilesystemencoding works - # in our testing, however 
other sources suggest `sys.stdin.encoding` (https://stackoverflow.com/q/4012571/) - "sample", - type=lambda s: s.decode(sys.getfilesystemencoding()), - help="path to sample to analyze", - ) + # TODO: decode won't work for python3 + if sys.version_info >= (3, 0): + parser.add_argument( + # in #328 we noticed that the sample path is not handled correctly if it contains non-ASCII characters + # https://stackoverflow.com/a/22947334/ offers a solution and decoding using getfilesystemencoding works + # in our testing, however other sources suggest `sys.stdin.encoding` (https://stackoverflow.com/q/4012571/) + "sample", + type=str, + help="path to sample to analyze", + ) + else: + parser.add_argument( + # in #328 we noticed that the sample path is not handled correctly if it contains non-ASCII characters + # https://stackoverflow.com/a/22947334/ offers a solution and decoding using getfilesystemencoding works + # in our testing, however other sources suggest `sys.stdin.encoding` (https://stackoverflow.com/q/4012571/) + "sample", + type=lambda s: s.decode(sys.getfilesystemencoding()), + help="path to sample to analyze", + ) parser.add_argument("--version", action="version", version="%(prog)s {:s}".format(capa.version.__version__)) parser.add_argument( "-r", @@ -550,7 +573,7 @@ def main(argv=None): # during the load of the RuleSet, we extract subscope statements into their own rules # that are subsequently `match`ed upon. this inflates the total rule count. # so, filter out the subscope rules when reporting total number of loaded rules. 
- len(filter(lambda r: "capa/subscope-rule" not in r.meta, rules.rules.values())), + len([i for i in filter(lambda r: "capa/subscope-rule" not in r.meta, rules.rules.values())]), ) if args.tag: rules = rules.filter_rules_by_meta(args.tag) diff --git a/setup.py b/setup.py index 4c09dd04a..4cf4aa8b5 100644 --- a/setup.py +++ b/setup.py @@ -28,6 +28,7 @@ if sys.version_info >= (3, 0): # py3 requirements.append("networkx") + requirements.append("smda") else: # py2 requirements.append("enum34==1.1.6") # v1.1.6 is needed by halo 0.0.30 / spinners 0.0.24 diff --git a/tests/fixtures.py b/tests/fixtures.py index 3ff40f6c9..4e6a907fc 100644 --- a/tests/fixtures.py +++ b/tests/fixtures.py @@ -81,6 +81,21 @@ def get_viv_extractor(path): return capa.features.extractors.viv.VivisectFeatureExtractor(vw, path) +@lru_cache() +def get_smda_extractor(path): + from smda.SmdaConfig import SmdaConfig + from smda.Disassembler import Disassembler + + import capa.features.extractors.smda + + config = SmdaConfig() + config.STORE_BUFFER = True + disasm = Disassembler(config) + report = disasm.disassembleFile(path) + + return capa.features.extractors.smda.SmdaFeatureExtractor(report, path) + + @lru_cache() def extract_file_features(extractor): features = collections.defaultdict(set) @@ -473,7 +488,7 @@ def do_test_feature_count(get_extractor, sample, scope, feature, expected): def get_extractor(path): if sys.version_info >= (3, 0): - raise RuntimeError("no supported py3 backends yet") + extractor = get_smda_extractor(path) else: extractor = get_viv_extractor(path) diff --git a/tests/test_main.py b/tests/test_main.py index 6ceae34a7..783fc95c8 100644 --- a/tests/test_main.py +++ b/tests/test_main.py @@ -19,7 +19,6 @@ from capa.engine import * -@pytest.mark.xfail(sys.version_info >= (3, 0), reason="vivsect only works on py2") def test_main(z9324d_extractor): # tests rules can be loaded successfully and all output modes path = z9324d_extractor.path @@ -29,7 +28,6 @@ def 
test_main(z9324d_extractor): assert capa.main.main([path]) == 0 -@pytest.mark.xfail(sys.version_info >= (3, 0), reason="vivsect only works on py2") def test_main_single_rule(z9324d_extractor, tmpdir): # tests a single rule can be loaded successfully RULE_CONTENT = textwrap.dedent( @@ -58,7 +56,6 @@ def test_main_single_rule(z9324d_extractor, tmpdir): ) -@pytest.mark.xfail(sys.version_info >= (3, 0), reason="vivsect only works on py2") def test_main_non_ascii_filename(pingtaest_extractor, tmpdir, capsys): # on py2.7, need to be careful about str (which can hold bytes) # vs unicode (which is only unicode characters). @@ -71,18 +68,22 @@ def test_main_non_ascii_filename(pingtaest_extractor, tmpdir, capsys): std = capsys.readouterr() # but here, we have to use a unicode instance, # because capsys has decoded the output for us. - assert pingtaest_extractor.path.decode("utf-8") in std.out + if sys.version_info >= (3, 0): + assert pingtaest_extractor.path in std.out + else: + assert pingtaest_extractor.path.decode("utf-8") in std.out -@pytest.mark.xfail(sys.version_info >= (3, 0), reason="vivsect only works on py2") def test_main_non_ascii_filename_nonexistent(tmpdir, caplog): NON_ASCII_FILENAME = "täst_not_there.exe" assert capa.main.main(["-q", NON_ASCII_FILENAME]) == -1 - assert NON_ASCII_FILENAME.decode("utf-8") in caplog.text + if sys.version_info >= (3, 0): + assert NON_ASCII_FILENAME in caplog.text + else: + assert NON_ASCII_FILENAME.decode("utf-8") in caplog.text -@pytest.mark.xfail(sys.version_info >= (3, 0), reason="vivsect only works on py2") def test_main_shellcode(z499c2_extractor): path = z499c2_extractor.path assert capa.main.main([path, "-vv", "-f", "sc32"]) == 0 @@ -137,7 +138,6 @@ def test_ruleset(): assert len(rules.basic_block_rules) == 1 -@pytest.mark.xfail(sys.version_info >= (3, 0), reason="vivsect only works on py2") def test_match_across_scopes_file_function(z9324d_extractor): rules = capa.rules.RuleSet( [ @@ -201,7 +201,6 @@ def 
test_match_across_scopes_file_function(z9324d_extractor): assert ".text section and install service" in capabilities -@pytest.mark.xfail(sys.version_info >= (3, 0), reason="vivsect only works on py2") def test_match_across_scopes(z9324d_extractor): rules = capa.rules.RuleSet( [ @@ -264,7 +263,6 @@ def test_match_across_scopes(z9324d_extractor): assert "kill thread program" in capabilities -@pytest.mark.xfail(sys.version_info >= (3, 0), reason="vivsect only works on py2") def test_subscope_bb_rules(z9324d_extractor): rules = capa.rules.RuleSet( [ @@ -289,7 +287,6 @@ def test_subscope_bb_rules(z9324d_extractor): assert "test rule" in capabilities -@pytest.mark.xfail(sys.version_info >= (3, 0), reason="vivsect only works on py2") def test_byte_matching(z9324d_extractor): rules = capa.rules.RuleSet( [ @@ -312,7 +309,6 @@ def test_byte_matching(z9324d_extractor): assert "byte match test" in capabilities -@pytest.mark.xfail(sys.version_info >= (3, 0), reason="vivsect only works on py2") def test_count_bb(z9324d_extractor): rules = capa.rules.RuleSet( [ @@ -336,7 +332,6 @@ def test_count_bb(z9324d_extractor): assert "count bb" in capabilities -@pytest.mark.xfail(sys.version_info >= (3, 0), reason="vivsect only works on py2") def test_fix262(pma16_01_extractor, capsys): # tests rules can be loaded successfully and all output modes path = pma16_01_extractor.path @@ -347,7 +342,6 @@ def test_fix262(pma16_01_extractor, capsys): assert "www.practicalmalwareanalysis.com" not in std.out -@pytest.mark.xfail(sys.version_info >= (3, 0), reason="vivsect only works on py2") def test_not_render_rules_also_matched(z9324d_extractor, capsys): # rules that are also matched by other rules should not get rendered by default. # this cuts down on the amount of output while giving approx the same detail.