diff --git a/src/cfnlint/api.py b/src/cfnlint/api.py index 78c67f805c..303b60ee11 100644 --- a/src/cfnlint/api.py +++ b/src/cfnlint/api.py @@ -11,7 +11,8 @@ from cfnlint.decode.decode import decode_str from cfnlint.helpers import REGION_PRIMARY, REGIONS from cfnlint.rules import Match, RulesCollection -from cfnlint.runner import Runner, TemplateRunner +from cfnlint.runner import Runner +from cfnlint.runner.template import run_template_by_data Matches = List[Match] @@ -56,11 +57,16 @@ def lint( config_mixin = ConfigMixIn(**config) if isinstance(rules, RulesCollection): - template_runner = TemplateRunner(None, template, config_mixin, rules) # type: ignore # noqa: E501 - return list(template_runner.run()) + return list( + run_template_by_data( + template, + config_mixin, + rules, # type: ignore + ) + ) runner = Runner(config_mixin) - return list(runner.validate_template(None, template)) + return list(runner.validate_template(template)) def lint_all(s: str) -> list[Match]: diff --git a/src/cfnlint/config.py b/src/cfnlint/config.py index aeaebd41b9..33ad3ff411 100644 --- a/src/cfnlint/config.py +++ b/src/cfnlint/config.py @@ -250,6 +250,17 @@ def comma_separated_arg(string): return string.split(",") +class key_value(argparse.Action): + def __call__(self, parser, namespace, values, option_string=None): + setattr(namespace, self.dest, dict()) + + for value in values: + # split it into key and value + key, value = value.split("=", 1) + # assign into dictionary + getattr(namespace, self.dest)[key.strip()] = value.strip() + + def _ensure_value(namespace, name, value): if getattr(namespace, name, None) is None: setattr(namespace, name, value) @@ -400,6 +411,16 @@ def __call__(self, parser, namespace, values, option_string=None): default=[], action="extend", ) + standard.add_argument( + "-tp", + "--template-parameters", + dest="template_parameters", + metavar="KEY=VALUE", + nargs="+", + default={}, + action=key_value, + help="only check rules whose id do not match these values", + ) advanced.add_argument( "-D", "--debug", help="Enable debug logging", action="store_true" ) @@ -616,6 +637,7 @@ class ManualArgs(TypedDict, total=False): configure_rules: dict[str, dict[str, Any]] include_checks: list[str] ignore_checks: list[str] + template_parameters: dict[str, Any] mandatory_checks: list[str] include_experimental: bool ignore_bad_template: bool @@ -646,6 +668,7 @@ def __repr__(self): "ignore_checks": self.ignore_checks, "include_checks": self.include_checks, "mandatory_checks": self.mandatory_checks, + "template_parameters": self.template_parameters, "include_experimental": self.include_experimental, "configure_rules": self.configure_rules, "regions": self.regions, @@ -817,6 +840,14 @@ def append_rules(self): "append_rules", False, True ) + @property + def template_parameters(self): + return self._get_argument_value("template_parameters", True, True) + + @template_parameters.setter + def template_parameters(self, template_parameters: dict[str, Any]): + self._manual_args["template_parameters"] = template_parameters + @property def override_spec(self): return self._get_argument_value("override_spec", False, True) diff --git a/src/cfnlint/context/context.py b/src/cfnlint/context/context.py index a8c9f073e8..eb4dafba9a 100644 --- a/src/cfnlint/context/context.py +++ b/src/cfnlint/context/context.py @@ -476,4 +476,5 @@ def create_context_for_template(cfn): regions=cfn.regions, path=Path(), functions=["Fn::Transform"], + ref_values=cfn.parameters, ) diff --git a/src/cfnlint/core.py b/src/cfnlint/core.py index e1e231d766..930fca6715 100644 --- a/src/cfnlint/core.py +++ b/src/cfnlint/core.py @@ -11,7 +11,8 @@ from cfnlint.config import _DEFAULT_RULESDIR, ConfigMixIn, ManualArgs from cfnlint.match import Match from cfnlint.rules import RulesCollection -from cfnlint.runner import TemplateRunner, UnexpectedRuleException +from cfnlint.runner.exceptions import UnexpectedRuleException +from cfnlint.runner.template.runner import _run_template def get_rules( @@ -68,11 +69,11 @@ def run_checks( **config, ) - runner = TemplateRunner( - filename=filename, - template=template, - rules=rules, # type: ignore - config=config_mixin, + return list( + _run_template( + filename=filename, + template=template, + rules=rules, # type: ignore + config=config_mixin, + ) ) - - return list(runner.run()) diff --git a/src/cfnlint/rules/parameters/DeploymentParameters.py b/src/cfnlint/rules/parameters/DeploymentParameters.py new file mode 100644 index 0000000000..e58cd84523 --- /dev/null +++ b/src/cfnlint/rules/parameters/DeploymentParameters.py @@ -0,0 +1,106 @@ +""" +Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +SPDX-License-Identifier: MIT-0 +""" + +from __future__ import annotations + +from typing import Any + +from cfnlint.jsonschema import Validator +from cfnlint.rules.jsonschema.CfnLintJsonSchema import CfnLintJsonSchema + + +class DeploymentParameters(CfnLintJsonSchema): + """Check if Parameters are configured correctly""" + + id = "E2900" + shortdesc = "Parameters have appropriate properties" + description = "Making sure the parameters are properly configured" + source_url = "https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/parameters-section-structure.html" + tags = ["parameters"] + + def __init__(self): + """Init""" + super().__init__( + keywords=["Parameters"], + all_matches=True, + ) + + def _is_type_a_list(self, parameter_type: str) -> bool: + return "List" in parameter_type and "CommaDelimitedList" not in parameter_type + + def _build_schema(self, instance: Any) -> dict[str, Any]: + if not isinstance(instance, dict): + return {} + + schema: dict[str, Any] = { + "properties": {}, + "additionalProperties": False, + "required": [], + "type": "object", + } + + singular_types = ["string", "integer", "number", "boolean"] + + for parameter_name, parameter_object in instance.items(): + schema["properties"][parameter_name] = {} + if not isinstance(parameter_object, dict): + continue + if "Default" not in parameter_object: + schema["required"] = [parameter_name] + + parameter_type = parameter_object.get("Type") + if not isinstance(parameter_type, str): + continue + + if self._is_type_a_list(parameter_type): + schema["properties"][parameter_name] = { + "type": "array", + "items": { + "type": singular_types, + }, + } + if "AllowedValues" in parameter_object: + schema["properties"][parameter_name]["items"]["enum"] = ( + parameter_object["AllowedValues"] + ) + if "Pattern" in parameter_object: + if self._is_type_a_list(parameter_type): + schema["properties"][parameter_name]["items"]["pattern"] = ( + parameter_object["Pattern"] + ) + else: + schema["properties"][parameter_name]["type"] = singular_types + if "AllowedValues" in parameter_object: + schema["properties"][parameter_name]["enum"] = parameter_object[ + "AllowedValues" + ] + if "Pattern" in parameter_object: + schema["properties"][parameter_name]["pattern"] = parameter_object[ + "Pattern" + ] + + return schema + + def validate(self, validator: Validator, _: Any, instance: Any, schema: Any): + if not validator.cfn.parameters: + return + + cfn_validator = self.extend_validator( + validator=validator, + schema=self._build_schema(instance), + context=validator.context, + ).evolve( + context=validator.context.evolve(strict_types=False), + function_filter=validator.function_filter.evolve( + add_cfn_lint_keyword=False, + ), + ) + + for err in super()._iter_errors(cfn_validator, validator.cfn.parameters): + # we use enum twice. Once for the type and once for the property + # names. There are separate error numbers so we do this. + if "propertyNames" in err.schema_path and "enum" in err.schema_path: + err.rule = self + yield err diff --git a/src/cfnlint/runner/__init__.py b/src/cfnlint/runner/__init__.py index 543911127a..cc46b961ef 100644 --- a/src/cfnlint/runner/__init__.py +++ b/src/cfnlint/runner/__init__.py @@ -3,7 +3,6 @@ SPDX-License-Identifier: MIT-0 """ -__all__ = ["main", "Runner", "TemplateRunner"] +__all__ = ["main", "Runner"] from cfnlint.runner.cli import Runner, main -from cfnlint.runner.template import TemplateRunner diff --git a/src/cfnlint/runner/cli.py b/src/cfnlint/runner/cli.py index e5ffbc6ac7..fa3fdc485b 100644 --- a/src/cfnlint/runner/cli.py +++ b/src/cfnlint/runner/cli.py @@ -11,13 +11,12 @@ import cfnlint.formatters import cfnlint.maintenance from cfnlint.config import ConfigMixIn, configure_logging -from cfnlint.decode.decode import decode from cfnlint.rules import Match, Rules from cfnlint.rules.errors import ConfigError, ParseError +from cfnlint.runner.deployment_file.runner import run_deployment_files from cfnlint.runner.exceptions import CfnLintExitException, UnexpectedRuleException -from cfnlint.runner.template import TemplateRunner +from cfnlint.runner.template import run_template_by_data, run_template_by_file_path from cfnlint.schema import PROVIDER_SCHEMA_MANAGER -from cfnlint.runner.deployment_files.runner import run_deployment_files LOGGER = logging.getLogger(__name__) @@ -119,7 +118,6 @@ def _get_rules(self) -> None: self.rules.update(Rules.create_from_directory(rules_path)) else: self.rules.update(Rules.create_from_module(rules_path)) - self.rules.update( Rules.create_from_custom_rules_file(self.config.custom_rules) ) @@ -159,30 +157,20 @@ def _validate_filenames(self, filenames: Sequence[str | None]) -> Iterator[Match ): ignore_bad_template = True for filename in filenames: - (template, matches) = decode(filename) - if matches: - if ignore_bad_template or any( - "E0000".startswith(x) for x in self.config.ignore_checks - ): - matches = [match for match in matches if match.rule.id != "E0000"] - - yield from iter(matches) - continue - yield from self.validate_template(filename, template) # type: ignore[arg-type] # noqa: E501 - - def validate_template( - self, filename: str | None, template: dict[str, Any] - ) -> Iterator[Match]: + yield from run_template_by_file_path( + filename, self.config, self.rules, ignore_bad_template + ) + + def validate_template(self, template: dict[str, Any]) -> Iterator[Match]: """ Validate a single CloudFormation template and yield any matches found. - This function takes a CloudFormation template as a dictionary and runs the - configured rules against it. Any matches found are yielded as an iterator. + This function decodes the provided template, validates it against the + configured rules, and yields any matches found as an iterator. Args: - filename (str | None): The filename of the CloudFormation template, or - `None` if the template is not associated with a file. - template (dict[str, Any]): The CloudFormation template as a dictionary. + filename (str | None): The filename of the template being validated. + template (dict[str, Any]): The CloudFormation template to be validated. Yields: Match: The matches found during the validation process. @@ -190,8 +178,7 @@ def validate_template( Raises: None: This function does not raise any exceptions. """ - runner = TemplateRunner(filename, template, self.config, self.rules) - yield from runner.run() + yield from run_template_by_data(template, self.config, self.rules) def _cli_output(self, matches: list[Match]) -> None: formatter = get_formatter(self.config) @@ -269,7 +256,7 @@ def run(self) -> Iterator[Match]: yield from self._validate_filenames(self.config.templates) return - yield from run_deployment_files(self.config) + yield from run_deployment_files(self.config, self.rules) def cli(self) -> None: """ diff --git a/src/cfnlint/runner/deployment_files/__init__.py b/src/cfnlint/runner/deployment_file/__init__.py similarity index 100% rename from src/cfnlint/runner/deployment_files/__init__.py rename to src/cfnlint/runner/deployment_file/__init__.py diff --git a/src/cfnlint/runner/deployment_file/deployment.py b/src/cfnlint/runner/deployment_file/deployment.py new file mode 100644 index 0000000000..38a358bc5b --- /dev/null +++ b/src/cfnlint/runner/deployment_file/deployment.py @@ -0,0 +1,15 @@ +""" +Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. +SPDX-License-Identifier: MIT-0 +""" + +from dataclasses import dataclass, field +from typing import Any + + +@dataclass(frozen=True) +class Deployment: + + template_file_path: str = field() + parameters: dict[str, Any] = field(default_factory=dict) + tags: dict[str, str] = field(default_factory=dict) diff --git a/src/cfnlint/runner/deployment_file/deployment_types/__init__.py b/src/cfnlint/runner/deployment_file/deployment_types/__init__.py new file mode 100644 index 0000000000..eeb4892028 --- /dev/null +++ b/src/cfnlint/runner/deployment_file/deployment_types/__init__.py @@ -0,0 +1,10 @@ +""" +Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. +SPDX-License-Identifier: MIT-0 +""" + +__all__ = ["create_deployment_from_git_sync"] + +from cfnlint.runner.deployment_file.deployment_types.git_sync import ( + create_deployment_from_git_sync, +) diff --git a/src/cfnlint/runner/deployment_file/deployment_types/git_sync.py b/src/cfnlint/runner/deployment_file/deployment_types/git_sync.py new file mode 100644 index 0000000000..cd3ed61a3a --- /dev/null +++ b/src/cfnlint/runner/deployment_file/deployment_types/git_sync.py @@ -0,0 +1,20 @@ +""" +Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. +SPDX-License-Identifier: MIT-0 +""" + +from typing import Any + +from cfnlint.runner.deployment_file.deployment import Deployment + + +def create_deployment_from_git_sync(data: dict[str, Any]) -> Deployment: + + template_file_path = data.get("template-file-path") + if not template_file_path: + raise ValueError("template-file-path is required") + parameters = data.get("parameters", {}) + tags = data.get("tags", {}) + return Deployment( + template_file_path=template_file_path, parameters=parameters, tags=tags + ) diff --git a/src/cfnlint/runner/deployment_file/runner.py b/src/cfnlint/runner/deployment_file/runner.py new file mode 100644 index 0000000000..bbc5064416 --- /dev/null +++ b/src/cfnlint/runner/deployment_file/runner.py @@ -0,0 +1,84 @@ +""" +Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. +SPDX-License-Identifier: MIT-0 +""" + +from __future__ import annotations + +import logging +from copy import deepcopy +from pathlib import Path +from typing import Iterator + +import cfnlint.runner.deployment_file.deployment_types +from cfnlint.config import ConfigMixIn +from cfnlint.decode import decode +from cfnlint.rules import Match, Rules +from cfnlint.runner.exceptions import CfnLintExitException +from cfnlint.runner.template import run_template_by_file_path + +LOGGER = logging.getLogger(__name__) + + +def run_deployment_file( + filename: str, config: ConfigMixIn, rules: Rules +) -> Iterator[Match]: + """ + Run a single deployment file specified in the configuration. + + Args: + filename (str): The filename of the deployment file to be run. + config (ConfigMixIn): The configuration object containing + settings for the deployment file scan. + + Yields: + + """ + + data, matches = decode(filename) + + if matches: + yield from iter(matches) + return + + ignore_bad_template: bool = False + if config.ignore_bad_template: + ignore_bad_template = True + + for plugin in cfnlint.runner.deployment_file.deployment_types.__all__: + try: + deployment_data = getattr( + cfnlint.runner.deployment_file.deployment_types, plugin + )(data) + template_path = Path(filename).parent / deployment_data.template_file_path + template_config = deepcopy(config) + template_config.template_parameters = deployment_data.parameters + + yield from run_template_by_file_path( + template_path, template_config, rules, ignore_bad_template + ) + return + except Exception as e: + LOGGER.info(e) + continue + + raise CfnLintExitException( + f"Deployment file {filename} didn't meet any supported deployment file format", + 1, + ) + + +def run_deployment_files(config: ConfigMixIn, rules: Rules) -> Iterator[Match]: + """ + Run the deployment files specified in the configuration. + + Args: + config (ConfigMixIn): The configuration object containing + settings for the deployment file scan. + + Yields: + + """ + + for deployment_file in config.deployment_files: + yield from run_deployment_file(deployment_file, deepcopy(config), rules) diff --git a/src/cfnlint/runner/deployment_files/deployment_types/__init__.py b/src/cfnlint/runner/deployment_files/deployment_types/__init__.py deleted file mode 100644 index 8595455928..0000000000 --- a/src/cfnlint/runner/deployment_files/deployment_types/__init__.py +++ /dev/null @@ -1,8 +0,0 @@ -""" -Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. -SPDX-License-Identifier: MIT-0 -""" - -__all__ = ["GitSync"] - -from cfnlint.runner.deployment_files.deployment_types.git_sync import GitSync diff --git a/src/cfnlint/runner/deployment_files/deployment_types/git_sync.py b/src/cfnlint/runner/deployment_files/deployment_types/git_sync.py deleted file mode 100644 index 4c56fbf294..0000000000 --- a/src/cfnlint/runner/deployment_files/deployment_types/git_sync.py +++ /dev/null @@ -1,24 +0,0 @@ -""" -Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. -SPDX-License-Identifier: MIT-0 -""" - -from dataclasses import dataclass, field -from typing import Any - -@dataclass(frozen=True) -class GitSync: - - template_file_path: str = field() - parameters: dict[str, Any] = field(default_factory=dict) - tags: dict[str, str] = field(default_factory=dict) - - @classmethod - def create_from_dict(cls, data: dict[str, Any]) -> "GitSync": - - template_file_path = data.get("template-file-path") - if not template_file_path: - raise ValueError("template-file-path is required") - parameters = data.get("parameters", {}) - tags = data.get("tags", {}) - return cls(template_file_path=template_file_path, parameters=parameters, tags=tags) diff --git a/src/cfnlint/runner/deployment_files/protocols.py b/src/cfnlint/runner/deployment_files/protocols.py deleted file mode 100644 index f1ed22e43f..0000000000 --- a/src/cfnlint/runner/deployment_files/protocols.py +++ /dev/null @@ -1,25 +0,0 @@ -""" -Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. -SPDX-License-Identifier: MIT-0 -""" - -from __future__ import annotations - -from collections import deque -from collections.abc import Mapping -from typing import TYPE_CHECKING, Any, ClassVar, Protocol, Type, runtime_checkable - -from cfnlint.jsonschema._filter import FunctionFilter - -if TYPE_CHECKING: - from cfnlint.context import Context - - -@runtime_checkable -class DeploymentFilePlugin(Protocol): - """ - The protocol to which all deployment file plugin classes adhere. - - Arguments: - - """ diff --git a/src/cfnlint/runner/deployment_files/runner.py b/src/cfnlint/runner/deployment_files/runner.py deleted file mode 100644 index 37c2d59847..0000000000 --- a/src/cfnlint/runner/deployment_files/runner.py +++ /dev/null @@ -1,73 +0,0 @@ -""" -Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. -SPDX-License-Identifier: MIT-0 -""" - -from __future__ import annotations - -from copy import deepcopy -from typing import Iterator - -from cfnlint.config import ConfigMixIn -from cfnlint.rules import Match, Rules -import cfnlint.runner.deployment_files.deployment_types -from cfnlint.decode import decode -from cfnlint.runner.template import TemplateRunner - -def run_deployment_file(filename: str, config: ConfigMixIn) -> Iterator[Match]: - """ - Run a single deployment file specified in the configuration. - - Args: - filename (str): The filename of the deployment file to be run. - config (ConfigMixIn): The configuration object containing - settings for the deployment file scan. - - Yields: - - """ - - data = decode(filename) - - ignore_bad_template: bool = False - if config.ignore_bad_template: - ignore_bad_template = True - - for plugin in cfnlint.runner.deployment_files.deployment_types.__all__: - try: - deployment_data = getattr(cfnlint.runner.deployment_files.deployment_types, plugin)(data) - (template, matches) = decode(filename) - if matches: - if ignore_bad_template or any( - "E0000".startswith(x) for x in self.config.ignore_checks - ): - matches = [match for match in matches if match.rule.id != "E0000"] - - yield from iter(matches) - continue - except Exception as e: - print(e) - continue - - return - yield - - -def run_deployment_files(config: ConfigMixIn) -> Iterator[Match]: - """ - Run the deployment files specified in the configuration. - - Args: - config (ConfigMixIn): The configuration object containing - settings for the deployment file scan. - - Yields: - - """ - - ignore_bad_template: bool = False - if config.ignore_bad_template: - ignore_bad_template = True - - for deployment_file in config.deployment_files: - yield from run_deployment_file(deployment_file, deepcopy(config)) diff --git a/src/cfnlint/runner/template.py b/src/cfnlint/runner/template.py deleted file mode 100644 index e633dda42e..0000000000 --- a/src/cfnlint/runner/template.py +++ /dev/null @@ -1,149 +0,0 @@ -""" -Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. -SPDX-License-Identifier: MIT-0 -""" - -from __future__ import annotations - -import logging -from copy import deepcopy -from typing import Any, Iterator - -from cfnlint.config import ConfigMixIn -from cfnlint.helpers import REGIONS -from cfnlint.rules import Match, Rules -from cfnlint.rules.errors import TransformError -from cfnlint.runner.exceptions import InvalidRegionException -from cfnlint.template.template import Template - -LOGGER = logging.getLogger(__name__) - - -class TemplateRunner: - """ - Runs a set of rules against a CloudFormation template. - - Attributes: - config (ConfigMixIn): The configuration object containing - settings for the template scan. - cfn (Template): The CloudFormation template object. - rules (Rules): The set of rules to be applied to the template. - - Methods: - _dedup(matches: Iterator[Match]) -> Iterator[Match]: - Deduplicate a sequence of matches. - run() -> Iterator[Match]: - Run the rules against the CloudFormation template and - yield the resulting matches. - check_metadata_directives(matches: Iterator[Match]) -> Iterator[Match]: - Filter matches based on metadata directives in the template. - - """ - - def __init__( - self, - filename: str | None, - template: dict[str, Any], - config: ConfigMixIn, - rules: Rules, - ) -> None: - """ - Initialize a new TemplateRunner instance. - - Args: - filename (str | None): The filename of the CloudFormation template. - template (dict[str, Any]): The CloudFormation template as a dictionary. - config (ConfigMixIn): The configuration object containing - settings for the template scan. - rules (Rules): The set of rules to be applied to the template. - """ - self.config = deepcopy(config) - self.config.set_template_args(template) - self.cfn = Template(filename, template, self.config.regions) - self.rules = rules - - def _dedup(self, matches: Iterator[Match]) -> Iterator[Match]: - """ - Deduplicate a sequence of matches. - - Args: - matches (Iterator[Match]): The sequence of matches to be deduplicated. - - Yields: - Match: The unique matches from the input sequence. - """ - seen: list[Match] = [] - for match in matches: - if match not in seen: - seen.append(match) - yield match - - def run(self) -> Iterator[Match]: - """ - Run the rules against the CloudFormation template and - yield the resulting matches. - - Yields: - Match: The matches found by running the rules against the template. - """ - LOGGER.info("Run scan of template %s", self.cfn.filename) - if not set(self.config.regions).issubset(set(REGIONS)): - unsupported_regions = list( - set(self.config.regions).difference(set(REGIONS)) - ) - raise InvalidRegionException( - ( - f"Regions {unsupported_regions!r} are unsupported. " - f"Supported regions are {REGIONS!r}" - ), - 32, - ) - - matches = self.cfn.transform() - if matches: - if self.rules.is_rule_enabled(TransformError(), self.config): - yield from iter(matches) - return - - if self.cfn.template is not None: - if self.config.build_graph: - self.cfn.build_graph() - yield from self._dedup( - self.check_metadata_directives( - self.rules.run( - filename=self.cfn.filename, cfn=self.cfn, config=self.config - ) - ) - ) - - def check_metadata_directives(self, matches: Iterator[Match]) -> Iterator[Match]: - """ - Filter matches based on metadata directives in the template. - - Args: - matches (Iterator[Match]): The sequence of matches to be filtered. - - Yields: - Match: The matches that are not suppressed by metadata directives. - """ - directives = self.cfn.get_directives() - - for match in matches: - if match.rule.id not in directives: - yield match - else: - for mandatory_check in self.config.mandatory_checks: - if match.rule.id.startswith(mandatory_check): - yield match - break - else: - path = getattr(match, "path", None) - if path: - if len(path) >= 2: - if path[0] != "Resources": - yield match - continue - if path[1] not in directives[match.rule.id]: - yield match - else: - yield match diff --git a/src/cfnlint/runner/template/__init__.py b/src/cfnlint/runner/template/__init__.py new file mode 100644 index 0000000000..3ec5ca82fa --- /dev/null +++ b/src/cfnlint/runner/template/__init__.py @@ -0,0 +1,11 @@ +""" +Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. +SPDX-License-Identifier: MIT-0 +""" + +__all__ = ["run_template_by_file_path", "run_template_by_data"] + +from cfnlint.runner.template.runner import ( + run_template_by_data, + run_template_by_file_path, +) diff --git a/src/cfnlint/runner/template/runner.py b/src/cfnlint/runner/template/runner.py new file mode 100644 index 0000000000..884543c460 --- /dev/null +++ b/src/cfnlint/runner/template/runner.py @@ -0,0 +1,148 @@ +""" +Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. +SPDX-License-Identifier: MIT-0 +""" + +from __future__ import annotations + +import logging +from typing import Any, Iterator + +from cfnlint.config import ConfigMixIn +from cfnlint.decode import decode +from cfnlint.helpers import REGIONS +from cfnlint.rules import Match, Rules +from cfnlint.rules.errors import TransformError +from cfnlint.runner.exceptions import InvalidRegionException +from cfnlint.template.template import Template + +LOGGER = logging.getLogger(__name__) + + +def _dedup(matches: Iterator[Match]) -> Iterator[Match]: + """ + Deduplicate a sequence of matches. + + Args: + matches (Iterator[Match]): The sequence of matches to be deduplicated. + + Yields: + Match: The unique matches from the input sequence. + """ + seen: list[Match] = [] + for match in matches: + if match not in seen: + seen.append(match) + yield match + + +def _check_metadata_directives( + matches: Iterator[Match], cfn: Template, config: ConfigMixIn +) -> Iterator[Match]: + """ + Filter matches based on metadata directives in the template. + + Args: + matches (Iterator[Match]): The sequence of matches to be filtered. + + Yields: + Match: The matches that are not suppressed by metadata directives. + """ + directives = cfn.get_directives() + + for match in matches: + if match.rule.id not in directives: + yield match + else: + for mandatory_check in config.mandatory_checks: + if match.rule.id.startswith(mandatory_check): + yield match + break + else: + path = getattr(match, "path", None) + if path: + if len(path) >= 2: + if path[0] != "Resources": + yield match + continue + if path[1] not in directives[match.rule.id]: + yield match + else: + yield match + + +def _run_template( + filename: str | None, template: Any, config: ConfigMixIn, rules: Rules +) -> Iterator[Match]: + + config.set_template_args(template) + cfn = Template(filename, template, config.regions, config.template_parameters) + + LOGGER.info("Run scan of template %s", cfn.filename) + if not set(config.regions).issubset(set(REGIONS)): + unsupported_regions = list(set(config.regions).difference(set(REGIONS))) + raise InvalidRegionException( + ( + f"Regions {unsupported_regions!r} are unsupported. " + f"Supported regions are {REGIONS!r}" + ), + 32, + ) + + matches = cfn.transform() + if matches: + if rules.is_rule_enabled(TransformError(), config): + yield from iter(matches) + return + + if cfn.template is not None: + if config.build_graph: + cfn.build_graph() + yield from _dedup( + _check_metadata_directives( + rules.run(filename=cfn.filename, cfn=cfn, config=config), + cfn=cfn, + config=config, + ) + ) + + +def run_template_by_file_path( + filename: str | None, config: ConfigMixIn, rules: Rules, ignore_bad_template: bool +) -> Iterator[Match]: + """ + Runs a set of rules against a CloudFormation template. + + Attributes: + config (ConfigMixIn): The configuration object containing + settings for the template scan. + cfn (Template): The CloudFormation template object. + rules (Rules): The set of rules to be applied to the template. + """ + + (template, matches) = decode(filename) # type: ignore + if matches: + if ignore_bad_template or any( + "E0000".startswith(x) for x in config.ignore_checks + ): + matches = [match for match in matches if match.rule.id != "E0000"] + + yield from iter(matches) + return + yield from _run_template(filename, template, config, rules) + + +def run_template_by_data( + template: dict[str, Any], config: ConfigMixIn, rules: Rules +) -> Iterator[Match]: + """ + Runs a set of rules against a CloudFormation template. + + Attributes: + config (ConfigMixIn): The configuration object containing + settings for the template scan. + cfn (Template): The CloudFormation template object. + rules (Rules): The set of rules to be applied to the template. + """ + + yield from _run_template(None, template, config, rules) diff --git a/src/cfnlint/template/template.py b/src/cfnlint/template/template.py index e7a02fe056..a570d881fa 100644 --- a/src/cfnlint/template/template.py +++ b/src/cfnlint/template/template.py @@ -51,6 +51,7 @@ def __init__( filename: str | None, template: dict[str, Any], regions: list[str] | None = None, + parameters: dict[str, Any] | None = None, ): """Initialize a Template instance. @@ -63,6 +64,7 @@ def __init__( self.regions = [cfnlint.helpers.REGION_PRIMARY] else: self.regions = regions + self.parameters = parameters or {} self.filename = filename self.template = template self.transform_pre: dict[str, Any] = {} diff --git a/test/unit/module/custom_rules/test_custom_rules.py b/test/unit/module/custom_rules/test_custom_rules.py index 4a863a9bd4..41d84e29db 100644 --- a/test/unit/module/custom_rules/test_custom_rules.py +++ b/test/unit/module/custom_rules/test_custom_rules.py @@ -9,7 +9,7 @@ from cfnlint import ConfigMixIn from cfnlint.config import _DEFAULT_RULESDIR from cfnlint.rules import Rules -from cfnlint.runner import TemplateRunner +from cfnlint.runner.template.runner import _run_template class TestCustomRuleParsing(BaseTestCase): @@ -142,5 +142,4 @@ def run_tests(self, rulename): template = cfnlint.decode.cfn_yaml.load(filename) rules = Rules() rules.update(rules.create_from_custom_rules_file(rulename)) - runner = TemplateRunner(filename, template, ConfigMixIn({}), rules) - return list(runner.run()) + return list(_run_template(filename, template, ConfigMixIn({}), rules)) diff --git a/test/unit/module/rule/test_rule_child.py b/test/unit/module/rule/test_rule_child.py index 1c2f448d42..f254940863 100644 --- a/test/unit/module/rule/test_rule_child.py +++ b/test/unit/module/rule/test_rule_child.py @@ -9,7 +9,7 @@ from cfnlint import ConfigMixIn from cfnlint.decode.decode import decode_str from cfnlint.rules import CloudFormationLintRule, Match, RuleMatch, Rules -from cfnlint.runner import TemplateRunner +from cfnlint.runner.template import run_template_by_data class TestCloudFormationRuleChild(BaseTestCase): @@ -50,8 +50,9 @@ def failure(self): template, _ = decode_str('{"key": "value"}') self.assertIsNotNone(template) if template is not None: - runner = TemplateRunner(None, template, ConfigMixIn([]), rule_collection) - failures = list(runner.run()) + failures = list( + run_template_by_data(template, ConfigMixIn([]), rule_collection) + ) self.assertListEqual( failures, @@ -105,13 +106,13 @@ def failure(self): template, _ = decode_str('{"key": "value"}') self.assertIsNotNone(template) if template is not None: - runner = TemplateRunner( - None, - template, - ConfigMixIn(ignore_checks=["E1001"]), - rule_collection, + failures = list( + run_template_by_data( + template, + ConfigMixIn(ignore_checks=["E1001"]), + rule_collection, + ) ) - failures = list(runner.run()) self.assertListEqual(failures, []) @@ -158,12 +159,12 @@ def failure(self): template, _ = decode_str('{"key": "value"}') self.assertIsNotNone(template) if template is not None: - runner = TemplateRunner( - None, - template, - ConfigMixIn(configure_rules={"E1001": {"pass": False}}), - rule_collection, + failures = list( + run_template_by_data( + template, + ConfigMixIn(configure_rules={"E1001": {"pass": False}}), + rule_collection, + ) ) - failures = list(runner.run()) self.assertListEqual(failures, []) diff --git a/test/unit/module/runner/test_get_formatter.py b/test/unit/module/runner/test_get_formatter.py index 603ae637f0..4a93d04856 100644 --- a/test/unit/module/runner/test_get_formatter.py +++ b/test/unit/module/runner/test_get_formatter.py @@ -7,7 +7,7 @@ import cfnlint.formatters from cfnlint import ConfigMixIn -from cfnlint.runner import get_formatter +from cfnlint.runner.cli import get_formatter class TestGetFormatter(BaseTestCase): diff --git a/test/unit/module/runner/test_rule_configuration.py b/test/unit/module/runner/test_rule_configuration.py index 444ad341e1..ee01c159ad 100644 --- a/test/unit/module/runner/test_rule_configuration.py +++ b/test/unit/module/runner/test_rule_configuration.py @@ -7,7 +7,8 @@ from test.testlib.testcase import BaseTestCase from cfnlint.config import ConfigMixIn -from cfnlint.runner import Runner, UnexpectedRuleException +from cfnlint.runner import Runner +from cfnlint.runner.exceptions import UnexpectedRuleException class TestGetRules(BaseTestCase): diff --git a/test/unit/module/runner/test_runner.py b/test/unit/module/runner/test_runner.py index 185e9c161c..65faab8fdb 100644 --- a/test/unit/module/runner/test_runner.py +++ b/test/unit/module/runner/test_runner.py @@ -9,8 +9,8 @@ import pytest from cfnlint.config import ConfigMixIn -from cfnlint.runner import PROVIDER_SCHEMA_MANAGER, Runner -from cfnlint.schema import Schema +from cfnlint.runner import Runner +from cfnlint.schema import PROVIDER_SCHEMA_MANAGER, Schema def patch_registry(path): diff --git a/test/unit/module/runner/test_template_runner.py b/test/unit/module/runner/test_template_runner.py index d3d599239e..4b3fdab4c0 100644 --- a/test/unit/module/runner/test_template_runner.py +++ b/test/unit/module/runner/test_template_runner.py @@ -10,7 +10,7 @@ from cfnlint import ConfigMixIn from cfnlint.config import _DEFAULT_RULESDIR from cfnlint.rules import Rules -from cfnlint.runner import TemplateRunner +from cfnlint.runner.template import run_template_by_data class TestRunner(BaseTestCase): @@ -68,45 +68,45 @@ def setUp(self): def test_runner(self): """Success test""" - runner = TemplateRunner( - filename=None, - template=self.template, - config=ConfigMixIn( - regions=["us-east-1"], - include_checks=["I"], - include_experimental=True, - ), - rules=self.rules, + failures = list( + run_template_by_data( + template=self.template, + config=ConfigMixIn( + regions=["us-east-1"], + include_checks=["I"], + include_experimental=True, + ), + rules=self.rules, + ) ) - failures = list(runner.run()) self.assertEqual(len(failures), 4, "Got failures {}".format(failures)) def test_runner_mandatory_rules(self): """Success test""" - runner = TemplateRunner( - filename=None, - template=self.template, - config=ConfigMixIn( - mandatory_checks=["W1020"], - regions=["us-east-1"], - include_checks=["I"], - include_experimental=True, - ), - rules=self.rules, + failures = list( + run_template_by_data( + template=self.template, + config=ConfigMixIn( + mandatory_checks=["W1020"], + regions=["us-east-1"], + include_checks=["I"], + include_experimental=True, + ), + rules=self.rules, + ) ) - failures = list(runner.run()) self.assertEqual(len(failures), 5, "Got failures {}".format(failures)) - runner = TemplateRunner( - filename=None, - template=self.template, - config=ConfigMixIn( - mandatory_checks=["W9000"], - regions=["us-east-1"], - include_checks=["I"], - include_experimental=True, - ), - rules=self.rules, + failures = list( + run_template_by_data( + template=self.template, + config=ConfigMixIn( + mandatory_checks=["W9000"], + regions=["us-east-1"], + include_checks=["I"], + include_experimental=True, + ), + rules=self.rules, + ) ) - failures = list(runner.run()) self.assertEqual(len(failures), 4, "Got failures {}".format(failures)) diff --git a/test/unit/rules/__init__.py b/test/unit/rules/__init__.py index ba28f38cb9..b5f0d74aff 100644 --- a/test/unit/rules/__init__.py +++ b/test/unit/rules/__init__.py @@ -8,7 +8,7 @@ from cfnlint import ConfigMixIn, Rules from cfnlint.config import ManualArgs -from cfnlint.runner import TemplateRunner +from cfnlint.runner.template.runner import _run_template class BaseRuleTestCase(BaseTestCase): @@ -29,8 +29,7 @@ def helper_file_positive(self, config=None): config = config or self.config for filename in self.success_templates: template = self.load_template(filename) - good_runner = TemplateRunner(filename, template, config, self.collection) - failures = list(good_runner.run()) + failures = list(_run_template(filename, template, config, self.collection)) self.assertEqual( [], failures, "Got failures {} on {}".format(failures, filename) ) @@ -39,8 +38,7 @@ def helper_file_positive_template(self, filename, config=None): """Success test with template parameter""" config = config or self.config template = self.load_template(filename) - good_runner = TemplateRunner(filename, template, config, self.collection) - failures = list(good_runner.run()) + failures = list(_run_template(filename, template, config, self.collection)) self.assertEqual( [], failures, @@ -51,8 +49,7 @@ def helper_file_negative(self, filename, err_count, config=None): """Failure test""" config = config or self.config template = self.load_template(filename) - bad_runner = TemplateRunner(filename, template, config, self.collection) - failures = list(bad_runner.run()) + failures = list(_run_template(filename, template, config, self.collection)) self.assertEqual( err_count, len(failures),