Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Plugin validation framework #271

Merged
merged 7 commits into from
Jul 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/python-package.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: [3.7, 3.8, 3.9, 3.10.5]
python-version: [3.7, 3.8, 3.9, 3.10.5, 3.11, 3.12.4]

steps:
- uses: actions/checkout@v3
Expand Down
5 changes: 3 additions & 2 deletions regipy/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from .registry import *
__title__ = 'regipy'
__version__ = '4.3.0'

__title__ = "regipy"
__version__ = "4.3.1"
364 changes: 273 additions & 91 deletions regipy/cli.py

Large diffs are not rendered by default.

42 changes: 31 additions & 11 deletions regipy/cli_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,15 @@

logger = logging.getLogger(__name__)

def get_filtered_subkeys(registry_hive: RegistryHive, name_key_entry: NKRecord, start_date: str = None, end_date: str = None, verbose=False, fetch_values=True) -> Iterator[NKRecord]:

def get_filtered_subkeys(
registry_hive: RegistryHive,
name_key_entry: NKRecord,
start_date: str = None,
end_date: str = None,
verbose=False,
fetch_values=True,
) -> Iterator[NKRecord]:
"""
Get records filtered by the specified timestamps
:param registry_hive: A RegistryHive object
Expand All @@ -23,30 +31,42 @@ def get_filtered_subkeys(registry_hive: RegistryHive, name_key_entry: NKRecord,
skipped_entries_count = 0
if start_date:
start_date = pytz.utc.localize(dt.datetime.fromisoformat(start_date))

if end_date:
end_date = pytz.utc.localize(dt.datetime.fromisoformat(end_date))


with progressbar(registry_hive.recurse_subkeys(name_key_entry, fetch_values=False)) as reg_subkeys:
for subkey_count, subkey in enumerate(reg_subkeys):
with progressbar(
registry_hive.recurse_subkeys(name_key_entry, fetch_values=False)
) as reg_subkeys:
for subkey_count, subkey in enumerate(reg_subkeys):
if start_date:
if subkey.timestamp < start_date:
skipped_entries_count += 1
logger.debug(f"Skipping entry {subkey} which has a timestamp prior to start_date")
logger.debug(
f"Skipping entry {subkey} which has a timestamp prior to start_date"
)
continue

if end_date:
if subkey.timestamp > end_date:
skipped_entries_count += 1
logger.debug(f"Skipping entry {subkey} which has a timestamp after the end_date")
logger.debug(
f"Skipping entry {subkey} which has a timestamp after the end_date"
)
continue

nk = registry_hive.get_key(subkey.path)
yield Subkey(subkey_name=subkey.subkey_name, path=subkey.path,
timestamp=subkey.timestamp, values=list(nk.iter_values(as_json=True)) if fetch_values else [],
values_count=subkey.values_count)
logger.info(f"{skipped_entries_count} out of {subkey_count} subkeys were filtered out due to timestamp constrains")
yield Subkey(
subkey_name=subkey.subkey_name,
path=subkey.path,
timestamp=subkey.timestamp,
values=list(nk.iter_values(as_json=True)) if fetch_values else [],
values_count=subkey.values_count,
)
logger.info(
f"{skipped_entries_count} out of {subkey_count} subkeys were filtered out due to timestamp constrains"
)


def _normalize_subkey_fields(field) -> str:
if isinstance(field, bytes):
Expand Down
2 changes: 1 addition & 1 deletion regipy/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -422,4 +422,4 @@
"0bbca823-e77d-419e-9a44-5adec2c8eeb0": "NVIDIA Control Panel",
"8e0c279d-0bd1-43c3-9ebd-31c3dc5b8a77": "Windows To Go",
"00028b00-0000-0000-c000-000000000046": "Microsoft Network",
}
}
5 changes: 5 additions & 0 deletions regipy/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,15 @@ class RegipyException(Exception):
"""
This is the parent exception for all regipy exceptions
"""

pass


class RegipyGeneralException(RegipyException):
"""
General exception
"""

pass


Expand Down Expand Up @@ -39,8 +42,10 @@ class RegistryParsingException(RegipyException):
"""
Raised when there is a parsing error, most probably a corrupted hive
"""

pass


class NtSidDecodingException(RegipyException):
"""
Raised when the binary Windows NT SID representation can not be decoded
Expand Down
24 changes: 12 additions & 12 deletions regipy/hive_types.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
NTUSER_HIVE_TYPE = 'ntuser'
SYSTEM_HIVE_TYPE = 'system'
AMCACHE_HIVE_TYPE = 'amcache'
SOFTWARE_HIVE_TYPE = 'software'
SAM_HIVE_TYPE = 'sam'
SECURITY_HIVE_TYPE = 'security'
CLASSES_ROOT_HIVE_TYPE = 'classes_root'
BCD_HIVE_TYPE = 'bcd'
USRCLASS_HIVE_TYPE = 'usrclass'
NTUSER_HIVE_TYPE = "ntuser"
SYSTEM_HIVE_TYPE = "system"
AMCACHE_HIVE_TYPE = "amcache"
SOFTWARE_HIVE_TYPE = "software"
SAM_HIVE_TYPE = "sam"
SECURITY_HIVE_TYPE = "security"
CLASSES_ROOT_HIVE_TYPE = "classes_root"
BCD_HIVE_TYPE = "bcd"
USRCLASS_HIVE_TYPE = "usrclass"


SUPPORTED_HIVE_TYPES = [
Expand All @@ -17,9 +17,9 @@
SAM_HIVE_TYPE,
SECURITY_HIVE_TYPE,
BCD_HIVE_TYPE,
USRCLASS_HIVE_TYPE
USRCLASS_HIVE_TYPE,
]


HVLE_TRANSACTION_LOG_MAGIC = b'HvLE'
DIRT_TRANSACTION_LOG_MAGIC = b'DIRT'
HVLE_TRANSACTION_LOG_MAGIC = b"HvLE"
DIRT_TRANSACTION_LOG_MAGIC = b"DIRT"
113 changes: 64 additions & 49 deletions regipy/plugins/amcache/amcache.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,74 +10,87 @@
logger = logging.getLogger(__name__)

AMCACHE_FIELD_NUMERIC_MAPPINGS = {
'0': 'product_name',
'1': 'company_name',
'2': 'file_version_number',
'3': 'language_code',
'4': 'switchback_context',
'5': 'file_version',
'6': 'file_size',
'7': 'pe_header_hash',
'8': 'unknown1',
'9': 'pe_header_checksum',
'a': 'unknown2',
'b': 'unknown3',
'c': 'file_description',
'd': 'unknown4',
'f': 'linker_compile_time',
'10': 'unknown5',
'11': 'last_modified_timestamp',
'12': 'created_timestamp',
'15': 'full_path',
'16': 'unknown6',
'17': 'last_modified_timestamp_2',
'100': 'program_id',
'101': 'sha1'
"0": "product_name",
"1": "company_name",
"2": "file_version_number",
"3": "language_code",
"4": "switchback_context",
"5": "file_version",
"6": "file_size",
"7": "pe_header_hash",
"8": "unknown1",
"9": "pe_header_checksum",
"a": "unknown2",
"b": "unknown3",
"c": "file_description",
"d": "unknown4",
"f": "linker_compile_time",
"10": "unknown5",
"11": "last_modified_timestamp",
"12": "created_timestamp",
"15": "full_path",
"16": "unknown6",
"17": "last_modified_timestamp_2",
"100": "program_id",
"101": "sha1",
}

WIN8_TS_FIELDS = ['last_modified_timestamp', 'created_timestamp', 'last_modified_timestamp_2']
WIN8_TS_FIELDS = [
"last_modified_timestamp",
"created_timestamp",
"last_modified_timestamp_2",
]


class AmCachePlugin(Plugin):
NAME = 'amcache'
DESCRIPTION = 'Parse Amcache'
NAME = "amcache"
DESCRIPTION = "Parse Amcache"
COMPATIBLE_HIVE = AMCACHE_HIVE_TYPE

def parse_amcache_file_entry(self, subkey):
entry = {underscore(x.name): x.value for x in subkey.iter_values(as_json=self.as_json)}
entry = {
underscore(x.name): x.value
for x in subkey.iter_values(as_json=self.as_json)
}

# Sometimes the value names might be numeric instead. Translate them:
for k, v in AMCACHE_FIELD_NUMERIC_MAPPINGS.items():
content = entry.pop(k, None)
if content:
entry[v] = content

if 'sha1' in entry:
entry['sha1'] = entry['sha1'][4:]
if "sha1" in entry:
entry["sha1"] = entry["sha1"][4:]

if 'file_id' in entry and entry['file_id'] != 0:
entry['file_id'] = entry['file_id'][4:]
if 'sha1' not in entry:
entry['sha1'] = entry['file_id']
if "file_id" in entry and entry["file_id"] != 0:
entry["file_id"] = entry["file_id"][4:]
if "sha1" not in entry:
entry["sha1"] = entry["file_id"]

if 'program_id' in entry:
entry['program_id'] = entry['program_id'][4:]
if "program_id" in entry:
entry["program_id"] = entry["program_id"][4:]

entry['timestamp'] = convert_wintime(subkey.header.last_modified, as_json=self.as_json)
entry["timestamp"] = convert_wintime(
subkey.header.last_modified, as_json=self.as_json
)

if 'size' in entry:
entry['size'] = int(entry['size'], 16) if isinstance(entry['size'], str) else entry['size']
if "size" in entry:
entry["size"] = (
int(entry["size"], 16)
if isinstance(entry["size"], str)
else entry["size"]
)

is_pefile = entry.get('is_pe_file')
is_pefile = entry.get("is_pe_file")
if is_pefile is not None:
entry['is_pe_file'] = bool(is_pefile)
entry["is_pe_file"] = bool(is_pefile)

is_os_component = entry.get('is_os_component')
is_os_component = entry.get("is_os_component")
if is_os_component is not None:
entry['is_os_component'] = bool(is_os_component)
entry["is_os_component"] = bool(is_os_component)

if entry.get('link_date') == 0:
entry.pop('link_date')
if entry.get("link_date") == 0:
entry.pop("link_date")

for ts_field_name in WIN8_TS_FIELDS:
ts = entry.pop(ts_field_name, None)
Expand All @@ -87,18 +100,20 @@ def parse_amcache_file_entry(self, subkey):
self.entries.append(entry)

def run(self):
logger.debug('Started AmCache Plugin...')
logger.debug("Started AmCache Plugin...")

try:
amcache_file_subkey = self.registry_hive.get_key(r'\Root\File')
amcache_file_subkey = self.registry_hive.get_key(r"\Root\File")
except RegistryKeyNotFoundException:
logger.error(r'Could not find \Root\File subkey')
logger.error(r"Could not find \Root\File subkey")
amcache_file_subkey = None

try:
amcache_inventory_file_subkey = self.registry_hive.get_key(r'\Root\InventoryApplicationFile')
amcache_inventory_file_subkey = self.registry_hive.get_key(
r"\Root\InventoryApplicationFile"
)
except RegistryKeyNotFoundException:
logger.info(r'Could not find \Root\InventoryApplicationFile subkey')
logger.info(r"Could not find \Root\InventoryApplicationFile subkey")
amcache_inventory_file_subkey = None

if amcache_file_subkey:
Expand Down
30 changes: 18 additions & 12 deletions regipy/plugins/ntuser/classes_installer.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,28 +6,34 @@

logger = logging.getLogger(__name__)

CLASSES_INSTALLER_PATH = r'\Software\Microsoft\Installer\Products'
CLASSES_INSTALLER_PATH = r"\Software\Microsoft\Installer\Products"


class NtuserClassesInstallerPlugin(Plugin):
NAME = 'ntuser_classes_installer'
DESCRIPTION = 'List of installed software from NTUSER hive'
NAME = "ntuser_classes_installer"
DESCRIPTION = "List of installed software from NTUSER hive"
COMPATIBLE_HIVE = NTUSER_HIVE_TYPE

def run(self):
try:
classes_installer_subkey = self.registry_hive.get_key(CLASSES_INSTALLER_PATH)
classes_installer_subkey = self.registry_hive.get_key(
CLASSES_INSTALLER_PATH
)
except RegistryKeyNotFoundException as ex:
logger.error(ex)
return

for entry in classes_installer_subkey.iter_subkeys():
identifier = entry.name
timestamp = convert_wintime(entry.header.last_modified, as_json=self.as_json)
product_name = entry.get_value('ProductName')
self.entries.append({
'identifier': identifier,
'timestamp': timestamp,
'product_name': product_name,
'is_hidden': product_name is None
})
timestamp = convert_wintime(
entry.header.last_modified, as_json=self.as_json
)
product_name = entry.get_value("ProductName")
self.entries.append(
{
"identifier": identifier,
"timestamp": timestamp,
"product_name": product_name,
"is_hidden": product_name is None,
}
)
Loading
Loading