import re
from typing import TYPE_CHECKING, Iterator, List, Optional, Tuple

if TYPE_CHECKING:
    # Needed for annotations only. Keeping these out of the runtime import graph
    # lets the string helpers below be used without loading the CAPE extractor.
    from capa.features.extractors.cape.models import CapeReport
    from capa.features.extractors.base_extractor import CallHandle, ThreadHandle, ProcessHandle


# Both IP patterns follow the ones discussed here:
# https://stackoverflow.com/questions/53497/regular-expression-that-matches-valid-ipv6-addresses
_IPV4_OCTET = r"(?:25[0-5]|(?:2[0-4]|1?[0-9])?[0-9])"  # one decimal octet, 0-255
_IPV4_ADDRESS = rf"(?:{_IPV4_OCTET}\.){{3}}{_IPV4_OCTET}"
_HEXTET = r"[0-9a-fA-F]{1,4}"  # one 16-bit IPv6 group

# The lookarounds stop the match from starting or ending in the middle of a longer
# dotted number, so "2555.255.255.255" and "255.255.255.256" are rejected while an
# address embedded in other text ("http://10.0.0.1:8080/") is still found.
_IPV4_RE = re.compile(rf"(?<![0-9.]){_IPV4_ADDRESS}(?!\.?[0-9])")

# re.VERBOSE is required: the pattern is laid out over several lines, and without the
# flag the newlines and indentation are matched literally, so nothing ever matches.
# The lookarounds reject matches glued to alphanumerics or extra colons, which filters
# out malformed addresses ("fc00::8::9") and scope operators ("std::string").
_IPV6_RE = re.compile(
    rf"""
    (?<![0-9A-Za-z:])
    (?:
        (?:{_HEXTET}:){{7}}{_HEXTET}
      | (?:{_HEXTET}:){{1,7}}:
      | (?:{_HEXTET}:){{1,6}}:{_HEXTET}
      | (?:{_HEXTET}:){{1,5}}(?::{_HEXTET}){{1,2}}
      | (?:{_HEXTET}:){{1,4}}(?::{_HEXTET}){{1,3}}
      | (?:{_HEXTET}:){{1,3}}(?::{_HEXTET}){{1,4}}
      | (?:{_HEXTET}:){{1,2}}(?::{_HEXTET}){{1,5}}
      | {_HEXTET}:(?::{_HEXTET}){{1,6}}
      | :(?:(?::{_HEXTET}){{1,7}}|:)
      | fe80:(?::[0-9a-fA-F]{{0,4}}){{0,4}}%[0-9a-zA-Z]+
      | ::(?:ffff(?::0{{1,4}})?:)?{_IPV4_ADDRESS}
      | (?:{_HEXTET}:){{1,4}}:{_IPV4_ADDRESS}
    )
    (?![0-9A-Za-z:])
    """,
    re.VERBOSE,
)

# See http://stackoverflow.com/a/7933253/433790 for the parts of this pattern.
# Differences from the post: the TLD needs at least two letters (no one-letter TLD
# exists), and matching is case-insensitive because DNS names are.
_DOMAIN_RE = re.compile(
    r"^(?!.{256})(?:[a-z0-9](?:[a-z0-9-]{0,61}[a-z0-9])?\.)+(?:[a-z]{2,63}|xn--[a-z0-9]{1,59})$",
    re.IGNORECASE,
)


def extract_ip_addresses(strings: List[str]) -> Iterator[str]:
    """
    yield each string that contains an IPv4 or IPv6 address.

    The whole input string is yielded, not just the matched address, and each
    string is yielded at most once even when it holds both address families.

    args:
      strings: candidate strings to scan

    yields:
      str: every input string in which an IP address was found, in input order
    """
    for string in strings:
        if _IPV4_RE.search(string) or _IPV6_RE.search(string):
            yield string


def extract_domain_names(strings: List[str]) -> Iterator[str]:
    """
    yield each string that is, in its entirety, a syntactically valid domain name.

    args:
      strings: candidate strings to test

    yields:
      str: every input string that matches the domain pattern, in input order
    """
    for string in strings:
        if _DOMAIN_RE.search(string):
            yield string


def extract_file_names(
    process_handles: "Iterator[ProcessHandle]",
    thread_handles: "Iterator[ThreadHandle]",
    call_handles: "Iterator[CallHandle]",
    report: "Optional[CapeReport]",
) -> Iterator[Tuple[str, str]]:
    """
    yield the file-related Windows API calls found in a sandbox trace.

    An API counts as file-related when it appears among the report's import names
    and its name contains "File" (many Windows file-manipulation APIs do).

    args:
      process_handles: accepted for interface compatibility; not consumed
      thread_handles: accepted for interface compatibility; not consumed
      call_handles: the calls to inspect; each is visited exactly once
      report: the CAPE report the imports are read from; nothing is yielded when None

    yields:
      Tuple[str, str]: the API name and the `name` of the call's last recorded argument
    """
    if report is None:
        return

    # imported here so that importing this module does not require the CAPE extractor
    import capa.features.extractors.cape.file as cape_file

    # a set gives O(1) membership tests in the call loop below
    winapi_file_functions = {
        feature.value
        for feature, _ in cape_file.extract_import_names(report)
        # feature.value may be str, int, float or bytes; only strings can be API names
        if isinstance(feature.value, str) and "File" in feature.value
    }
    if not winapi_file_functions:
        return

    # Each call handle is visited once. Nesting this loop inside loops over the process
    # and thread handles (which were never used) repeated every call once per
    # process/thread pair, or drained the iterators after the first pass.
    for ch in call_handles:
        call = ch.inner
        if call.api not in winapi_file_functions:
            continue
        if not call.arguments:
            # nothing recorded for this call, so there is no file name to report
            continue
        # NOTE(review): this keeps the original choice of the *last* argument's `name`.
        # Confirm against the CAPE models that (a) the file name is really the last
        # recorded argument rather than the first, and (b) `name` holds the file name
        # and not the parameter's name (in which case `value` is wanted here).
        yield call.api, call.arguments[-1].name
report)) # noqa: T201 else: - print(capa.render.default.render(meta, rules, capabilities)) # noqa: T201 + print(capa.render.default.render(meta, rules, capabilities, strings, sandbox_data, report)) # noqa: T201 return 0 @@ -124,6 +128,10 @@ def run_ui(): meta = capa.ghidra.helpers.collect_metadata([rules_path]) extractor = capa.features.extractors.ghidra.extractor.GhidraFeatureExtractor() + strings = None + sandbox_data = None + report = None + capabilities, counts = capa.capabilities.common.find_capabilities(rules, extractor, True) meta.analysis.feature_counts = counts["feature_counts"] @@ -134,11 +142,11 @@ def run_ui(): logger.info("capa encountered warnings during analysis") if verbose == "vverbose": - print(capa.render.vverbose.render(meta, rules, capabilities)) # noqa: T201 + print(capa.render.vverbose.render(meta, rules, capabilities, strings, sandbox_data, report)) # noqa: T201 elif verbose == "verbose": - print(capa.render.verbose.render(meta, rules, capabilities)) # noqa: T201 + print(capa.render.verbose.render(meta, rules, capabilities, strings, sandbox_data, report)) # noqa: T201 else: - print(capa.render.default.render(meta, rules, capabilities)) # noqa: T201 + print(capa.render.default.render(meta, rules, capabilities, strings, sandbox_data, report)) # noqa: T201 return 0 diff --git a/capa/ida/plugin/form.py b/capa/ida/plugin/form.py index 4e1bd572a..6facfe096 100644 --- a/capa/ida/plugin/form.py +++ b/capa/ida/plugin/form.py @@ -823,9 +823,16 @@ def slot_progress_feature_extraction(text): update_wait_box("collecting results") + strings = None + sandbox_data = None + try: self.resdoc_cache = capa.render.result_document.ResultDocument.from_capa( - meta, ruleset, capabilities + meta, + ruleset, + capabilities, + strings, + sandbox_data, ) except Exception as e: logger.exception("Failed to collect results (error: %s)", e) diff --git a/capa/main.py b/capa/main.py index e5ee92a2a..499a83712 100644 --- a/capa/main.py +++ b/capa/main.py @@ -19,7 +19,7 @@ 
import textwrap import contextlib from types import TracebackType -from typing import Any, Set, Dict, List, Callable, Optional +from typing import Any, Set, Dict, List, Tuple, Callable, Iterator, Optional from pathlib import Path import halo @@ -84,7 +84,11 @@ ) from capa.features.address import Address from capa.capabilities.common import find_capabilities, has_file_limitation, find_file_capabilities +from capa.features.extractors.cape.models import CapeReport from capa.features.extractors.base_extractor import ( + CallHandle, + ThreadHandle, + ProcessHandle, SampleHashes, FeatureExtractor, StaticFeatureExtractor, @@ -1226,14 +1230,22 @@ def main(argv: Optional[List[str]] = None): # do show the output in verbose mode, though. if not (args.verbose or args.vverbose or args.json): return E_FILE_LIMITATION + + sandbox_data = Optional[Tuple[Iterator[ProcessHandle], Iterator[ThreadHandle], Iterator[CallHandle]]] + report = json.load(Path(args.sample).open(encoding="utf-8")) + + try: + strings = report.static.pe.imports + except AttributeError: + strings = None if args.json: - print(capa.render.json.render(meta, rules, capabilities)) + print(capa.render.json.render(meta, rules, capabilities, strings, sandbox_data)) elif args.vverbose: - print(capa.render.vverbose.render(meta, rules, capabilities)) + print(capa.render.vverbose.render(meta, rules, capabilities, strings, sandbox_data, report)) elif args.verbose: - print(capa.render.verbose.render(meta, rules, capabilities)) + print(capa.render.verbose.render(meta, rules, capabilities, strings, sandbox_data, report)) else: - print(capa.render.default.render(meta, rules, capabilities)) + print(capa.render.default.render(meta, rules, capabilities, strings, sandbox_data, report)) colorama.deinit() logger.debug("done.") @@ -1271,6 +1283,10 @@ def ida_main(): capabilities, counts = find_capabilities(rules, capa.features.extractors.ida.extractor.IdaFeatureExtractor()) + report = None + strings = None + sandbox_data = None + 
meta.analysis.feature_counts = counts["feature_counts"] meta.analysis.library_functions = counts["library_functions"] @@ -1278,7 +1294,7 @@ def ida_main(): capa.ida.helpers.inform_user_ida_ui("capa encountered warnings during analysis") colorama.init(strip=True) - print(capa.render.default.render(meta, rules, capabilities)) + print(capa.render.default.render(meta, rules, capabilities, strings, sandbox_data, report)) def ghidra_main(): @@ -1309,13 +1325,17 @@ def ghidra_main(): not capa.ghidra.helpers.is_running_headless(), ) + report = None + strings = None + sandbox_data = None + meta.analysis.feature_counts = counts["feature_counts"] meta.analysis.library_functions = counts["library_functions"] if has_file_limitation(rules, capabilities, is_standalone=False): logger.info("capa encountered warnings during analysis") - print(capa.render.default.render(meta, rules, capabilities)) + print(capa.render.default.render(meta, rules, capabilities, strings, sandbox_data, report)) if __name__ == "__main__": diff --git a/capa/render/default.py b/capa/render/default.py index 1af0d27ca..2f3e2a014 100644 --- a/capa/render/default.py +++ b/capa/render/default.py @@ -7,15 +7,19 @@ # See the License for the specific language governing permissions and limitations under the License. 
def _render_single_column_table(title: str, values: List[str], ostream: StringIO) -> None:
    """write a one-column table of `values` under `title`; write nothing when `values` is empty."""
    if not values:
        return

    # measure the plain text: the bolded cells carry styling that must not count as width
    header = width(title, max(len(value) for value in values) + 1)
    # tabulate expects a sequence of rows, so every value becomes its own one-cell row;
    # a bare string would be split into one cell per character
    rows = [[rutils.bold(value)] for value in values]

    ostream.write(tabulate.tabulate(rows, headers=[header], tablefmt="mixed_grid"))
    ostream.write("\n")


def render_ip_addresses(doc: rd.ResultDocument, ostream: StringIO):
    """
    write a table of the strings in `doc.strings` that contain an IP address.

    Nothing is written when `doc.strings` is None or no string matches.
    """
    if doc.strings is None:
        return

    # lowercase IPv6 letters
    ip_addrs = [ip_addr.lower() for ip_addr in common.extract_ip_addresses(doc.strings)]
    _render_single_column_table("Possible IP Addresses", ip_addrs, ostream)


def render_domains(doc: rd.ResultDocument, ostream: StringIO):
    """
    write a table of the strings in `doc.strings` that are domain names.

    Nothing is written when `doc.strings` is None or no string matches.
    """
    if doc.strings is None:
        return

    domains = list(common.extract_domain_names(doc.strings))
    _render_single_column_table("Web Domains", domains, ostream)


def render_file_names(doc: rd.ResultDocument, report: Optional[CapeReport], ostream: StringIO):
    """
    write a two-column table of file-related API calls and the file names they touch.

    Nothing is written when `doc.sandbox_data` or `report` is None, or when
    no matching call is found.
    """
    if doc.sandbox_data is None or report is None:
        return

    rows: List = [
        [rutils.bold(api), rutils.bold(file_name)]
        for api, file_name in common.extract_file_names(*doc.sandbox_data, report)
    ]
    if not rows:
        return

    ostream.write(
        tabulate.tabulate(
            rows,
            headers=[width("APIs", 25), width("File names", 75)],
            tablefmt="mixed_grid",
        )
    )
    ostream.write("\n")


def render(
    meta,
    rules: RuleSet,
    capabilities: MatchResults,
    strings: Optional[List[str]] = None,
    sandbox_data: Optional[Tuple[Iterator[ProcessHandle], Iterator[ThreadHandle], Iterator[CallHandle]]] = None,
    report: Optional[CapeReport] = None,
) -> str:
    """
    render the default (table) view of the analysis results.

    `strings`, `sandbox_data` and `report` are optional so that existing callers that
    pass only `meta`, `rules` and `capabilities` keep working; the sections that depend
    on them are skipped when they are None.
    """
    doc = rd.ResultDocument.from_capa(meta, rules, capabilities, strings, sandbox_data)
    return render_default(doc, report)
def render(
    meta,
    rules: RuleSet,
    capabilities: MatchResults,
    strings: Optional[List[str]] = None,
    sandbox_data: Optional[Tuple[Iterator[ProcessHandle], Iterator[ThreadHandle], Iterator[CallHandle]]] = None,
) -> str:
    """
    render the analysis results as a JSON document.

    `strings` and `sandbox_data` default to None so that existing callers that pass
    only `meta`, `rules` and `capabilities` keep working. Fields that are None are
    left out of the output (`exclude_none=True`).
    """
    # NOTE(review): confirm that a non-None `sandbox_data` (a tuple of iterators)
    # can be serialized by model_dump_json before relying on it in JSON output.
    doc = rd.ResultDocument.from_capa(meta, rules, capabilities, strings, sandbox_data)
    return doc.model_dump_json(exclude_none=True)
sandbox_data: Optional[Tuple[Iterator[ProcessHandle], Iterator[ThreadHandle], Iterator[CallHandle]]], + ) -> "ResultDocument": rule_matches: Dict[str, RuleMatches] = {} for rule_name, matches in capabilities.items(): rule = rules[rule_name] @@ -647,7 +657,12 @@ def from_capa(cls, meta: Metadata, rules: RuleSet, capabilities: MatchResults) - ), ) - return ResultDocument(meta=meta, rules=rule_matches) + return ResultDocument( + meta=meta, + rules=rule_matches, + strings=strings, + sandbox_data=sandbox_data, + ) def to_capa(self) -> Tuple[Metadata, Dict]: capabilities: Dict[ @@ -665,7 +680,7 @@ def to_capa(self) -> Tuple[Metadata, Dict]: capabilities[rule_name].append((addr.to_capa(), result)) - return self.meta, capabilities + return self.meta, capabilities # TODO: implement strings and sandbox_data for 'to_capa' too @classmethod def from_file(cls, path: Path) -> "ResultDocument": diff --git a/capa/render/verbose.py b/capa/render/verbose.py index f6f566dec..109c6605f 100644 --- a/capa/render/verbose.py +++ b/capa/render/verbose.py @@ -22,17 +22,20 @@ is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. 
def render(
    meta,
    rules: RuleSet,
    capabilities: MatchResults,
    strings: Optional[List[str]] = None,
    sandbox_data: Optional[Tuple[Iterator[ProcessHandle], Iterator[ThreadHandle], Iterator[CallHandle]]] = None,
    report: Optional[CapeReport] = None,
) -> str:
    """
    render the verbose view of the analysis results.

    `strings`, `sandbox_data` and `report` default to None so that existing callers
    that pass only `meta`, `rules` and `capabilities` keep working.
    """
    doc = rd.ResultDocument.from_capa(meta, rules, capabilities, strings, sandbox_data)
    return render_verbose(doc, report)
def render(
    meta,
    rules: RuleSet,
    capabilities: MatchResults,
    strings: Optional[List[str]] = None,
    sandbox_data: Optional[Tuple[Iterator[ProcessHandle], Iterator[ThreadHandle], Iterator[CallHandle]]] = None,
    report: Optional[CapeReport] = None,
) -> str:
    """
    render the very verbose view of the analysis results.

    `strings`, `sandbox_data` and `report` default to None so that existing callers
    that pass only `meta`, `rules` and `capabilities` keep working.
    """
    doc = rd.ResultDocument.from_capa(meta, rules, capabilities, strings, sandbox_data)
    return render_vverbose(doc, report)
simplified as textable but in dictionary - doc = rd.ResultDocument.from_capa(meta, rules, capabilities) + doc = rd.ResultDocument.from_capa(meta, rules, capabilities, strings, sandbox_data) capa_output = render_dictionary(doc) elif output_format == "json": # render results # ...as json - capa_output = json.loads(capa.render.json.render(meta, rules, capabilities)) + capa_output = json.loads(capa.render.json.render(meta, rules, capabilities, strings, sandbox_data)) elif output_format == "texttable": # ...as human readable text table - capa_output = capa.render.default.render(meta, rules, capabilities) + capa_output = capa.render.default.render(meta, rules, capabilities, strings, sandbox_data, report) return capa_output diff --git a/scripts/show-capabilities-by-function.py b/scripts/show-capabilities-by-function.py index 421c6c7e1..3966c405a 100644 --- a/scripts/show-capabilities-by-function.py +++ b/scripts/show-capabilities-by-function.py @@ -60,7 +60,7 @@ import logging import argparse import collections -from typing import Dict +from typing import Dict, Tuple, Iterator from pathlib import Path import colorama @@ -77,9 +77,15 @@ import capa.capabilities.common import capa.render.result_document as rd from capa.helpers import get_file_taste -from capa.features.common import FORMAT_AUTO +from capa.features.common import FORMAT_AUTO, FORMAT_CAPE from capa.features.freeze import Address -from capa.features.extractors.base_extractor import FeatureExtractor, StaticFeatureExtractor +from capa.features.extractors.base_extractor import ( + CallHandle, + ThreadHandle, + ProcessHandle, + FeatureExtractor, + StaticFeatureExtractor, +) logger = logging.getLogger("capa.show-capabilities-by-function") @@ -187,6 +193,16 @@ def main(argv=None): capa.helpers.log_unsupported_runtime_error() return -1 + try: + strings = extractor.strings + except AttributeError: + strings = None + + if format_ == FORMAT_CAPE: + sandbox_data = Tuple[Iterator[ProcessHandle], Iterator[ThreadHandle], 
Iterator[CallHandle]] + else: + sandbox_data = None + capabilities, counts = capa.capabilities.common.find_capabilities(rules, extractor) meta = capa.main.collect_metadata(argv, args.sample, format_, args.os, args.rules, extractor, counts) @@ -203,7 +219,7 @@ def main(argv=None): # - when not an interactive session, and disable coloring # renderers should use coloring and assume it will be stripped out if necessary. colorama.init() - doc = rd.ResultDocument.from_capa(meta, rules, capabilities) + doc = rd.ResultDocument.from_capa(meta, rules, capabilities, strings, sandbox_data) print(render_matches_by_function(doc)) colorama.deinit() diff --git a/tests/test_capabilities.py b/tests/test_capabilities.py index ddc7f6c3f..39285e9df 100644 --- a/tests/test_capabilities.py +++ b/tests/test_capabilities.py @@ -7,8 +7,12 @@ # is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and limitations under the License. 
import pytest

from capa.capabilities.common import extract_domain_names, extract_ip_addresses


def _is_extracted(extract, string: str) -> bool:
    """return True when `extract` yields exactly `string` for the one-element input [string]."""
    # the extractors are generators: they must be consumed before comparing, because a
    # generator object is always truthy and never equal to a str
    return list(extract([string])) == [string]


@pytest.mark.parametrize(
    "string, expected",
    [
        ("8.8.8.8", True),
        ("128.0.0.1", True),
        ("123.4.56.78", True),
        ("0.0.0.0", True),
        ("255.255.255.255", True),
        ("255.255.255.256", False),  # last octet out of range
        ("255.255.255.-1", False),  # negative octet
        ("2555.255.255.255", False),  # first octet out of range
    ],
)
def test_extract_ipv4_addresses(string: str, expected: bool):
    """each case is checked on its own so one failure does not mask the others."""
    assert _is_extracted(extract_ip_addresses, string) is expected


@pytest.mark.parametrize(
    "string, expected",
    [
        ("2001:0db8:85a3:0000:0000:8a2e:0370:7334", True),
        ("fe80:0000:0000:0000:0202:b3ff:fe1e:8329", True),
        ("2002::1234:5678:9abc:def0", True),
        ("::1", True),
        ("0:0:0:0:0:0:0:0", True),
        ("fc00::8::9", False),  # "::" may appear only once in an address
        ("2a02:c7ff:16ce:0000:0000:0000:0000:1*", True),  # valid address followed by other text
        ("3ffe:ffff:ffff:ffff:ffff:ffff:ffff:ffff", True),
        ("1234:5678:9abc:defg:1234:5678:9abc:def0", False),  # 'g' is not a hex digit
        ("2001:0db8:85a3:0000:0000:8a2e:0370:", False),  # trailing single colon
        ("2001:0000:0000:0000:0000:0000:8a2e::7334", False),  # too many groups around "::"
        ("0:0:0:0:0:0:0:0:0", False),  # nine groups
        ("2001:0db8:85a3:0000:0000:8a2e:0370:G334", False),  # 'G' is not a hex digit
    ],
)
def test_extract_ipv6_addresses(string: str, expected: bool):
    """each case is checked on its own so one failure does not mask the others."""
    assert _is_extracted(extract_ip_addresses, string) is expected


@pytest.mark.parametrize(
    "string, expected",
    [
        ("website.com", True),
        ("website.comcomcomcomcomcomcomcomcomcom", True),
        ("2345kd-fkdgjfd.dsjfkj.web-site.gfdsa", True),
        ("g.o.o.g.l.e.com", True),
        ("foobar.co", True),
        ("foobar.c", False),  # a top-level domain has at least two letters
        ("g.o.o.g.l.3.com", True),  # a label may consist of a single digit
    ],
)
def test_extract_domain_names(string: str, expected: bool):
    """each case is checked on its own so one failure does not mask the others."""
    assert _is_extracted(extract_domain_names, string) is expected
import gzip
from pathlib import Path

import pytest

import capa.render.utils as rutils
import capa.render.default as default
import capa.render.result_document as rd
from capa.features.extractors.cape.models import CapeReport

CD = Path(__file__).resolve().parent
CAPE_DIR = CD / "data" / "dynamic" / "cape"

RD_FILES = [
    pytest.param("a3f3bbc_rd"),
    pytest.param("al_khaserx86_rd"),
    pytest.param("al_khaserx64_rd"),
    pytest.param("a076114_rd"),
    pytest.param("pma0101_rd"),
    pytest.param("dotnet_1c444e_rd"),
]


def _with_strings(doc: rd.ResultDocument, strings) -> rd.ResultDocument:
    """return a copy of `doc` whose `strings` field is replaced by `strings`."""
    # NOTE(review): assumes the fixture documents are pydantic v2 models
    # (the module already uses model_dump_json) - confirm model_copy is available.
    return doc.model_copy(update={"strings": strings})


@pytest.mark.parametrize("rd_file", RD_FILES)
def test_render_domains(request, rd_file):
    doc: rd.ResultDocument = request.getfixturevalue(rd_file)

    # the renderers write into the stream and return None, so the stream is inspected
    ostream = rutils.StringIO()
    default.render_domains(_with_strings(doc, ["example.com", "not a domain"]), ostream)
    output = ostream.getvalue()
    assert "Web Domains" in output
    assert "example.com" in output
    assert "not a domain" not in output

    # no strings at all: nothing is rendered
    ostream = rutils.StringIO()
    default.render_domains(_with_strings(doc, None), ostream)
    assert ostream.getvalue() == ""


@pytest.mark.parametrize("rd_file", RD_FILES)
def test_render_file_names(request, rd_file):
    doc: rd.ResultDocument = request.getfixturevalue(rd_file)

    path = CAPE_DIR / "v2.4" / "8b9aaf4fad227cde7a7dabce7ba187b0b923301718d9d40de04bdd15c9b22905.json.gz"
    report = CapeReport.from_buf(gzip.decompress(path.read_bytes()))

    ostream = rutils.StringIO()
    default.render_file_names(doc, report, ostream)
    output = ostream.getvalue()

    if doc.sandbox_data is None:
        # without sandbox data there is nothing to tabulate
        assert output == ""
    else:
        # the table is written only when at least one file API call was found
        assert output == "" or ("APIs" in output and "File names" in output)

    # without a report nothing is rendered, whatever the document holds
    ostream = rutils.StringIO()
    default.render_file_names(doc, None, ostream)
    assert ostream.getvalue() == ""


@pytest.mark.parametrize("rd_file", RD_FILES)
def test_render_ip_addresses(request, rd_file):
    doc: rd.ResultDocument = request.getfixturevalue(rd_file)

    ostream = rutils.StringIO()
    default.render_ip_addresses(_with_strings(doc, ["8.8.8.8", "hello world"]), ostream)
    output = ostream.getvalue()
    assert "Possible IP Addresses" in output
    assert "8.8.8.8" in output
    assert "hello world" not in output

    # no strings at all: nothing is rendered
    ostream = rutils.StringIO()
    default.render_ip_addresses(_with_strings(doc, None), ostream)
    assert ostream.getvalue() == ""