From bb0fa3c3c365dca60e8fcb726124b0611c872681 Mon Sep 17 00:00:00 2001 From: "narrieta@microsoft" Date: Mon, 21 Oct 2024 13:04:34 -0700 Subject: [PATCH 1/5] Alternative implementation of the policy document parser --- azurelinuxagent/ga/policy/policy_engine.py | 290 +++++++++++---------- tests/ga/test_policy_engine.py | 22 +- 2 files changed, 175 insertions(+), 137 deletions(-) diff --git a/azurelinuxagent/ga/policy/policy_engine.py b/azurelinuxagent/ga/policy/policy_engine.py index 8d93e132b2..a21ac705fc 100644 --- a/azurelinuxagent/ga/policy/policy_engine.py +++ b/azurelinuxagent/ga/policy/policy_engine.py @@ -15,8 +15,8 @@ # Requires Python 2.4+ and Openssl 1.0+ # -import copy import json +import re import os from azurelinuxagent.common.future import ustr from azurelinuxagent.common import logger @@ -26,25 +26,10 @@ from azurelinuxagent.common.protocol.extensions_goal_state_from_vm_settings import _CaseFoldedDict from azurelinuxagent.common.utils.flexible_version import FlexibleVersion - -# Schema for policy file. -_POLICY_SCHEMA = \ - { - "policyVersion": ustr, - "extensionPolicies": { - "allowListedExtensionsOnly": bool, - "signatureRequired": bool, - "extensions": { - "": { - "signatureRequired": bool - } - } - } - } - # Default policy values to be used when customer does not specify these attributes in the policy file. _DEFAULT_ALLOW_LISTED_EXTENSIONS_ONLY = False _DEFAULT_SIGNATURE_REQUIRED = False +_DEFAULT_EXTENSIONS = {} # Agent supports up to this version of the policy file ("policyVersion" in schema). # Increment this number when any new attributes are added to the policy schema. @@ -72,25 +57,11 @@ class _PolicyEngine(object): """ def __init__(self): # Set defaults for policy - self._policy = \ - { - "policyVersion": _MAX_SUPPORTED_POLICY_VERSION, - "extensionPolicies": { - "allowListedExtensionsOnly": _DEFAULT_ALLOW_LISTED_EXTENSIONS_ONLY, - "signatureRequired": _DEFAULT_SIGNATURE_REQUIRED, - "extensions": {} - } - } - self._policy_enforcement_enabled = self.__get_policy_enforcement_enabled() if not self.policy_enforcement_enabled: return - # Use a copy of the policy as a template. Update the template as we parse the custom policy. - template = copy.deepcopy(self._policy) - custom_policy = self.__read_policy() - self.__parse_policy(template, custom_policy) - self._policy = template + self._policy = self._parse_policy(self.__read_policy()) @staticmethod def _log_policy_event(msg, is_success=True, op=WALAEventOperation.Policy, send_event=True): @@ -142,139 +113,194 @@ def __read_policy(): return custom_policy @staticmethod - def __parse_policy(template, custom_policy): + def _parse_policy(policy): """ - Update template with attributes specified in custom_policy: - - attributes provided in custom_policy override the default values in template - - if an attribute is not provided, use the default value - - if an unrecognized attribute is present in custom_policy (not defined in _POLICY_SCHEMA), raise an error - - if an attribute does not match the type specified in the schema, raise an error + Parses the given policy document and an equivalent document that has been populated with default values and verified for correctness, i.e. + that conforms the following schema: + + { + "policyVersion": "0.1.0", + "extensionPolicies": { + "allowListedExtensionsOnly": , + "signatureRequired": , + "extensions": { + "": { + "signatureRequired": + "runtimePolicy": { + + } + } + }, + } + } + + Raises InvalidPolicyError if the policy document is invalid. """ - # Validate top level attributes and then parse each section of the custom policy. - # Individual parsing functions are responsible for validating schema of that section (any nested dicts). - # Note that validation must happen before parsing. - _PolicyEngine.__validate_schema(custom_policy, _POLICY_SCHEMA) - _PolicyEngine.__parse_version(template, custom_policy) - _PolicyEngine.__parse_extension_policies(template, custom_policy) + if not isinstance(policy, dict): + raise InvalidPolicyError("expected an object describing a Policy; got {0}.".format(type(policy).__name__)) + + _PolicyEngine._check_attributes(policy, object_name="policy", valid_attributes=["policyVersion", "extensionPolicies"]) + + return { + "policyVersion": _PolicyEngine._parse_policy_version(policy), + "extensionPolicies": _PolicyEngine._parse_extension_policies(policy) + } @staticmethod - def __parse_version(template, policy): + def _parse_policy_version(policy): """ Validate and return "policyVersion" attribute. If not a string in the format "x.y.z", raise InvalidPolicyError. If policy_version is greater than maximum supported version, raise InvalidPolicyError. """ - version = policy.get("policyVersion") - if version is None: - return + version = _PolicyEngine._get_string(policy, attribute="policyVersion") - try: - flexible_version = FlexibleVersion(version) - except ValueError: - raise InvalidPolicyError( - "invalid value for attribute 'policyVersion' attribute 'policyVersion' is expected to be in format 'major.minor.patch' " - "(e.g., '1.0.0'). Please change to a valid value.") + if not re.match(r"^\d+\.\d+\.\d+$", version): + raise InvalidPolicyError("invalid value for attribute 'policyVersion'; it should be in format 'major.minor.patch' (e.g., '1.0.0')") - if FlexibleVersion(_MAX_SUPPORTED_POLICY_VERSION) < flexible_version: - raise InvalidPolicyError("policy version '{0}' is not supported. The agent supports policy versions up to '{1}'. Please provide a compatible policy version." - .format(version, _MAX_SUPPORTED_POLICY_VERSION)) + if FlexibleVersion(_MAX_SUPPORTED_POLICY_VERSION) < FlexibleVersion(version): + raise InvalidPolicyError("policy version '{0}' is not supported. The agent supports policy versions up to '{1}'.".format(version, _MAX_SUPPORTED_POLICY_VERSION)) - template["policyVersion"] = version + return version @staticmethod - def __parse_extension_policies(template, policy): - extension_policies = policy.get("extensionPolicies") - if extension_policies is not None: - _PolicyEngine.__validate_schema(extension_policies, _POLICY_SCHEMA["extensionPolicies"], "extensionPolicies") + def _parse_extension_policies(policy): + """ + Parses the "extensionPolicies" attribute of the policy document. It should conform to the following schema: - # Parse allowlist policy - allowlist_policy = extension_policies.get("allowListedExtensionsOnly") - if allowlist_policy is not None: - template["extensionPolicies"]["allowListedExtensionsOnly"] = allowlist_policy + "extensionPolicies": { + "allowListedExtensionsOnly": , + "signatureRequired": , + "extensions": { + "": { + "signatureRequired": + "runtimePolicy": { + + } + } + }, + } + """ + extension_policies = _PolicyEngine._get_dictionary(policy, attribute="extensionPolicies", optional=True, default={}) - # Parse global signature policy - signature_policy = extension_policies.get("signatureRequired") - if signature_policy is not None: - template["extensionPolicies"]["signatureRequired"] = signature_policy + _PolicyEngine._check_attributes(extension_policies, object_name="extensionPolicies", valid_attributes=["allowListedExtensionsOnly", "signatureRequired", "extensions"]) - # Parse individual extension policies - _PolicyEngine.__parse_extensions(template, extension_policies) + return { + "allowListedExtensionsOnly": _PolicyEngine._get_boolean(extension_policies, attribute="allowListedExtensionsOnly", name_prefix="extensionPolicies.", optional=True, default=_DEFAULT_ALLOW_LISTED_EXTENSIONS_ONLY), + "signatureRequired": _PolicyEngine._get_boolean(extension_policies, attribute="signatureRequired", name_prefix="extensionPolicies.", optional=True, default=_DEFAULT_SIGNATURE_REQUIRED), + "extensions": _PolicyEngine._parse_extensions( + _PolicyEngine._get_dictionary(extension_policies, attribute="extensions", name_prefix="extensionPolicies.", optional=True, default=_CaseFoldedDict.from_dict(_DEFAULT_EXTENSIONS)) + ) + } @staticmethod - def __parse_extensions(template, extensions_policy): + def _parse_extensions(extensions): """ - Parse "extensions" dict and update in template. - "extensions" is expected to be in the format: - { - "extensions": { - "": { - "signatureRequired": bool - } + Parses the "extensions" attribute. It should conform to the following schema: + + "extensions": { + "": { + "signatureRequired": bool + "runtimePolicy": { + + } } } - If "signatureRequired" isn't provided, the global "signatureRequired" value will be used instead. - The "extensions" attribute will be converted to a case-folded dict. CRP allows extensions to be any - case, so we use case-folded dict to allow for case-insensitive lookup of individual extension policies. + The return value is a case-folded dict. CRP allows extensions to be any case, so we allow for case-insensitive lookup of individual extension policies. """ - extensions = extensions_policy.get("extensions") - if extensions is None: - return + parsed = _CaseFoldedDict.from_dict({}) - parsed_extensions_dict = {} - for extension_name, individual_policy in extensions.items(): + for extension, extension_policy in extensions.items(): + if not isinstance(extension_policy, dict): + raise InvalidPolicyError("invalid type {0} for attribute '{1}', must be 'object'".format(type(extension_policy).__name__, extension)) + parsed[extension] = _PolicyEngine._parse_extension(extension_policy) - # We don't validate "extensions" against the schema, because the attributes (individual extension names) - # are dynamic and not defined in the schema. We do validate that individual_policy is a dict, and validate - # the schema of individual_policy. - individual_policy_schema = _POLICY_SCHEMA["extensionPolicies"]["extensions"][""] - if not isinstance(individual_policy, dict): - raise InvalidPolicyError("invalid type {0} for attribute '{1}', please change to object." - .format(type(individual_policy).__name__, extension_name)) - _PolicyEngine.__validate_schema(individual_policy, individual_policy_schema, extension_name) + return parsed - extension_signature_policy = individual_policy.get("signatureRequired") - if extension_signature_policy is None: - extension_signature_policy = template["extensionPolicies"]["signatureRequired"] + @staticmethod + def _parse_extension(extension): + """ + Parses an individual extension. It should conform to the following schema: - parsed_extensions_dict[extension_name] = \ - { - "signatureRequired": extension_signature_policy - } + "": { + "signatureRequired": bool + "runtimePolicy": { + + } + } + """ + extension_attribute_name = "extensionPolicies.extensions.{0}".format(extension) + + _PolicyEngine._check_attributes(extension, object_name=extension_attribute_name, valid_attributes=["signatureRequired", "runtimePolicy"]) + + return_value = {} + + signature_required = _PolicyEngine._get_boolean(extension, attribute="signatureRequired", name_prefix=extension_attribute_name, optional=True, default=None) + if signature_required is not None: + return_value["signatureRequired"] = signature_required + + # The runtimePolicy is an arbitrary object. + runtime_policy = extension.get("runtimePolicy") + if runtime_policy is not None: + return_value["runtimePolicy"] = runtime_policy - # Convert "extensions" to a case-folded dict for case-insensitive lookup - case_folded_extensions_dict = _CaseFoldedDict.from_dict(parsed_extensions_dict) - template["extensionPolicies"]["extensions"] = case_folded_extensions_dict + return return_value @staticmethod - def __validate_schema(policy, schema, section_name=None): + def _check_attributes(object_, object_name, valid_attributes): """ - Validate the provided policy against the schema - we only do a shallow check (no recursion into nested dicts). - If there is an unrecognized attribute, raise an error. + Check that the given object, which should be a dictionary, has only the specified attributes. + If any other attributes are present, raise InvalidPolicyError. + The object_name is used in the error message. """ - for key, value in policy.items(): - if key not in schema: - raise InvalidPolicyError("attribute '{0}' is not defined in the policy schema. Please refer to the policy documentation " - "and change or remove this attribute accordingly.".format(key)) - - expected_type = schema.get(key) - if isinstance(expected_type, dict): - expected_type = dict - type_in_err_msg = { - dict: "object", - ustr: "string", - bool: "boolean" - } + for k in object_.keys(): + if k not in valid_attributes: + raise InvalidPolicyError("invalid attribute '{0}' in {1}".format(k, object_name)) + + @staticmethod + def _get_dictionary(object_, attribute, name_prefix="", optional=False, default=None): + """ + Returns object[attribute] if it exists, verifying that it is a dictionary, else returns default. + If optional is False and object[attribute] does not exist, raise InvalidPolicyError. + The name_prefix indicates the path of the attribute within the policy document and is used in the error message. + """ + return _PolicyEngine._get_value(object_, attribute, name_prefix, dict, "object", optional=optional, default=default) - if not isinstance(value, expected_type): - if section_name is None: - msg = ("invalid type {0} for attribute '{1}', please change to {2}." - .format(type(value).__name__, key, type_in_err_msg.get(expected_type))) - else: - msg = ("invalid type {0} for attribute '{1}' in section '{2}', please change to {3}." - .format(type(value).__name__, key, section_name, - type_in_err_msg.get(expected_type))) + @staticmethod + def _get_string(object_, attribute, name_prefix="", optional=False, default=None): + """ + Returns object[attribute] if it exists, verifying that it is a string, else returns default. + If optional is False and object[attribute] does not exist, raise InvalidPolicyError. + The name_prefix indicates the path of the attribute within the policy document and is used in the error message. + """ + return _PolicyEngine._get_value(object_, attribute, name_prefix, ustr, "string", optional=optional, default=default) + + @staticmethod + def _get_boolean(object_, attribute, name_prefix="", optional=False, default=None): + """ + Returns object[attribute] if it exists, verifying that it is a boolean, else returns default. + If optional is False and object[attribute] does not exist, raise InvalidPolicyError. + The name_prefix indicates the path of the attribute within the policy document and is used in the error message. + """ + return _PolicyEngine._get_value(object_, attribute, name_prefix, bool, "boolean", optional=optional, default=default) - raise InvalidPolicyError(msg) + @staticmethod + def _get_value(object_, attribute, name_prefix, type_, type_name, optional, default): + """ + Returns object[attribute] if it exists, verifying that it is of the given type_, else returns default. + If optional is False and object[attribute] does not exist, raise InvalidPolicyError. + The name_prefix indicates the path of the attribute within the policy document, the type_name indicates a user-friendly name for type_; both are used in the error message. + """ + if default is not None and not optional: + raise ValueError("default value should only be provided for optional attributes") + value = object_.get(attribute) + if value is None: + if not optional: + raise InvalidPolicyError("missing required attribute '{0}{1}'".format(name_prefix, attribute)) + return default + if not isinstance(value, type_): + raise InvalidPolicyError("invalid type {0} for attribute '{1}{2}'; must be '{3}'".format(type(value).__name__, name_prefix, attribute, type_name)) + return value class ExtensionPolicyEngine(_PolicyEngine): @@ -311,7 +337,7 @@ def should_enforce_signature_validation(self, extension_to_check): global_signature_required = self._policy.get("extensionPolicies").get("signatureRequired") individual_policy = self._policy.get("extensionPolicies").get("extensions").get(extension_to_check.name) - if individual_policy is None: + if individual_policy is None or len(individual_policy) == 0: return global_signature_required else: return individual_policy.get("signatureRequired") diff --git a/tests/ga/test_policy_engine.py b/tests/ga/test_policy_engine.py index 3a7e9329f3..9ce1c3b7bd 100644 --- a/tests/ga/test_policy_engine.py +++ b/tests/ga/test_policy_engine.py @@ -77,7 +77,10 @@ def test_policy_enforcement_should_be_enabled_when_policy_file_exists_and_conf_f When conf flag is set to true and policy file is present at expected location, feature should be enabled. """ # Create policy file with empty policy object at the expected path to enable feature. - self._create_policy_file({}) + self._create_policy_file( + { + "policyVersion": "0.0.1" + }) engine = _PolicyEngine() self.assertTrue(engine.policy_enforcement_enabled, msg="Conf flag is set to true so policy enforcement should be enabled.") @@ -115,7 +118,8 @@ def test_should_parse_policy_successfully(self): "signatureRequired": True, "extensions": { TEST_EXTENSION_NAME: { - "signatureRequired": False + "signatureRequired": False, + "runtimePolicy": True } } } @@ -128,7 +132,8 @@ def test_should_parse_policy_successfully(self): "signatureRequired": False, "extensions": { TEST_EXTENSION_NAME: { - "signatureRequired": True + "signatureRequired": True, + "runtimePolicy": [ True, None, { "bar": "baz" } ] } } } @@ -147,6 +152,14 @@ def test_should_parse_policy_successfully(self): actual_individual_policy = actual_extension_policy.get("extensions").get(TEST_EXTENSION_NAME) expected_individual_policy = expected_extension_policy.get("extensions").get(TEST_EXTENSION_NAME) self.assertEqual(actual_individual_policy.get("signatureRequired"), expected_individual_policy.get("signatureRequired")) + self.assertEqual(actual_individual_policy.get("runtimePolicy"), expected_individual_policy.get("runtimePolicy")) + + def test_it_should_verify_policy_version_is_required(self): + self._create_policy_file({ + "extensionPolicies": {} + }) + with self.assertRaises(InvalidPolicyError): + _PolicyEngine() def test_should_raise_error_if_policy_file_is_invalid_json(self): cases = [ @@ -321,7 +334,6 @@ def test_should_use_default_policy_if_no_extension_policy_specified(self): """ test_extension = Extension(name=TEST_EXTENSION_NAME) policy_cases = [ - {}, { "policyVersion": "0.1.0" }, @@ -581,4 +593,4 @@ def test_extension_name_in_policy_should_be_case_insensitive(self): self.assertTrue(should_allow, msg="Extension should have been found in allowlist regardless of extension name case.") self.assertTrue(should_enforce_signature, - msg="Individual signatureRequired policy should have been found and used, regardless of extension name case.") \ No newline at end of file + msg="Individual signatureRequired policy should have been found and used, regardless of extension name case.") From 12561d81738398c82aeaf2172e91126836afca63 Mon Sep 17 00:00:00 2001 From: "narrieta@microsoft" Date: Wed, 23 Oct 2024 16:00:54 -0700 Subject: [PATCH 2/5] code review feedback --- azurelinuxagent/ga/policy/policy_engine.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/azurelinuxagent/ga/policy/policy_engine.py b/azurelinuxagent/ga/policy/policy_engine.py index a21ac705fc..778a47ae09 100644 --- a/azurelinuxagent/ga/policy/policy_engine.py +++ b/azurelinuxagent/ga/policy/policy_engine.py @@ -155,7 +155,7 @@ def _parse_policy_version(policy): version = _PolicyEngine._get_string(policy, attribute="policyVersion") if not re.match(r"^\d+\.\d+\.\d+$", version): - raise InvalidPolicyError("invalid value for attribute 'policyVersion'; it should be in format 'major.minor.patch' (e.g., '1.0.0')") + raise InvalidPolicyError("invalid value for attribute 'policyVersion': '{0}'; it should be in format 'major.minor.patch' (e.g., '1.0.0')".format(version)) if FlexibleVersion(_MAX_SUPPORTED_POLICY_VERSION) < FlexibleVersion(version): raise InvalidPolicyError("policy version '{0}' is not supported. The agent supports policy versions up to '{1}'.".format(version, _MAX_SUPPORTED_POLICY_VERSION)) @@ -212,7 +212,7 @@ def _parse_extensions(extensions): for extension, extension_policy in extensions.items(): if not isinstance(extension_policy, dict): - raise InvalidPolicyError("invalid type {0} for attribute '{1}', must be 'object'".format(type(extension_policy).__name__, extension)) + raise InvalidPolicyError("invalid type {0} for attribute '{1}'; must be 'object'".format(type(extension_policy).__name__, extension)) parsed[extension] = _PolicyEngine._parse_extension(extension_policy) return parsed From 59c73058416130a2c98aa40be4bfabe39ce8b9d9 Mon Sep 17 00:00:00 2001 From: "narrieta@microsoft" Date: Wed, 23 Oct 2024 19:23:10 -0700 Subject: [PATCH 3/5] Comments --- azurelinuxagent/ga/policy/policy_engine.py | 50 ++++------------------ 1 file changed, 9 insertions(+), 41 deletions(-) diff --git a/azurelinuxagent/ga/policy/policy_engine.py b/azurelinuxagent/ga/policy/policy_engine.py index 778a47ae09..b0c7fb15e2 100644 --- a/azurelinuxagent/ga/policy/policy_engine.py +++ b/azurelinuxagent/ga/policy/policy_engine.py @@ -115,20 +115,18 @@ def __read_policy(): @staticmethod def _parse_policy(policy): """ - Parses the given policy document and an equivalent document that has been populated with default values and verified for correctness, i.e. + Parses the given policy document and returns an equivalent document that has been populated with default values and verified for correctness, i.e. that conforms the following schema: { "policyVersion": "0.1.0", "extensionPolicies": { - "allowListedExtensionsOnly": , - "signatureRequired": , - "extensions": { + "allowListedExtensionsOnly": , [Optional; default: false] + "signatureRequired": , [Optional; default: false] + "extensions": { [Optional; default: {} (empty)] "": { - "signatureRequired": - "runtimePolicy": { - - } + "signatureRequired": [Optional; no default] + "runtimePolicy": [Optional; no default] } }, } @@ -165,20 +163,7 @@ def _parse_policy_version(policy): @staticmethod def _parse_extension_policies(policy): """ - Parses the "extensionPolicies" attribute of the policy document. It should conform to the following schema: - - "extensionPolicies": { - "allowListedExtensionsOnly": , - "signatureRequired": , - "extensions": { - "": { - "signatureRequired": - "runtimePolicy": { - - } - } - }, - } + Parses the "extensionPolicies" attribute of the policy document. See _parse_policy() for schema. """ extension_policies = _PolicyEngine._get_dictionary(policy, attribute="extensionPolicies", optional=True, default={}) @@ -195,17 +180,7 @@ def _parse_extension_policies(policy): @staticmethod def _parse_extensions(extensions): """ - Parses the "extensions" attribute. It should conform to the following schema: - - "extensions": { - "": { - "signatureRequired": bool - "runtimePolicy": { - - } - } - } - + Parses the "extensions" attribute. See _parse_policy() for schema. The return value is a case-folded dict. CRP allows extensions to be any case, so we allow for case-insensitive lookup of individual extension policies. """ parsed = _CaseFoldedDict.from_dict({}) @@ -220,14 +195,7 @@ def _parse_extensions(extensions): @staticmethod def _parse_extension(extension): """ - Parses an individual extension. It should conform to the following schema: - - "": { - "signatureRequired": bool - "runtimePolicy": { - - } - } + Parses an individual extension. See _parse_policy() for schema. """ extension_attribute_name = "extensionPolicies.extensions.{0}".format(extension) From 5e7f479ff992b7df8e286a776b7d1902ccc4f55f Mon Sep 17 00:00:00 2001 From: "narrieta@microsoft" Date: Thu, 24 Oct 2024 21:44:16 -0700 Subject: [PATCH 4/5] review feedback --- azurelinuxagent/ga/policy/policy_engine.py | 43 +++--- tests/ga/test_policy_engine.py | 155 +++++++-------------- 2 files changed, 71 insertions(+), 127 deletions(-) diff --git a/azurelinuxagent/ga/policy/policy_engine.py b/azurelinuxagent/ga/policy/policy_engine.py index b0c7fb15e2..5bb8a3d3aa 100644 --- a/azurelinuxagent/ga/policy/policy_engine.py +++ b/azurelinuxagent/ga/policy/policy_engine.py @@ -152,8 +152,8 @@ def _parse_policy_version(policy): """ version = _PolicyEngine._get_string(policy, attribute="policyVersion") - if not re.match(r"^\d+\.\d+\.\d+$", version): - raise InvalidPolicyError("invalid value for attribute 'policyVersion': '{0}'; it should be in format 'major.minor.patch' (e.g., '1.0.0')".format(version)) + if not re.match(r"^\d+(\.\d+(\.\d+)?)?$", version): + raise InvalidPolicyError("invalid value for attribute 'policyVersion': '{0}'; it should be in format 'major[.minor[.patch]]' (e.g., '1', '1.0', '1.0.0')".format(version)) if FlexibleVersion(_MAX_SUPPORTED_POLICY_VERSION) < FlexibleVersion(version): raise InvalidPolicyError("policy version '{0}' is not supported. The agent supports policy versions up to '{1}'.".format(version, _MAX_SUPPORTED_POLICY_VERSION)) @@ -173,7 +173,7 @@ def _parse_extension_policies(policy): "allowListedExtensionsOnly": _PolicyEngine._get_boolean(extension_policies, attribute="allowListedExtensionsOnly", name_prefix="extensionPolicies.", optional=True, default=_DEFAULT_ALLOW_LISTED_EXTENSIONS_ONLY), "signatureRequired": _PolicyEngine._get_boolean(extension_policies, attribute="signatureRequired", name_prefix="extensionPolicies.", optional=True, default=_DEFAULT_SIGNATURE_REQUIRED), "extensions": _PolicyEngine._parse_extensions( - _PolicyEngine._get_dictionary(extension_policies, attribute="extensions", name_prefix="extensionPolicies.", optional=True, default=_CaseFoldedDict.from_dict(_DEFAULT_EXTENSIONS)) + _PolicyEngine._get_dictionary(extension_policies, attribute="extensions", name_prefix="extensionPolicies.", optional=True, default=_DEFAULT_EXTENSIONS) ) } @@ -187,7 +187,7 @@ def _parse_extensions(extensions): for extension, extension_policy in extensions.items(): if not isinstance(extension_policy, dict): - raise InvalidPolicyError("invalid type {0} for attribute '{1}'; must be 'object'".format(type(extension_policy).__name__, extension)) + raise InvalidPolicyError("invalid type {0} for attribute 'extensionPolicies.extensions.{1}'; must be 'object'".format(type(extension_policy).__name__, extension)) parsed[extension] = _PolicyEngine._parse_extension(extension_policy) return parsed @@ -223,13 +223,14 @@ def _check_attributes(object_, object_name, valid_attributes): """ for k in object_.keys(): if k not in valid_attributes: - raise InvalidPolicyError("invalid attribute '{0}' in {1}".format(k, object_name)) + raise InvalidPolicyError("unrecognized attribute '{0}' in {1}".format(k, object_name)) @staticmethod def _get_dictionary(object_, attribute, name_prefix="", optional=False, default=None): """ - Returns object[attribute] if it exists, verifying that it is a dictionary, else returns default. - If optional is False and object[attribute] does not exist, raise InvalidPolicyError. + Returns object[attribute] if it exists, verifying that it is a dictionary. + If object_[attribute] does not exist and 'optional' is True, returns 'default'; if 'optional' is False raises InvalidPolicyError. + If object_[attribute] is not a dictionary, raises InvalidPolicyError. The name_prefix indicates the path of the attribute within the policy document and is used in the error message. """ return _PolicyEngine._get_value(object_, attribute, name_prefix, dict, "object", optional=optional, default=default) @@ -238,7 +239,8 @@ def _get_dictionary(object_, attribute, name_prefix="", optional=False, default= def _get_string(object_, attribute, name_prefix="", optional=False, default=None): """ Returns object[attribute] if it exists, verifying that it is a string, else returns default. - If optional is False and object[attribute] does not exist, raise InvalidPolicyError. + If object_[attribute] does not exist and 'optional' is True, returns 'default'; if 'optional' is False raises InvalidPolicyError. + If object_[attribute] is not a string, raises InvalidPolicyError. The name_prefix indicates the path of the attribute within the policy document and is used in the error message. """ return _PolicyEngine._get_value(object_, attribute, name_prefix, ustr, "string", optional=optional, default=default) @@ -247,7 +249,8 @@ def _get_string(object_, attribute, name_prefix="", optional=False, default=None def _get_boolean(object_, attribute, name_prefix="", optional=False, default=None): """ Returns object[attribute] if it exists, verifying that it is a boolean, else returns default. - If optional is False and object[attribute] does not exist, raise InvalidPolicyError. + If object_[attribute] does not exist and 'optional' is True, returns 'default'; if 'optional' is False raises InvalidPolicyError. + If object_[attribute] is not a boolean, raises InvalidPolicyError. The name_prefix indicates the path of the attribute within the policy document and is used in the error message. """ return _PolicyEngine._get_value(object_, attribute, name_prefix, bool, "boolean", optional=optional, default=default) @@ -256,7 +259,8 @@ def _get_boolean(object_, attribute, name_prefix="", optional=False, default=Non def _get_value(object_, attribute, name_prefix, type_, type_name, optional, default): """ Returns object[attribute] if it exists, verifying that it is of the given type_, else returns default. - If optional is False and object[attribute] does not exist, raise InvalidPolicyError. + If object_[attribute] does not exist and 'optional' is True, returns 'default'; if 'optional' is False raises InvalidPolicyError. + If the type of object_[attribute] is not 'type_', raises InvalidPolicyError. The name_prefix indicates the path of the attribute within the policy document, the type_name indicates a user-friendly name for type_; both are used in the error message. """ if default is not None and not optional: @@ -273,7 +277,7 @@ def _get_value(object_, attribute, name_prefix, type_, type_name, optional, defa class ExtensionPolicyEngine(_PolicyEngine): - def should_allow_extension(self, extension_to_check): + def should_allow_extension(self, extension_name): """ Return whether we should allow extension download based on policy. extension_to_check is expected to be an Extension object. @@ -288,24 +292,21 @@ def should_allow_extension(self, extension_to_check): allow_listed_extension_only = self._policy.get("extensionPolicies").get("allowListedExtensionsOnly") extension_allowlist = self._policy.get("extensionPolicies").get("extensions") - should_allow = not allow_listed_extension_only or extension_allowlist.get(extension_to_check.name) is not None + should_allow = not allow_listed_extension_only or extension_allowlist.get(extension_name) is not None return should_allow - def should_enforce_signature_validation(self, extension_to_check): + def should_enforce_signature_validation(self, extension_name): """ Return whether we should enforce signature based on policy. - extension_to_check is expected to be an Extension object. If policy feature not enabled, return False. - Individual policy takes precedence over global - if individual signing policy present, return true/false based on - individual policy. Else, return true/false based on global policy. + Individual policy takes precedence over global. """ if not self.policy_enforcement_enabled: return False global_signature_required = self._policy.get("extensionPolicies").get("signatureRequired") - individual_policy = self._policy.get("extensionPolicies").get("extensions").get(extension_to_check.name) - if individual_policy is None or len(individual_policy) == 0: - return global_signature_required - else: - return individual_policy.get("signatureRequired") + individual_policy = self._policy.get("extensionPolicies").get("extensions").get(extension_name) + individual_signature_required = individual_policy.get("signatureRequired") if individual_policy is not None else None + + return individual_signature_required if individual_signature_required is not None else global_signature_required diff --git a/tests/ga/test_policy_engine.py b/tests/ga/test_policy_engine.py index 9ce1c3b7bd..c55a5f5cf6 100644 --- a/tests/ga/test_policy_engine.py +++ b/tests/ga/test_policy_engine.py @@ -22,7 +22,6 @@ from azurelinuxagent.ga.policy.policy_engine import ExtensionPolicyEngine, InvalidPolicyError, \ _PolicyEngine, _DEFAULT_ALLOW_LISTED_EXTENSIONS_ONLY, _DEFAULT_SIGNATURE_REQUIRED from tests.lib.tools import patch -from azurelinuxagent.common.protocol.restapi import Extension TEST_EXTENSION_NAME = "Microsoft.Azure.ActiveDirectory.AADSSHLoginForLinux" @@ -161,6 +160,13 @@ def test_it_should_verify_policy_version_is_required(self): with self.assertRaises(InvalidPolicyError): _PolicyEngine() + def test_it_should_accept_partially_specified_policy_versions(self): + for policy_version in ['0', '0.1', '0.1.0']: + self._create_policy_file({ + "policyVersion": policy_version, + }) + self.assertEqual(policy_version, _PolicyEngine()._policy["policyVersion"]) + def test_should_raise_error_if_policy_file_is_invalid_json(self): cases = [ ''' @@ -308,11 +314,10 @@ def test_should_allow_and_should_not_enforce_signature_if_no_custom_policy_file( When custom policy file not present, should allow all extensions and not enforce signature. """ # No policy file is present - feature is disabled. - test_extension = Extension(name=TEST_EXTENSION_NAME) engine = ExtensionPolicyEngine() - should_allow = engine.should_allow_extension(test_extension) + should_allow = engine.should_allow_extension(TEST_EXTENSION_NAME) self.assertTrue(should_allow, msg="Policy feature is disabled because no policy file present, so all extensions should be allowed.") - should_enforce = engine.should_enforce_signature_validation(test_extension) + should_enforce = engine.should_enforce_signature_validation(TEST_EXTENSION_NAME) self.assertFalse(should_enforce, msg="Policy feature is disabled because no policy file present, so signature should not be enforced.") def test_should_allow_and_should_not_enforce_signature_if_conf_flag_false(self): @@ -321,18 +326,16 @@ def test_should_allow_and_should_not_enforce_signature_if_conf_flag_false(self): """ self.patch_conf_flag.stop() self._create_policy_file({}) - test_extension = Extension(name=TEST_EXTENSION_NAME) engine = ExtensionPolicyEngine() - should_allow = engine.should_allow_extension(test_extension) + should_allow = engine.should_allow_extension(TEST_EXTENSION_NAME) self.assertTrue(should_allow, msg="Policy feature is disabled because conf flag false, so all extensions should be allowed.") - should_enforce = engine.should_enforce_signature_validation(test_extension) + should_enforce = engine.should_enforce_signature_validation(TEST_EXTENSION_NAME) self.assertFalse(should_enforce, msg="Policy feature is disabled because conf flag false, so signature should not be enforced.") def test_should_use_default_policy_if_no_extension_policy_specified(self): """ Test that default policy is used when policy file does not specify the extension policy. """ - test_extension = Extension(name=TEST_EXTENSION_NAME) policy_cases = [ { "policyVersion": "0.1.0" @@ -345,10 +348,10 @@ def test_should_use_default_policy_if_no_extension_policy_specified(self): for policy in policy_cases: self._create_policy_file(policy) engine = ExtensionPolicyEngine() - should_allow = engine.should_allow_extension(test_extension) + should_allow = engine.should_allow_extension(TEST_EXTENSION_NAME) self.assertEqual(should_allow, not _DEFAULT_ALLOW_LISTED_EXTENSIONS_ONLY, msg="Extension policy is not specified, so should use default policy.") - should_enforce = engine.should_enforce_signature_validation(test_extension) + should_enforce = engine.should_enforce_signature_validation(TEST_EXTENSION_NAME) self.assertEqual(should_enforce, _DEFAULT_SIGNATURE_REQUIRED, msg="Extension policy is not specified, so should use default policy.") @@ -356,9 +359,7 @@ def test_should_allow_if_allowListedExtensionsOnly_true_and_extension_in_list(se """ If allowListedExtensionsOnly is true and extension in list, should_allow = True. """ - test_extension = Extension(name=TEST_EXTENSION_NAME) TEST_EXTENSION_NAME_2 = "Test.Extension.Name" - test_extension_2 = Extension(name=TEST_EXTENSION_NAME_2) policy = \ { "policyVersion": "0.1.0", @@ -375,16 +376,15 @@ def test_should_allow_if_allowListedExtensionsOnly_true_and_extension_in_list(se } self._create_policy_file(policy) engine = ExtensionPolicyEngine() - should_allow = engine.should_allow_extension(test_extension) + should_allow = engine.should_allow_extension(TEST_EXTENSION_NAME) self.assertTrue(should_allow, msg="Extension is in allowlist, so should be allowed.") - should_allow = engine.should_allow_extension(test_extension_2) + should_allow = engine.should_allow_extension(TEST_EXTENSION_NAME_2) self.assertTrue(should_allow, msg="Extension is in allowlist, so should be allowed.") def test_should_not_allow_if_allowListedExtensionsOnly_true_and_extension_not_in_list(self): """ If allowListedExtensionsOnly is true and extension not in list, should_allow = False. """ - test_extension = Extension(name=TEST_EXTENSION_NAME) policy = \ { "policyVersion": "0.1.0", @@ -396,7 +396,7 @@ def test_should_not_allow_if_allowListedExtensionsOnly_true_and_extension_not_in } self._create_policy_file(policy) engine = ExtensionPolicyEngine() - should_allow = engine.should_allow_extension(test_extension) + should_allow = engine.should_allow_extension(TEST_EXTENSION_NAME) self.assertFalse(should_allow, msg="allowListedExtensionsOnly is true and extension is not in allowlist, so should not be allowed.") @@ -405,8 +405,6 @@ def test_should_allow_if_allowListedExtensionsOnly_false(self): If allowListedExtensionsOnly is false, should_allow = True (whether extension in list or not). """ # Test an extension in the allowlist, and an extension not in the allowlist. Both should be allowed. - test_ext_in_list = Extension(name=TEST_EXTENSION_NAME) - test_ext_not_in_list = Extension(name="Random.Ext") policy = \ { "policyVersion": "0.1.0", @@ -422,16 +420,15 @@ def test_should_allow_if_allowListedExtensionsOnly_false(self): } self._create_policy_file(policy) engine = ExtensionPolicyEngine() - self.assertTrue(engine.should_allow_extension(test_ext_in_list), + self.assertTrue(engine.should_allow_extension(TEST_EXTENSION_NAME), msg="allowListedExtensionsOnly is false, so extension should be allowed.") - self.assertTrue(engine.should_allow_extension(test_ext_not_in_list), + self.assertTrue(engine.should_allow_extension("Random.Ext"), msg="allowListedExtensionsOnly is false, so extension should be allowed.") def test_should_enforce_signature_if_individual_signatureRequired_true(self): """ If signatureRequired is true for individual extension, should_enforce_signature_validation = True (whether global signatureRequired is true or false). """ - test_extension = Extension(name=TEST_EXTENSION_NAME) for global_rule in [True, False]: policy = \ { @@ -448,7 +445,7 @@ def test_should_enforce_signature_if_individual_signatureRequired_true(self): } self._create_policy_file(policy) engine = ExtensionPolicyEngine() - should_enforce_signature = engine.should_enforce_signature_validation(test_extension) + should_enforce_signature = engine.should_enforce_signature_validation(TEST_EXTENSION_NAME) self.assertTrue(should_enforce_signature, msg="Individual signatureRequired policy is true, so signature should be enforced.") @@ -456,7 +453,6 @@ def test_should_not_enforce_signature_if_individual_signatureRequired_false(self """ If signatureRequired is false for individual extension policy, should_enforce_signature_validation = False (whether global signatureRequired is true or false). """ - test_extension = Extension(name=TEST_EXTENSION_NAME) for global_rule in [True, False]: policy = \ { @@ -473,93 +469,41 @@ def test_should_not_enforce_signature_if_individual_signatureRequired_false(self } self._create_policy_file(policy) engine = ExtensionPolicyEngine() - should_enforce_signature = engine.should_enforce_signature_validation(test_extension) + should_enforce_signature = engine.should_enforce_signature_validation(TEST_EXTENSION_NAME) self.assertFalse(should_enforce_signature, msg="Individual signatureRequired policy is false, so signature should be not enforced.") - def test_should_enforce_signature_if_global_signatureRequired_true_and_no_individual_policy(self): - """ - If signatureRequired is true globally and no individual extension signature policy, should_enforce_signature_validation = True. - """ - test_extension = Extension(name=TEST_EXTENSION_NAME) - policy = \ - { - "policyVersion": "0.1.0", - "extensionPolicies": { - "allowListedExtensionsOnly": True, - "signatureRequired": True, - "extensions": {} - } - } - self._create_policy_file(policy) - engine = ExtensionPolicyEngine() - should_enforce_signature = engine.should_enforce_signature_validation(test_extension) - self.assertTrue(should_enforce_signature, - msg="Global signatureRequired policy is true, so signature should be enforced.") - - def test_should_not_enforce_signature_if_global_signatureRequired_false_and_no_individual_policy(self): - """ - If signatureRequired is false globally and no individual extension signature policy, should_enforce_signature_validation = False. - """ - test_extension = Extension(name=TEST_EXTENSION_NAME) - policy = \ - { - "policyVersion": "0.1.0", - "extensionPolicies": { - "allowListedExtensionsOnly": True, - "signatureRequired": False, - "extensions": {} - } - } - self._create_policy_file(policy) - engine = ExtensionPolicyEngine() - should_enforce_signature = engine.should_enforce_signature_validation(test_extension) - self.assertFalse(should_enforce_signature, - msg="Global signatureRequired policy is false, so signature should not be enforced.") - - def test_should_enforce_signature_if_global_signatureRequired_true_and_individual_signatureRequired_not_specified(self): - """ - If individual policy is present, but signatureRequired is not specified for that policy, use global signatureRequired. - """ - test_extension = Extension(name=TEST_EXTENSION_NAME) - policy = \ - { - "policyVersion": "0.1.0", - "extensionPolicies": { - "allowListedExtensionsOnly": True, - "signatureRequired": True, - "extensions": { - TEST_EXTENSION_NAME: {} + def test_should_use_global_signatureRequired_when_an_individual_policy_is_not_specified(self): + for global_policy in [True, False]: + extensions_test_cases = [ + None, + {}, + { + TEST_EXTENSION_NAME: {} + }, + { + TEST_EXTENSION_NAME: { + "runtimePolicy": "an arbitrary object" } } - } - self._create_policy_file(policy) - engine = ExtensionPolicyEngine() - should_enforce_signature = engine.should_enforce_signature_validation(test_extension) - self.assertTrue(should_enforce_signature, - msg="Individual signatureRequired policy is not set, so should use global policy and enforce signature.") - - def test_should_not_enforce_signature_if_global_signatureRequired_false_and_individual_signatureRequired_not_specified(self): - """ - If individual policy is present, but signatureRequired is not specified for that policy, use global signatureRequired. - """ - test_extension = Extension(name=TEST_EXTENSION_NAME) - policy = \ - { - "policyVersion": "0.1.0", - "extensionPolicies": { - "allowListedExtensionsOnly": True, - "signatureRequired": False, - "extensions": { - TEST_EXTENSION_NAME: {} + ] + for extensions in extensions_test_cases: + policy = { + "policyVersion": "0.1.0", + "extensionPolicies": { + "allowListedExtensionsOnly": True, + "signatureRequired": global_policy, } } - } - self._create_policy_file(policy) - engine = ExtensionPolicyEngine() - should_enforce_signature = engine.should_enforce_signature_validation(test_extension) - self.assertFalse(should_enforce_signature, - msg="Individual signatureRequired policy is not set, so should use global policy and enforce signature.") + if extensions is not None: + policy["extensionPolicies"]["extensions"] = extensions + + self._create_policy_file(policy) + + self.assertEqual( + global_policy, + ExtensionPolicyEngine().should_enforce_signature_validation(TEST_EXTENSION_NAME), + "The global signatureRequired ({0}) should have been used. Policy:\n{1}".format(global_policy, policy)) def test_extension_name_in_policy_should_be_case_insensitive(self): """ @@ -571,7 +515,6 @@ def test_extension_name_in_policy_should_be_case_insensitive(self): "MicrOsoft.aZure.activedirectory.aaDsShloginFORlinux", "microsoft.azure.activedirectory.aadsshloginforlinux" ]: - test_extension = Extension(name=ext_name_to_test) policy = \ { "policyVersion": "0.1.0", @@ -588,8 +531,8 @@ def test_extension_name_in_policy_should_be_case_insensitive(self): self._create_policy_file(policy) engine = ExtensionPolicyEngine() - should_allow = engine.should_allow_extension(test_extension) - should_enforce_signature = engine.should_enforce_signature_validation(test_extension) + should_allow = engine.should_allow_extension(ext_name_to_test) + should_enforce_signature = engine.should_enforce_signature_validation(ext_name_to_test) self.assertTrue(should_allow, msg="Extension should have been found in allowlist regardless of extension name case.") self.assertTrue(should_enforce_signature, From f6c53ea900e677577ae5261fbc208beefb614e75 Mon Sep 17 00:00:00 2001 From: "narrieta@microsoft" Date: Fri, 25 Oct 2024 11:22:00 -0700 Subject: [PATCH 5/5] review --- azurelinuxagent/ga/policy/policy_engine.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/azurelinuxagent/ga/policy/policy_engine.py b/azurelinuxagent/ga/policy/policy_engine.py index 5bb8a3d3aa..065589f675 100644 --- a/azurelinuxagent/ga/policy/policy_engine.py +++ b/azurelinuxagent/ga/policy/policy_engine.py @@ -147,7 +147,7 @@ def _parse_policy(policy): @staticmethod def _parse_policy_version(policy): """ - Validate and return "policyVersion" attribute. If not a string in the format "x.y.z", raise InvalidPolicyError. + Validate and return "policyVersion" attribute. If not a string in the format "major[.minor[.patch]]", raise InvalidPolicyError. If policy_version is greater than maximum supported version, raise InvalidPolicyError. """ version = _PolicyEngine._get_string(policy, attribute="policyVersion") @@ -280,8 +280,6 @@ class ExtensionPolicyEngine(_PolicyEngine): def should_allow_extension(self, extension_name): """ Return whether we should allow extension download based on policy. - extension_to_check is expected to be an Extension object. - If policy feature not enabled, return True. If allowListedExtensionsOnly=true, return true only if extension present in "extensions" allowlist. If allowListedExtensions=false, return true always. @@ -298,7 +296,6 @@ def should_allow_extension(self, extension_name): def should_enforce_signature_validation(self, extension_name): """ Return whether we should enforce signature based on policy. - If policy feature not enabled, return False. Individual policy takes precedence over global. """