Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Alternative implementation of the policy document parser #3246

Merged
merged 7 commits into from
Oct 25, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 22 additions & 21 deletions azurelinuxagent/ga/policy/policy_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,8 +152,8 @@ def _parse_policy_version(policy):
"""
version = _PolicyEngine._get_string(policy, attribute="policyVersion")

if not re.match(r"^\d+\.\d+\.\d+$", version):
raise InvalidPolicyError("invalid value for attribute 'policyVersion': '{0}'; it should be in format 'major.minor.patch' (e.g., '1.0.0')".format(version))
if not re.match(r"^\d+(\.\d+(\.\d+)?)?$", version):
raise InvalidPolicyError("invalid value for attribute 'policyVersion': '{0}'; it should be in format 'major[.minor[.patch]]' (e.g., '1', '1.0', '1.0.0')".format(version))

if FlexibleVersion(_MAX_SUPPORTED_POLICY_VERSION) < FlexibleVersion(version):
raise InvalidPolicyError("policy version '{0}' is not supported. The agent supports policy versions up to '{1}'.".format(version, _MAX_SUPPORTED_POLICY_VERSION))
Expand All @@ -173,7 +173,7 @@ def _parse_extension_policies(policy):
"allowListedExtensionsOnly": _PolicyEngine._get_boolean(extension_policies, attribute="allowListedExtensionsOnly", name_prefix="extensionPolicies.", optional=True, default=_DEFAULT_ALLOW_LISTED_EXTENSIONS_ONLY),
"signatureRequired": _PolicyEngine._get_boolean(extension_policies, attribute="signatureRequired", name_prefix="extensionPolicies.", optional=True, default=_DEFAULT_SIGNATURE_REQUIRED),
"extensions": _PolicyEngine._parse_extensions(
_PolicyEngine._get_dictionary(extension_policies, attribute="extensions", name_prefix="extensionPolicies.", optional=True, default=_CaseFoldedDict.from_dict(_DEFAULT_EXTENSIONS))
_PolicyEngine._get_dictionary(extension_policies, attribute="extensions", name_prefix="extensionPolicies.", optional=True, default=_DEFAULT_EXTENSIONS)
)
}

Expand All @@ -187,7 +187,7 @@ def _parse_extensions(extensions):

for extension, extension_policy in extensions.items():
if not isinstance(extension_policy, dict):
raise InvalidPolicyError("invalid type {0} for attribute '{1}'; must be 'object'".format(type(extension_policy).__name__, extension))
raise InvalidPolicyError("invalid type {0} for attribute 'extensionPolicies.extensions.{1}'; must be 'object'".format(type(extension_policy).__name__, extension))
parsed[extension] = _PolicyEngine._parse_extension(extension_policy)

return parsed
Expand Down Expand Up @@ -223,13 +223,14 @@ def _check_attributes(object_, object_name, valid_attributes):
"""
for k in object_.keys():
if k not in valid_attributes:
raise InvalidPolicyError("invalid attribute '{0}' in {1}".format(k, object_name))
raise InvalidPolicyError("unrecognized attribute '{0}' in {1}".format(k, object_name))

@staticmethod
def _get_dictionary(object_, attribute, name_prefix="", optional=False, default=None):
"""
Returns object[attribute] if it exists, verifying that it is a dictionary, else returns default.
If optional is False and object[attribute] does not exist, raise InvalidPolicyError.
Returns object[attribute] if it exists, verifying that it is a dictionary.
If object_[attribute] does not exist and 'optional' is True, returns 'default'; if 'optional' is False raises InvalidPolicyError.
If object_[attribute] is not a dictionary, raises InvalidPolicyError.
The name_prefix indicates the path of the attribute within the policy document and is used in the error message.
"""
return _PolicyEngine._get_value(object_, attribute, name_prefix, dict, "object", optional=optional, default=default)
Expand All @@ -238,7 +239,8 @@ def _get_dictionary(object_, attribute, name_prefix="", optional=False, default=
def _get_string(object_, attribute, name_prefix="", optional=False, default=None):
"""
Returns object[attribute] if it exists, verifying that it is a string, else returns default.
maddieford marked this conversation as resolved.
Show resolved Hide resolved
If optional is False and object[attribute] does not exist, raise InvalidPolicyError.
If object_[attribute] does not exist and 'optional' is True, returns 'default'; if 'optional' is False raises InvalidPolicyError.
If object_[attribute] is not a string, raises InvalidPolicyError.
The name_prefix indicates the path of the attribute within the policy document and is used in the error message.
"""
return _PolicyEngine._get_value(object_, attribute, name_prefix, ustr, "string", optional=optional, default=default)
Expand All @@ -247,7 +249,8 @@ def _get_string(object_, attribute, name_prefix="", optional=False, default=None
def _get_boolean(object_, attribute, name_prefix="", optional=False, default=None):
"""
Returns object[attribute] if it exists, verifying that it is a boolean, else returns default.
maddieford marked this conversation as resolved.
Show resolved Hide resolved
If optional is False and object[attribute] does not exist, raise InvalidPolicyError.
If object_[attribute] does not exist and 'optional' is True, returns 'default'; if 'optional' is False raises InvalidPolicyError.
If object_[attribute] is not a boolean, raises InvalidPolicyError.
The name_prefix indicates the path of the attribute within the policy document and is used in the error message.
"""
return _PolicyEngine._get_value(object_, attribute, name_prefix, bool, "boolean", optional=optional, default=default)
Expand All @@ -256,7 +259,8 @@ def _get_boolean(object_, attribute, name_prefix="", optional=False, default=Non
def _get_value(object_, attribute, name_prefix, type_, type_name, optional, default):
"""
Returns object[attribute] if it exists, verifying that it is of the given type_, else returns default.
maddieford marked this conversation as resolved.
Show resolved Hide resolved
If optional is False and object[attribute] does not exist, raise InvalidPolicyError.
If object_[attribute] does not exist and 'optional' is True, returns 'default'; if 'optional' is False raises InvalidPolicyError.
If the type of object_[attribute] is not 'type_', raises InvalidPolicyError.
The name_prefix indicates the path of the attribute within the policy document, the type_name indicates a user-friendly name for type_; both are used in the error message.
"""
if default is not None and not optional:
Expand All @@ -273,7 +277,7 @@ def _get_value(object_, attribute, name_prefix, type_, type_name, optional, defa

class ExtensionPolicyEngine(_PolicyEngine):

def should_allow_extension(self, extension_to_check):
def should_allow_extension(self, extension_name):
"""
Return whether we should allow extension download based on policy.
extension_to_check is expected to be an Extension object.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment is no longer applicable

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks; updated

Expand All @@ -288,24 +292,21 @@ def should_allow_extension(self, extension_to_check):
allow_listed_extension_only = self._policy.get("extensionPolicies").get("allowListedExtensionsOnly")
extension_allowlist = self._policy.get("extensionPolicies").get("extensions")

should_allow = not allow_listed_extension_only or extension_allowlist.get(extension_to_check.name) is not None
should_allow = not allow_listed_extension_only or extension_allowlist.get(extension_name) is not None
return should_allow

def should_enforce_signature_validation(self, extension_to_check):
def should_enforce_signature_validation(self, extension_name):
"""
Return whether we should enforce signature based on policy.
extension_to_check is expected to be an Extension object.

If policy feature not enabled, return False.
Individual policy takes precedence over global - if individual signing policy present, return true/false based on
individual policy. Else, return true/false based on global policy.
Individual policy takes precedence over global.
"""
if not self.policy_enforcement_enabled:
return False

global_signature_required = self._policy.get("extensionPolicies").get("signatureRequired")
individual_policy = self._policy.get("extensionPolicies").get("extensions").get(extension_to_check.name)
if individual_policy is None or len(individual_policy) == 0:
return global_signature_required
else:
return individual_policy.get("signatureRequired")
individual_policy = self._policy.get("extensionPolicies").get("extensions").get(extension_name)
individual_signature_required = individual_policy.get("signatureRequired") if individual_policy is not None else None

return individual_signature_required if individual_signature_required is not None else global_signature_required
Loading
Loading