Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Output IP addresses, domain names, file manipulations, and (potentially) registry details #1914

97 changes: 95 additions & 2 deletions capa/capabilities/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,26 @@
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
import re
import logging
import itertools
import collections
from typing import Any, Tuple
from typing import Any, List, Tuple, Iterator, Optional

import capa.features.extractors.cape.file as cape_file
from capa.rules import Scope, RuleSet
from capa.engine import FeatureSet, MatchResults
from capa.features.address import NO_ADDRESS
from capa.features.extractors.base_extractor import FeatureExtractor, StaticFeatureExtractor, DynamicFeatureExtractor
from capa.features.extractors.cape.models import Call, CapeReport
from capa.features.extractors.base_extractor import (
CallHandle,
ThreadHandle,
ProcessHandle,
FeatureExtractor,
StaticFeatureExtractor,
DynamicFeatureExtractor,
)


logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -77,3 +88,85 @@ def find_capabilities(
return find_dynamic_capabilities(ruleset, extractor, disable_progress=disable_progress, **kwargs)

raise ValueError(f"unexpected extractor type: {extractor.__class__.__name__}")


def extract_ip_addresses(strings: List[str]) -> Iterator[str]:
    """
    yield each input string that contains an IPv4 or IPv6 address.

    the whole string is yielded (not only the matching substring), in input order.
    a string is yielded at most once, even when both patterns match it
    (for example an IPv4-mapped IPv6 address such as "::ffff:10.0.0.1").

    args:
      strings: candidate strings to scan.

    yields:
      str: every string in which an IP address was found.
    """
    # Both the IPv4 and IPv6 regex patterns are discussed here:
    # (https://stackoverflow.com/questions/53497/regular-expression-that-matches-valid-ipv6-addresses)
    #
    # the patterns are laid out over several lines for readability, so they must be compiled
    # with re.VERBOSE: without it the newlines and indentation are literal characters
    # and no real address would ever match.
    ipv4_pattern = re.compile(
        r"""
        ((25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}(25[0-5]|
        (2[0-4]|1{0,1}[0-9]){0,1}[0-9])
        """,
        re.VERBOSE,
    )

    ipv6_pattern = re.compile(
        r"""
        (
        ([0-9a-fA-F]{1,4}:){7,7}[0-9a-fA-F]{1,4}|
        ([0-9a-fA-F]{1,4}:){1,7}:|
        ([0-9a-fA-F]{1,4}:){1,6}:[0-9a-fA-F]{1,4}|
        ([0-9a-fA-F]{1,4}:){1,5}(:[0-9a-fA-F]{1,4}){1,2}|
        ([0-9a-fA-F]{1,4}:){1,4}(:[0-9a-fA-F]{1,4}){1,3}|
        ([0-9a-fA-F]{1,4}:){1,3}(:[0-9a-fA-F]{1,4}){1,4}|
        ([0-9a-fA-F]{1,4}:){1,2}(:[0-9a-fA-F]{1,4}){1,5}|
        [0-9a-fA-F]{1,4}:((:[0-9a-fA-F]{1,4}){1,6})|
        :((:[0-9a-fA-F]{1,4}){1,7}|:)|
        fe80:(:[0-9a-fA-F]{0,4}){0,4}%[0-9a-zA-Z]{1,}|
        ::(ffff(:0{1,4}){0,1}:){0,1}
        ((25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}
        (25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])|
        ([0-9a-fA-F]{1,4}:){1,4}:
        ((25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}
        (25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])
        )
        """,
        re.VERBOSE,
    )

    for string in strings:
        # a single combined check so that a string matching both patterns is reported once
        if ipv4_pattern.search(string) or ipv6_pattern.search(string):
            yield string


def extract_domain_names(strings: List[str]) -> Iterator[str]:
    """
    yield each input string that, as a whole, has the shape of a domain name.

    a string qualifies when it is shorter than 256 characters and consists of
    dot-separated labels followed by an alphabetic (or punycode "xn--") top-level label.
    matching is case-insensitive because DNS names are case-insensitive.

    args:
      strings: candidate strings to test.

    yields:
      str: the matching strings, unchanged and in input order.
    """
    # See this Stackoverflow post that discusses the parts of this regex (http://stackoverflow.com/a/7933253/433790)
    # compiled once up front rather than being looked up again for every string
    domain_pattern = re.compile(
        r"^(?!.{256})(?:[a-z0-9](?:[a-z0-9-]{0,61}[a-z0-9])?\.)+(?:[a-z]{1,63}|xn--[a-z0-9]{1,59})$",
        re.IGNORECASE,
    )
    for string in strings:
        if domain_pattern.search(string):
            yield string


def extract_file_names(
    process_handles: Iterator[ProcessHandle],
    thread_handles: Iterator[ThreadHandle],
    call_handles: Iterator[CallHandle],
    report: Optional[CapeReport],
) -> Iterator[Tuple[str, str]]:
    """
    yield the file-related Windows API calls observed in a sandbox trace.

    an API is treated as file-related when it is imported by the sample (according to
    the CAPE report) and its name contains "File".

    args:
      process_handles: accepted for interface compatibility; not needed to walk the calls.
      thread_handles: accepted for interface compatibility; not needed to walk the calls.
      call_handles: the API calls to inspect; each is visited exactly once.
      report: the CAPE report supplying the import names; when None, nothing is yielded.

    yields:
      call.api (str): the API that interacts with the file
      call.arguments[-1].name (str): the argument taken to identify the file
    """
    if report is None:
        return

    # a lot of Windows API file interaction function names contain "File".
    # feature.value is annotated as Union[str, int, float, bytes], so only str values are considered.
    winapi_file_functions = {
        feature.value
        for feature, _ in cape_file.extract_import_names(report)
        if isinstance(feature.value, str) and "File" in feature.value
    }

    # the process and thread handles are never consulted, and the handle streams are iterators
    # that can only be consumed once, so the calls are walked directly instead of inside nested loops.
    for ch in call_handles:
        call: Call = ch.inner
        if call.api not in winapi_file_functions:
            continue

        if not call.arguments:
            # nothing to report for a call recorded without arguments
            continue

        # NOTE(review): the last argument is used on the assumption that arguments are recorded
        # in reverse (stack) order; confirm whether arguments[0] is the file name instead.
        # NOTE(review): `.name` may be the parameter's name rather than its value - confirm
        # against the CAPE argument model.
        yield call.api, call.arguments[-1].name
22 changes: 15 additions & 7 deletions capa/ghidra/capa_ghidra.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,10 @@ def run_headless():
meta = capa.ghidra.helpers.collect_metadata([rules_path])
extractor = capa.features.extractors.ghidra.extractor.GhidraFeatureExtractor()

strings = None
sandbox_data = None
report = None

capabilities, counts = capa.capabilities.common.find_capabilities(rules, extractor, False)

meta.analysis.feature_counts = counts["feature_counts"]
Expand All @@ -84,13 +88,13 @@ def run_headless():
logger.info("capa encountered warnings during analysis")

if args.json:
print(capa.render.json.render(meta, rules, capabilities)) # noqa: T201
print(capa.render.json.render(meta, rules, capabilities, strings, sandbox_data)) # noqa: T201
elif args.vverbose:
print(capa.render.vverbose.render(meta, rules, capabilities)) # noqa: T201
print(capa.render.vverbose.render(meta, rules, capabilities, strings, sandbox_data, report)) # noqa: T201
elif args.verbose:
print(capa.render.verbose.render(meta, rules, capabilities)) # noqa: T201
print(capa.render.verbose.render(meta, rules, capabilities, strings, sandbox_data, report)) # noqa: T201
else:
print(capa.render.default.render(meta, rules, capabilities)) # noqa: T201
print(capa.render.default.render(meta, rules, capabilities, strings, sandbox_data, report)) # noqa: T201

return 0

Expand Down Expand Up @@ -124,6 +128,10 @@ def run_ui():
meta = capa.ghidra.helpers.collect_metadata([rules_path])
extractor = capa.features.extractors.ghidra.extractor.GhidraFeatureExtractor()

strings = None
sandbox_data = None
report = None

capabilities, counts = capa.capabilities.common.find_capabilities(rules, extractor, True)

meta.analysis.feature_counts = counts["feature_counts"]
Expand All @@ -134,11 +142,11 @@ def run_ui():
logger.info("capa encountered warnings during analysis")

if verbose == "vverbose":
print(capa.render.vverbose.render(meta, rules, capabilities)) # noqa: T201
print(capa.render.vverbose.render(meta, rules, capabilities, strings, sandbox_data, report)) # noqa: T201
elif verbose == "verbose":
print(capa.render.verbose.render(meta, rules, capabilities)) # noqa: T201
print(capa.render.verbose.render(meta, rules, capabilities, strings, sandbox_data, report)) # noqa: T201
else:
print(capa.render.default.render(meta, rules, capabilities)) # noqa: T201
print(capa.render.default.render(meta, rules, capabilities, strings, sandbox_data, report)) # noqa: T201

return 0

Expand Down
9 changes: 8 additions & 1 deletion capa/ida/plugin/form.py
Original file line number Diff line number Diff line change
Expand Up @@ -823,9 +823,16 @@ def slot_progress_feature_extraction(text):

update_wait_box("collecting results")

strings = None
sandbox_data = None

try:
self.resdoc_cache = capa.render.result_document.ResultDocument.from_capa(
meta, ruleset, capabilities
meta,
ruleset,
capabilities,
strings,
sandbox_data,
)
except Exception as e:
logger.exception("Failed to collect results (error: %s)", e)
Expand Down
34 changes: 27 additions & 7 deletions capa/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import textwrap
import contextlib
from types import TracebackType
from typing import Any, Set, Dict, List, Callable, Optional
from typing import Any, Set, Dict, List, Tuple, Callable, Iterator, Optional
from pathlib import Path

import halo
Expand Down Expand Up @@ -84,7 +84,11 @@
)
from capa.features.address import Address
from capa.capabilities.common import find_capabilities, has_file_limitation, find_file_capabilities
from capa.features.extractors.cape.models import CapeReport
from capa.features.extractors.base_extractor import (
CallHandle,
ThreadHandle,
ProcessHandle,
SampleHashes,
FeatureExtractor,
StaticFeatureExtractor,
Expand Down Expand Up @@ -1226,14 +1230,22 @@ def main(argv: Optional[List[str]] = None):
# do show the output in verbose mode, though.
if not (args.verbose or args.vverbose or args.json):
return E_FILE_LIMITATION

sandbox_data = Optional[Tuple[Iterator[ProcessHandle], Iterator[ThreadHandle], Iterator[CallHandle]]]
report = json.load(Path(args.sample).open(encoding="utf-8"))

try:
strings = report.static.pe.imports
except AttributeError:
strings = None
if args.json:
print(capa.render.json.render(meta, rules, capabilities))
print(capa.render.json.render(meta, rules, capabilities, strings, sandbox_data))
elif args.vverbose:
print(capa.render.vverbose.render(meta, rules, capabilities))
print(capa.render.vverbose.render(meta, rules, capabilities, strings, sandbox_data, report))
elif args.verbose:
print(capa.render.verbose.render(meta, rules, capabilities))
print(capa.render.verbose.render(meta, rules, capabilities, strings, sandbox_data, report))
else:
print(capa.render.default.render(meta, rules, capabilities))
print(capa.render.default.render(meta, rules, capabilities, strings, sandbox_data, report))
colorama.deinit()

logger.debug("done.")
Expand Down Expand Up @@ -1271,14 +1283,18 @@ def ida_main():

capabilities, counts = find_capabilities(rules, capa.features.extractors.ida.extractor.IdaFeatureExtractor())

report = None
strings = None
sandbox_data = None

meta.analysis.feature_counts = counts["feature_counts"]
meta.analysis.library_functions = counts["library_functions"]

if has_file_limitation(rules, capabilities, is_standalone=False):
capa.ida.helpers.inform_user_ida_ui("capa encountered warnings during analysis")

colorama.init(strip=True)
print(capa.render.default.render(meta, rules, capabilities))
print(capa.render.default.render(meta, rules, capabilities, strings, sandbox_data, report))


def ghidra_main():
Expand Down Expand Up @@ -1309,13 +1325,17 @@ def ghidra_main():
not capa.ghidra.helpers.is_running_headless(),
)

report = None
strings = None
sandbox_data = None

meta.analysis.feature_counts = counts["feature_counts"]
meta.analysis.library_functions = counts["library_functions"]

if has_file_limitation(rules, capabilities, is_standalone=False):
logger.info("capa encountered warnings during analysis")

print(capa.render.default.render(meta, rules, capabilities))
print(capa.render.default.render(meta, rules, capabilities, strings, sandbox_data, report))


if __name__ == "__main__":
Expand Down
77 changes: 73 additions & 4 deletions capa/render/default.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,19 @@
# See the License for the specific language governing permissions and limitations under the License.

import collections
from typing import List, Tuple, Iterator, Optional

import tabulate

import capa.render.utils as rutils
import capa.capabilities.common as common
import capa.render.result_document as rd
import capa.features.freeze.features as frzf
from capa.rules import RuleSet
from capa.engine import MatchResults
from capa.render.utils import StringIO
from capa.features.extractors.cape.models import CapeReport
from capa.features.extractors.base_extractor import CallHandle, ThreadHandle, ProcessHandle

tabulate.PRESERVE_WHITESPACE = True

Expand Down Expand Up @@ -197,7 +201,58 @@ def render_mbc(doc: rd.ResultDocument, ostream: StringIO):
ostream.write("\n")


def render_default(doc: rd.ResultDocument):
def render_ip_addresses(doc: rd.ResultDocument, ostream: StringIO):
    """
    write a one-column table of the strings in `doc.strings` that contain an IP address.

    nothing is written when `doc.strings` is None or when no string matches.

    args:
      doc: result document whose `strings` attribute is scanned.
      ostream: stream that receives the table followed by a newline.
    """
    if doc.strings is None:
        return

    # lowercase IPv6 letters
    ip_addrs = [ip_addr.lower() for ip_addr in common.extract_ip_addresses(doc.strings)]
    if not ip_addrs:
        return

    # tabulate expects one sequence per row; a bare string would be split into one column per character
    rows = [[rutils.bold(ip_addr)] for ip_addr in ip_addrs]

    # measure the plain text: the bolded form may carry styling markup that would inflate len()
    column_width = max(len(ip_addr) for ip_addr in ip_addrs) + 1

    ostream.write(
        tabulate.tabulate(
            rows,
            headers=[width("Possible IP Addresses", column_width)],
            tablefmt="mixed_grid",
        )
    )
    ostream.write("\n")


def render_domains(doc: rd.ResultDocument, ostream: StringIO):
    """
    write a one-column table of the strings in `doc.strings` that look like domain names.

    nothing is written when `doc.strings` is None or when no string matches.

    args:
      doc: result document whose `strings` attribute is scanned.
      ostream: stream that receives the table followed by a newline.
    """
    if doc.strings is None:
        return

    domains = list(common.extract_domain_names(doc.strings))
    if not domains:
        return

    # tabulate expects one sequence per row; a bare string would be split into one column per character
    rows = [[rutils.bold(domain)] for domain in domains]

    # measure the plain text: the bolded form may carry styling markup that would inflate len()
    column_width = max(len(domain) for domain in domains) + 1

    ostream.write(
        tabulate.tabulate(
            rows,
            headers=[width("Web Domains", column_width)],
            tablefmt="mixed_grid",
        )
    )
    ostream.write("\n")


def render_file_names(doc: rd.ResultDocument, report: Optional[CapeReport], ostream: StringIO):
    """
    write a two-column table (API, file name) built from the sandbox data in `doc`.

    nothing is written when `doc.sandbox_data` or `report` is None, or when
    no (API, file name) pair is produced.
    """
    if doc.sandbox_data is None or report is None:
        return

    # doc.sandbox_data is unpacked into the leading positional arguments of extract_file_names
    table: List = [
        [rutils.bold(api), rutils.bold(file_name)]
        for api, file_name in common.extract_file_names(*doc.sandbox_data, report)
    ]
    if not table:
        return

    rendered = tabulate.tabulate(
        table,
        headers=[width("APIs", 25), width("File names", 75)],
        tablefmt="mixed_grid",
    )
    ostream.write(rendered)
    ostream.write("\n")


def render_default(doc: rd.ResultDocument, report: Optional[CapeReport]):
ostream = rutils.StringIO()

render_meta(doc, ostream)
Expand All @@ -207,10 +262,24 @@ def render_default(doc: rd.ResultDocument):
render_mbc(doc, ostream)
ostream.write("\n")
render_capabilities(doc, ostream)
ostream.write("\n")
# each of the following renderers writes its own trailing newline, and only when it has output
# render_ip_addresses and render_domains read doc.strings
render_ip_addresses(doc, ostream)
render_domains(doc, ostream)
# render_file_names unpacks doc.sandbox_data into the process, thread, and call handles
render_file_names(doc, report, ostream)

return ostream.getvalue()


def render(meta, rules: RuleSet, capabilities: MatchResults) -> str:
doc = rd.ResultDocument.from_capa(meta, rules, capabilities)
return render_default(doc)
def render(
    meta,
    rules: RuleSet,
    capabilities: MatchResults,
    strings: Optional[list[str]],
    sandbox_data: Optional[Tuple[Iterator[ProcessHandle], Iterator[ThreadHandle], Iterator[CallHandle]]],
    report: Optional[CapeReport],
) -> str:
    """
    build a result document from the analysis results and return its default text rendering.

    `strings` and `sandbox_data` are stored on the result document; `report` is handed
    to `render_default` alongside it.
    """
    return render_default(
        rd.ResultDocument.from_capa(meta, rules, capabilities, strings, sandbox_data),
        report,
    )
Loading
Loading