Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Santa ballot box bug #1086

Merged
merged 2 commits into from
Nov 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
317 changes: 291 additions & 26 deletions tests/santa/test_ballot_box.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
from django.utils.crypto import get_random_string
from zentral.contrib.santa.ballot_box import (AnonymousVoter, BallotBox, DuplicateVoteError,
ResetNotAllowedError, Voter, VotingError, VotingNotAllowedError)
from zentral.contrib.santa.models import Rule, Target, TargetState
from zentral.contrib.santa.events import SantaBallotEvent, SantaRuleUpdateEvent, SantaTargetStateUpdateEvent
from zentral.contrib.santa.models import Ballot, Rule, Target, TargetState
from zentral.contrib.santa.utils import update_voting_rules
from .utils import (add_file_to_test_class, force_ballot, force_configuration, force_enrolled_machine,
force_realm_group, force_realm_user, force_target, force_voting_group)

Expand Down Expand Up @@ -650,6 +652,52 @@ def test_ballot_box_update_target_state_to_partially_allowlisted_to_globally_all
self.assertEqual(ts2.score, 3)
self.assertEqual(rule_qs.count(), 0)
ballot_box2.cast_votes([(configuration, True)])
ballot = Ballot.objects.get(target=self.file_target, realm_user=realm_user2)
vote = ballot.vote_set.first()
self.assertEqual(
ballot_box2._events,
[(SantaBallotEvent,
{'created_at': ballot.created_at,
'event_target': None,
'pk': str(ballot.pk),
'realm_user': {'pk': str(realm_user2.pk),
'realm': {'name': realm.name,
'pk': str(realm.pk)},
'username': realm_user2.username},
'replaced_by': None,
'target': {'sha256': self.file_sha256,
'type': 'BINARY'},
'user_uid': realm_user2.username,
'votes': [{'configuration': {'name': configuration.name, 'pk': configuration.pk},
'created_at': vote.created_at,
'pk': str(vote.pk),
'was_yes_vote': True,
'weight': 3}]}),
(SantaTargetStateUpdateEvent,
{'configuration': {'name': configuration.name, 'pk': configuration.pk},
'created_at': ts2.created_at,
'new_value': {'flagged': False,
'reset_at': None,
'score': 6,
'state': 50,
'state_display': 'PARTIALLY_ALLOWLISTED'},
'prev_value': {'flagged': False,
'reset_at': None,
'score': 3,
'state': 0,
'state_display': 'UNTRUSTED'},
'target': {'sha256': self.file_sha256,
'type': 'BINARY'},
'updated_at': ts2.updated_at}),
(SantaRuleUpdateEvent,
{'result': 'created',
'rule': {'configuration': {'name': configuration.name, 'pk': configuration.pk},
'is_voting_rule': True,
'policy': 'ALLOWLIST',
'primary_users': sorted([realm_user.username, realm_user2.username]),
'target': {'sha256': self.file_sha256,
'type': 'BINARY'}}})]
)
# third vote
_, realm_user3 = force_realm_user(realm=realm)
ballot_box3 = BallotBox.for_realm_user(self.file_target, realm_user3, all_configurations=True)
Expand All @@ -662,6 +710,56 @@ def test_ballot_box_update_target_state_to_partially_allowlisted_to_globally_all
self.assertEqual(rule.policy, Rule.Policy.ALLOWLIST)
self.assertEqual(set(rule.primary_users), {realm_user.username, realm_user2.username})
ballot_box3.cast_votes([(configuration, True)])
ts3.refresh_from_db()
ballot = Ballot.objects.get(target=self.file_target, realm_user=realm_user3)
vote = ballot.vote_set.first()
self.assertEqual(
ballot_box3._events,
[(SantaBallotEvent,
{'created_at': ballot.created_at,
'event_target': None,
'pk': str(ballot.pk),
'realm_user': {'pk': str(realm_user3.pk),
'realm': {'name': realm.name,
'pk': str(realm.pk)},
'username': realm_user3.username},
'replaced_by': None,
'target': {'sha256': self.file_sha256,
'type': 'BINARY'},
'user_uid': realm_user3.username,
'votes': [{'configuration': {'name': configuration.name, 'pk': configuration.pk},
'created_at': vote.created_at,
'pk': str(vote.pk),
'was_yes_vote': True,
'weight': 3}]}),
(SantaTargetStateUpdateEvent,
{'configuration': {'name': configuration.name, 'pk': configuration.pk},
'created_at': ts3.created_at,
'new_value': {'flagged': False,
'reset_at': None,
'score': 9,
'state': 50,
'state_display': 'PARTIALLY_ALLOWLISTED'},
'prev_value': {'flagged': False,
'reset_at': None,
'score': 6,
'state': 50,
'state_display': 'PARTIALLY_ALLOWLISTED'},
'target': {'sha256': self.file_sha256,
'type': 'BINARY'},
'updated_at': ts3.updated_at}),
(SantaRuleUpdateEvent,
{'result': 'updated',
'rule': {'configuration': {'name': configuration.name, 'pk': configuration.pk},
'is_voting_rule': True,
'policy': 'ALLOWLIST',
'primary_users': sorted([realm_user.username,
realm_user2.username,
realm_user3.username,]),
'target': {'sha256': self.file_sha256,
'type': 'BINARY'}},
'updates': {'added': {'primary_users': [realm_user3.username]}}})]
)
# fourth vote
_, realm_user4 = force_realm_user(realm=realm)
ballot_box4 = BallotBox.for_realm_user(self.file_target, realm_user4, all_configurations=True)
Expand All @@ -673,13 +771,72 @@ def test_ballot_box_update_target_state_to_partially_allowlisted_to_globally_all
self.assertEqual(rule, rule2)
self.assertEqual(rule2.policy, Rule.Policy.ALLOWLIST)
self.assertEqual(set(rule2.primary_users), {realm_user.username, realm_user2.username, realm_user3.username})
# Inconsistent stuff in the database before last vote
rule.refresh_from_db()
rule.custom_msg = "yolo"
rule.policy = Rule.Policy.BLOCKLIST
rule.save()
# last vote
ballot_box4.cast_votes([(configuration, True)])
ts4.refresh_from_db()
ballot = Ballot.objects.get(target=self.file_target, realm_user=realm_user4)
vote = ballot.vote_set.first()
self.assertEqual(
ballot_box4._events,
[(SantaBallotEvent,
{'created_at': ballot.created_at,
'event_target': None,
'pk': str(ballot.pk),
'realm_user': {'pk': str(realm_user4.pk),
'realm': {'name': realm.name,
'pk': str(realm.pk)},
'username': realm_user4.username},
'replaced_by': None,
'target': {'sha256': self.file_sha256,
'type': 'BINARY'},
'user_uid': realm_user4.username,
'votes': [{'configuration': {'name': configuration.name, 'pk': configuration.pk},
'created_at': vote.created_at,
'pk': str(vote.pk),
'was_yes_vote': True,
'weight': 3}]}),
(SantaTargetStateUpdateEvent,
{'configuration': {'name': configuration.name, 'pk': configuration.pk},
'created_at': ts4.created_at,
'new_value': {'flagged': False,
'reset_at': None,
'score': 12,
'state': 100,
'state_display': 'GLOBALLY_ALLOWLISTED'},
'prev_value': {'flagged': False,
'reset_at': None,
'score': 9,
'state': 50,
'state_display': 'PARTIALLY_ALLOWLISTED'},
'target': {'sha256': self.file_sha256,
'type': 'BINARY'},
'updated_at': ts4.updated_at}),
(SantaRuleUpdateEvent,
{'result': 'updated',
'rule': {'configuration': {'name': configuration.name, 'pk': configuration.pk},
'is_voting_rule': True,
'policy': 'ALLOWLIST',
'target': {'sha256': self.file_sha256,
'type': 'BINARY'}},
'updates': {'added': {'custom_msg': '', # Inconsistent state fix
'policy': Rule.Policy.ALLOWLIST}, # Inconsistent state fix
'removed': {'custom_msg': 'yolo', # Inconsistent state introduced in test
'policy': Rule.Policy.BLOCKLIST, # Inconsistent state introduced in test
'primary_users': sorted([realm_user.username,
realm_user2.username,
realm_user3.username,])}}})]
)
self.assertEqual(ts4.state, TargetState.State.GLOBALLY_ALLOWLISTED)
self.assertEqual(ts4.score, 12)
self.assertEqual(rule_qs.count(), 1)
rule3 = rule_qs.first()
self.assertEqual(rule, rule3)
self.assertEqual(rule2.custom_msg, "")
self.assertEqual(rule3.policy, Rule.Policy.ALLOWLIST)
self.assertEqual(len(rule3.primary_users), 0)

Expand Down Expand Up @@ -754,31 +911,6 @@ def test_ballot_box_update_target_state_to_banned(self):
self.assertEqual(rule.target, self.file_target)
self.assertEqual(rule.policy, Rule.Policy.BLOCKLIST)

def test_ballot_box_allowlist_bundle(self):
configuration = force_configuration()
rule_qs = configuration.rule_set.all()
self.assertEqual(rule_qs.count(), 0)
ballot_box = BallotBox.for_realm_user(self.bundle_target, None)
ballot_box._globally_allowlist(configuration)
self.assertEqual(rule_qs.count(), 1)
rule = rule_qs.first()
self.assertEqual(rule.target, self.file_target)
self.assertEqual(rule.policy, Rule.Policy.ALLOWLIST)
self.assertEqual(len(rule.primary_users), 0)

def test_ballot_box_allowlist_metabundle(self):
configuration = force_configuration()
rule_qs = configuration.rule_set.all()
self.assertEqual(rule_qs.count(), 0)
ballot_box = BallotBox.for_realm_user(self.metabundle_target, None)
ballot_box._globally_allowlist(configuration)
self.assertEqual(rule_qs.count(), 1)
rule = rule_qs.first()
self.assertEqual(rule.target.type, Target.Type.SIGNING_ID)
self.assertEqual(rule.target.identifier, self.file_signing_id)
self.assertEqual(rule.policy, Rule.Policy.ALLOWLIST)
self.assertEqual(len(rule.primary_users), 0)

# target state reset

def test_ballot_box_target_state_reset_not_allowed(self):
Expand Down Expand Up @@ -817,10 +949,143 @@ def test_ballot_box_target_state_reset(self):
self.assertIsNone(ts.reset_at)
ballot_box.reset_target_state(configuration)
ts.refresh_from_db()
self.assertEqual(
ballot_box._events,
[(SantaTargetStateUpdateEvent,
{'configuration': {'name': configuration.name, 'pk': configuration.pk},
'created_at': ts.created_at,
'new_value': {'flagged': False,
'reset_at': ts.reset_at,
'score': 0,
'state': 0,
'state_display': 'UNTRUSTED'},
'prev_value': {'flagged': True,
'reset_at': None,
'score': -100,
'state': -100,
'state_display': 'BANNED'},
'target': {'sha256': self.file_sha256,
'type': 'BINARY'},
'updated_at': ts.updated_at}),
(SantaRuleUpdateEvent,
{'result': 'deleted',
'rule': {'configuration': {'name': configuration.name, 'pk': configuration.pk},
'is_voting_rule': True,
'policy': 'BLOCKLIST',
'target': {'sha256': self.file_sha256,
'type': 'BINARY'}}})]
)
self.assertEqual(ts.state, TargetState.State.UNTRUSTED)
self.assertFalse(ts.flagged)
self.assertEqual(ts.score, 0)
self.assertIsNotNone(ts.reset_at)
self.assertEqual(rules_qs.count(), 0)
self.assertEqual(votes_qs.count(), 1)
self.assertEqual(votes_qs.first(), vote)

# update target states

def test_update_target_states(self):
realm, realm_user = force_realm_user()
configuration = force_configuration(voting_realm=realm)
force_ballot(
self.file_target, realm_user,
[(configuration, True, configuration.partially_allowlisted_threshold)]
)
target_state, _ = TargetState.objects.update_or_create(
target=self.file_target,
configuration=configuration,
state=TargetState.State.UNTRUSTED,
reset_at=datetime.utcnow()
)
configuration2 = force_configuration(voting_realm=realm)
# second target state in unrelated configurations must not interfere
TargetState.objects.update_or_create(
target=self.file_target,
configuration=configuration2,
score=configuration2.partially_allowlisted_threshold,
state=TargetState.State.PARTIALLY_ALLOWLISTED,
)
ballot_box = BallotBox.for_realm_user(self.file_target, realm_user, all_configurations=True)
ballot_box._update_target_states([(configuration, True)])
target_state.refresh_from_db()
self.assertEqual(target_state.score, 0)

# update voting rules

def test_update_voting_rules_remove_cdhash_voting_rule(self):
configuration = force_configuration()
Rule.objects.create(
configuration=configuration,
policy=Rule.Policy.BLOCKLIST,
target=self.cdhash_target,
is_voting_rule=True
)
self.assertEqual(configuration.rule_set.count(), 1)
event_payloads = list(update_voting_rules([configuration]))
self.assertEqual(
event_payloads,
[{'result': 'deleted',
'rule': {'configuration': {'name': configuration.name, 'pk': configuration.pk},
'is_voting_rule': True,
'policy': 'BLOCKLIST',
'target': {'cdhash': self.cdhash,
'type': 'CDHASH'}}}]
)
self.assertEqual(configuration.rule_set.count(), 0)

def test_update_voting_rules_remove_signing_id_voting_rule(self):
configuration = force_configuration()
Rule.objects.create(
configuration=configuration,
policy=Rule.Policy.BLOCKLIST,
target=self.signing_id_target,
is_voting_rule=True
)
self.assertEqual(configuration.rule_set.count(), 1)
event_payloads = list(update_voting_rules([configuration]))
self.assertEqual(
event_payloads,
[{'result': 'deleted',
'rule': {'configuration': {'name': configuration.name, 'pk': configuration.pk},
'is_voting_rule': True,
'policy': 'BLOCKLIST',
'target': {'signing_id': self.file_signing_id,
'type': 'SIGNINGID'}}}]
)
self.assertEqual(configuration.rule_set.count(), 0)

def test_update_voting_rules_remove_team_id_voting_rule(self):
configuration = force_configuration()
Rule.objects.create(
configuration=configuration,
policy=Rule.Policy.BLOCKLIST,
target=self.team_id_target,
is_voting_rule=True
)
self.assertEqual(configuration.rule_set.count(), 1)
event_payloads = list(update_voting_rules([configuration]))
self.assertEqual(
event_payloads,
[{'result': 'deleted',
'rule': {'configuration': {'name': configuration.name, 'pk': configuration.pk},
'is_voting_rule': True,
'policy': 'BLOCKLIST',
'target': {'team_id': self.file_team_id,
'type': 'TEAMID'}}}]
)
self.assertEqual(configuration.rule_set.count(), 0)

def test_update_voting_rules_keep_non_voting_rule(self):
configuration = force_configuration()
rule = Rule.objects.create(
configuration=configuration,
policy=Rule.Policy.BLOCKLIST,
target=self.team_id_target,
is_voting_rule=False
)
self.assertEqual(configuration.rule_set.count(), 1)
event_payloads = list(update_voting_rules([configuration]))
self.assertEqual(event_payloads, [])
self.assertEqual(configuration.rule_set.count(), 1)
self.assertEqual(configuration.rule_set.first(), rule)
Loading