Skip to content

Commit

Permalink
Rework Santa prometheus metrics
Browse files Browse the repository at this point in the history
 * Add configurations info gauge for extra labels
 * Use `cfg_pk` label to reference the configuration in the other
   metrics
 * Add targets gauge
  • Loading branch information
np5 committed Jun 18, 2024
1 parent 94707ef commit 9722054
Show file tree
Hide file tree
Showing 9 changed files with 360 additions and 148 deletions.
2 changes: 1 addition & 1 deletion tests/santa/test_api_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
from zentral.contrib.santa.models import Configuration, Rule, RuleSet, Target, Enrollment, Bundle
from zentral.core.events.base import AuditEvent
from zentral.utils.payloads import get_payload_identifier
from .test_rule_engine import new_cdhash, new_sha256, new_signing_id_identifier, new_team_id
from .utils import new_cdhash, new_sha256, new_signing_id_identifier, new_team_id


class APIViewsTestCase(TestCase):
Expand Down
219 changes: 163 additions & 56 deletions tests/santa/test_metrics_views.py
Original file line number Diff line number Diff line change
@@ -1,34 +1,15 @@
import uuid
from unittest.mock import call, patch
from django.urls import reverse
from django.test import TestCase
from django.utils.crypto import get_random_string
from prometheus_client.parser import text_string_to_metric_families
from zentral.contrib.inventory.models import EnrollmentSecret, MetaBusinessUnit
from zentral.contrib.santa.models import Configuration, EnrolledMachine, Enrollment, Rule, Target
from zentral.contrib.santa.models import Rule, Target, translate_rule_policy
from zentral.conf import settings
from .test_rule_engine import new_cdhash, new_sha256, new_team_id
from .utils import force_configuration, force_enrolled_machine, force_rule, force_target_counter


class SantaMetricsViewsTestCase(TestCase):
@classmethod
def setUpTestData(cls):
cls.configuration = Configuration.objects.create(name=get_random_string(256))
cls.meta_business_unit = MetaBusinessUnit.objects.create(name=get_random_string(64))
cls.enrollment_secret = EnrollmentSecret.objects.create(meta_business_unit=cls.meta_business_unit)
cls.enrollment = Enrollment.objects.create(configuration=cls.configuration,
secret=cls.enrollment_secret)
cls.machine_serial_number = get_random_string(64)
cls.enrolled_machine = EnrolledMachine.objects.create(enrollment=cls.enrollment,
hardware_uuid=uuid.uuid4(),
serial_number=cls.machine_serial_number,
client_mode=Configuration.MONITOR_MODE,
santa_version="2021.7")

# utility methods

def _force_configuration(self):
return Configuration.objects.create(name=get_random_string(12))

def _make_authenticated_request(self):
return self.client.get(reverse("santa_metrics:all"),
HTTP_AUTHORIZATION=f'Bearer {settings["api"]["metrics_bearer_token"]}')
Expand All @@ -44,69 +25,195 @@ def test_metrics_permission_ok(self):
self.assertEqual(response.status_code, 200)

def test_configurations(self):
config_m = force_configuration(lockdown=False)
config_l = force_configuration(lockdown=True)
response = self._make_authenticated_request()
for family in text_string_to_metric_families(response.content.decode("utf-8")):
if family.name != "zentral_santa_configurations":
if family.name != "zentral_santa_configurations_info":
continue
else:
self.assertEqual(len(family.samples), 1)
sample = family.samples[0]
self.assertEqual(sample.value, 1)
self.assertEqual(sample.labels["mode"], "monitor")
self.assertEqual(len(family.samples), 2)
for sample in family.samples:
self.assertEqual(sample.value, 1)
if int(sample.labels["pk"]) == config_m.pk:
self.assertEqual(sample.labels["mode"], "MONITOR")
self.assertEqual(sample.labels["name"], config_m.name)
elif int(sample.labels["pk"]) == config_l.pk:
self.assertEqual(sample.labels["mode"], "LOCKDOWN")
self.assertEqual(sample.labels["name"], config_l.name)
else:
raise AssertionError("Unknown config")
break
else:
raise AssertionError("could not find expected metric family")
self.assertEqual(response.status_code, 200)

@patch("zentral.contrib.santa.metrics_views.connection")
@patch("zentral.contrib.santa.metrics_views.logger.warning")
def test_configurations_info_unknown_mode(self, warning, connection):
mocked_fetchall = connection.cursor.return_value.__enter__.return_value.fetchall
mocked_fetchall.side_effect = [
[(1, "yolo", 42)], # 1st call with bad mode
[], # 2nd call for the enrolled machines gauge
[], # 3rd call for the rules gauge
[], # 4th call for the targets gauge
]
response = self._make_authenticated_request()
family_count = 0
sample_count = 0
for family in text_string_to_metric_families(response.content.decode("utf-8")):
if family.name != "zentral_santa_configurations_info":
continue
family_count += 1
sample_count += len(family.samples)
self.assertEqual(family_count, 1)
self.assertEqual(sample_count, 0)
warning.assert_called_once_with("Unknown santa configuration mode: %s", 42)
self.assertEqual(mocked_fetchall.mock_calls, [call() for _ in range(4)])

def test_enrolled_machines(self):
em_m = force_enrolled_machine(lockdown=False, santa_version="2024.5")
em_l = force_enrolled_machine(lockdown=True, santa_version="2024.6")
response = self._make_authenticated_request()
for family in text_string_to_metric_families(response.content.decode("utf-8")):
if family.name != "zentral_santa_enrolled_machines":
if family.name != "zentral_santa_enrolled_machines_total":
continue
else:
self.assertEqual(len(family.samples), 1)
sample = family.samples[0]
self.assertEqual(sample.value, 1)
self.assertEqual(sample.labels["configuration"], self.configuration.name)
self.assertEqual(sample.labels["mode"], "monitor")
self.assertEqual(sample.labels["santa_version"], self.enrolled_machine.santa_version)
self.assertEqual(len(family.samples), 2)
for sample in family.samples:
self.assertEqual(sample.value, 1)
cfg_pk = int(sample.labels["cfg_pk"])
if cfg_pk == em_m.enrollment.configuration.pk:
self.assertEqual(sample.labels["mode"], "MONITOR")
self.assertEqual(sample.labels["santa_version"], "2024.5")
elif cfg_pk == em_l.enrollment.configuration.pk:
self.assertEqual(sample.labels["mode"], "LOCKDOWN")
self.assertEqual(sample.labels["santa_version"], "2024.6")
else:
raise AssertionError("Unknown enrolled machine")
break
else:
raise AssertionError("could not find expected metric family")
self.assertEqual(response.status_code, 200)

@patch("zentral.contrib.santa.metrics_views.connection")
@patch("zentral.contrib.santa.metrics_views.logger.warning")
def test_enrolled_machines_unknown_mode(self, warning, connection):
mocked_fetchall = connection.cursor.return_value.__enter__.return_value.fetchall
mocked_fetchall.side_effect = [
[], # 1st call for the configurations info gauge
[(1, 42, "2024.5", 1)], # 2nd call for the enrolled machines gauge
[], # 3rd call for the rules gauge
[], # 4th call for the targets gauge
]
response = self._make_authenticated_request()
family_count = 0
sample_count = 0
for family in text_string_to_metric_families(response.content.decode("utf-8")):
if family.name != "zentral_santa_enrolled_machines_total":
continue
family_count += 1
sample_count += len(family.samples)
self.assertEqual(family_count, 1)
self.assertEqual(sample_count, 0)
warning.assert_called_once_with("Unknown santa configuration mode: %s", 42)
self.assertEqual(mocked_fetchall.mock_calls, [call() for _ in range(4)])

def test_rules(self):
for target_type in (Target.BINARY,
Target.BUNDLE,
Target.CDHASH,
Target.CERTIFICATE,
Target.TEAM_ID,
Target.SIGNING_ID):
if target_type == Target.CDHASH:
identifier = new_cdhash()
if target_type == Target.TEAM_ID:
identifier = new_team_id()
elif target_type == Target.SIGNING_ID:
identifier = "platform:com.apple.curl"
else:
identifier = new_sha256()
target = Target.objects.create(type=target_type, identifier=identifier)
Rule.objects.create(configuration=self.configuration, target=target, policy=Rule.BLOCKLIST)
rules = {}
for target_type, policy in (
(Target.BINARY, Rule.ALLOWLIST),
(Target.BUNDLE, Rule.BLOCKLIST),
(Target.CDHASH, Rule.ALLOWLIST_COMPILER),
(Target.CERTIFICATE, Rule.SILENT_BLOCKLIST),
(Target.TEAM_ID, Rule.ALLOWLIST),
(Target.SIGNING_ID, Rule.BLOCKLIST),
):
rule = force_rule(target_type=target_type, policy=policy)
rules[str(rule.configuration.pk)] = rule
response = self._make_authenticated_request()
for family in text_string_to_metric_families(response.content.decode("utf-8")):
if family.name != "zentral_santa_rules":
if family.name != "zentral_santa_rules_total":
continue
else:
self.assertEqual(len(family.samples), 6)
target_type_set = set()
for sample in family.samples:
self.assertEqual(sample.value, 1)
self.assertEqual(sample.labels["configuration"], self.configuration.name)
self.assertEqual(sample.labels["ruleset"], "_")
self.assertEqual(sample.labels["policy"], "blocklist")
target_type_set.add(sample.labels["target_type"])
self.assertEqual(target_type_set, {"binary", "bundle", "cdhash", "certificate", "teamid", "signingid"})
rule = rules[sample.labels["cfg_pk"]]
self.assertEqual(sample.labels["policy"], translate_rule_policy(rule.policy))
self.assertEqual(sample.labels["target_type"], rule.target.type)
break
else:
raise AssertionError("could not find expected metric family")
self.assertEqual(response.status_code, 200)

@patch("zentral.contrib.santa.metrics_views.connection")
@patch("zentral.contrib.santa.metrics_views.logger.error")
def test_rules_unknown_policy(self, warning, connection):
mocked_fetchall = connection.cursor.return_value.__enter__.return_value.fetchall
mocked_fetchall.side_effect = [
[], # 1st call for the configurations info
[], # 2nd call for the enrolled machines gauge
[(1, None, "BUNDLE", 42, 1)], # 3rd call with unknown policy
[], # 4th call for the targets gauge
]
response = self._make_authenticated_request()
family_count = 0
sample_count = 0
for family in text_string_to_metric_families(response.content.decode("utf-8")):
if family.name != "zentral_santa_rules_total":
continue
family_count += 1
sample_count += len(family.samples)
self.assertEqual(family_count, 1)
self.assertEqual(sample_count, 0)
warning.assert_called_once_with("Unknown rule policy: %s", 42)
self.assertEqual(mocked_fetchall.mock_calls, [call() for _ in range(4)])

def test_targets(self):
target_counters = {}
for target_type, blocked_count, collected_count, executed_count, is_rule in (
(Target.BINARY, 11, 0, 0, True),
(Target.BUNDLE, 11, 22, 0, False),
(Target.CDHASH, 11, 22, 33, False),
(Target.CERTIFICATE, 1, 0, 0, False),
(Target.TEAM_ID, 1, 2, 0, False),
(Target.SIGNING_ID, 1, 2, 3, True),
):
target_counter = force_target_counter(
target_type,
blocked_count=blocked_count,
collected_count=collected_count,
executed_count=executed_count,
is_rule=is_rule,
)
target_counters.setdefault(str(target_counter.configuration.pk), {})[target_counter.target.type] = {
"total": 1,
"blocked_total": blocked_count,
"collected_total": collected_count,
"executed_total": executed_count,
"rules_total": 1 if is_rule else 0
}
response = self._make_authenticated_request()
family_count = 0
total_keys = set()
for family in text_string_to_metric_families(response.content.decode("utf-8")):
if not family.name.startswith("zentral_santa_targets_"):
continue
family_count += 1
total_key = family.name.removeprefix("zentral_santa_targets_")
total_keys.add(total_key)
sample_count = 0
for sample in family.samples:
sample_count += 1
self.assertEqual(
sample.value,
target_counters[sample.labels["cfg_pk"]][sample.labels["type"]][total_key]
)
self.assertEqual(sample_count, 6)
self.assertEqual(family_count, 5)
self.assertEqual(
total_keys,
{"total", "blocked_total", "collected_total", "executed_total", "rules_total"}
)
17 changes: 1 addition & 16 deletions tests/santa/test_rule_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,7 @@
from zentral.contrib.santa.models import (Bundle, Configuration, EnrolledMachine, Enrollment,
MachineRule, Rule, Target, translate_rule_policy)
from zentral.contrib.santa.forms import test_cdhash, test_signing_id_identifier


def new_cdhash():
return get_random_string(length=40, allowed_chars='abcdef0123456789')


def new_sha256():
return get_random_string(length=64, allowed_chars='abcdef0123456789')


def new_team_id():
return get_random_string(10, allowed_chars="0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ")


def new_signing_id_identifier():
return ":".join((new_team_id(), get_random_string(10, allowed_chars="abcdefghij")))
from .utils import new_cdhash, new_sha256, new_team_id, new_signing_id_identifier


class SantaRuleEngineTestCase(TestCase):
Expand Down
2 changes: 1 addition & 1 deletion tests/santa/test_santa_api_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from zentral.contrib.santa.models import (Bundle, Configuration, EnrolledMachine, Enrollment,
MachineRule, Rule, Target, TargetCounter)
from zentral.core.incidents.models import Severity
from .test_rule_engine import new_cdhash, new_sha256, new_signing_id_identifier, new_team_id
from .utils import new_cdhash, new_sha256, new_signing_id_identifier, new_team_id


@override_settings(STATICFILES_STORAGE='django.contrib.staticfiles.storage.StaticFilesStorage')
Expand Down
2 changes: 1 addition & 1 deletion tests/santa/test_santa_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
SantaEnrollmentEvent, SantaEventEvent,
SantaRuleSetUpdateEvent, SantaRuleUpdateEvent)
from zentral.contrib.santa.models import Bundle, Configuration, Target
from .test_rule_engine import new_sha256
from .utils import new_sha256


class SantaEventTestCase(TestCase):
Expand Down
2 changes: 1 addition & 1 deletion tests/santa/test_setup_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from zentral.contrib.inventory.models import EnrollmentSecret, MetaBusinessUnit, File, Tag
from zentral.contrib.santa.models import Bundle, Configuration, Enrollment, Rule, Target
from zentral.core.events.base import AuditEvent
from .test_rule_engine import new_cdhash, new_sha256, new_signing_id_identifier, new_team_id
from .utils import new_cdhash, new_sha256, new_signing_id_identifier, new_team_id


@override_settings(STATICFILES_STORAGE='django.contrib.staticfiles.storage.StaticFilesStorage')
Expand Down
2 changes: 1 addition & 1 deletion tests/santa/test_targets_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from accounts.models import User
from zentral.contrib.santa.models import Bundle, Configuration, Target
from zentral.core.stores.conf import frontend_store
from .test_rule_engine import new_cdhash, new_sha256, new_team_id
from .utils import new_cdhash, new_sha256, new_team_id


@override_settings(STATICFILES_STORAGE='django.contrib.staticfiles.storage.StaticFilesStorage')
Expand Down
Loading

0 comments on commit 9722054

Please sign in to comment.