Skip to content

Commit

Permalink
DENA-671: refactor: extracted test cases from func & split them into …
Browse files Browse the repository at this point in the history
…multiple scenarios (#26)

* refactor: extracted test cases from func & split them into multiple scenarios

* DENA-671: Compacted topics validations (#27)
  • Loading branch information
sbuliarca authored Oct 28, 2024
1 parent 608b8d2 commit 1d5969a
Show file tree
Hide file tree
Showing 3 changed files with 561 additions and 330 deletions.
108 changes: 88 additions & 20 deletions rules/msk_topic_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,16 +74,12 @@ func (r *MSKTopicConfigRule) validateTopicConfig(runner tflint.Runner, topic *hc
return err
}

configAttr, hasConfig := topic.Body.Attributes["config"]
if !hasConfig {
err := runner.EmitIssue(
r,
"missing config attribute: the topic configuration must be specified in a config attribute",
topic.DefRange,
)
if err != nil {
return fmt.Errorf("emitting issue: missing config block: %w", err)
}
configAttr, err := r.validateAndGetConfigAttr(runner, topic)
if err != nil {
return err
}

if configAttr == nil {
return nil
}

Expand All @@ -97,21 +93,61 @@ func (r *MSKTopicConfigRule) validateTopicConfig(runner tflint.Runner, topic *hc
return err
}

cleanupPolicy, err := r.getAndValidateCleanupPolicy(runner, configAttr, configKeyToPairMap)
if err = r.validateCleanupPolicyConfig(runner, configAttr, configKeyToPairMap); err != nil {
return err
}
return nil
}

func (r *MSKTopicConfigRule) validateCleanupPolicyConfig(
runner tflint.Runner,
configAttr *hclext.Attribute,
configKeyToPairMap map[string]hcl.KeyValuePair,
) error {
cleanupPolicy, err := r.getAndValidateCleanupPolicyValue(runner, configAttr, configKeyToPairMap)
if err != nil {
return err
}

switch cleanupPolicy {
case cleanupPolicyDelete:
if err := r.validateRetentionForDeletePolicy(runner, configAttr, configKeyToPairMap); err != nil {
return err
}
case cleanupPolicyCompact:
// todo: validate no retention & remote storage for compact
reason := "compacted topic"
if err := r.validateTieredStorageDisabled(runner, configKeyToPairMap, reason); err != nil {
return err
}
if err := r.validateLocalRetentionNotDefined(runner, configKeyToPairMap, reason); err != nil {
return err
}
if err := r.validateRetentionTimeNotDefined(runner, configKeyToPairMap, reason); err != nil {
return err
}
}
return nil
}

func (r *MSKTopicConfigRule) validateAndGetConfigAttr(
runner tflint.Runner,
topic *hclext.Block,
) (*hclext.Attribute, error) {
configAttr, hasConfig := topic.Body.Attributes["config"]
if !hasConfig {
err := runner.EmitIssue(
r,
"missing config attribute: the topic configuration must be specified in a config attribute",
topic.DefRange,
)
if err != nil {
return nil, fmt.Errorf("emitting issue: missing config block: %w", err)
}
return nil, nil
}
return configAttr, nil
}

func constructConfigKeyToPairMap(configAttr *hclext.Attribute) (map[string]hcl.KeyValuePair, error) {
configExpr, ok := configAttr.Expr.(*hclsyntax.ObjectConsExpr)
if !ok {
Expand Down Expand Up @@ -258,7 +294,7 @@ var (
cleanupPolicyValidValues = []string{cleanupPolicyDelete, cleanupPolicyCompact}
)

func (r *MSKTopicConfigRule) getAndValidateCleanupPolicy(
func (r *MSKTopicConfigRule) getAndValidateCleanupPolicyValue(
runner tflint.Runner,
config *hclext.Attribute,
configKeyToPairMap map[string]hcl.KeyValuePair,
Expand Down Expand Up @@ -349,11 +385,12 @@ func (r *MSKTopicConfigRule) validateRetentionForDeletePolicy(
return err
}
} else {
if err := r.validateTieredStorageNotEnabled(runner, configKeyToPairMap); err != nil {
reason := fmt.Sprintf("less than %d days retention", tieredStorageThresholdInDays)
if err := r.validateTieredStorageDisabled(runner, configKeyToPairMap, reason); err != nil {
return err
}

if err := r.validateLocalRetentionNotDefined(runner, configKeyToPairMap); err != nil {
if err := r.validateLocalRetentionNotDefined(runner, configKeyToPairMap, reason); err != nil {
return err
}
}
Expand Down Expand Up @@ -413,16 +450,17 @@ func (r *MSKTopicConfigRule) validateLocalRetentionDefined(
func (r *MSKTopicConfigRule) validateLocalRetentionNotDefined(
runner tflint.Runner,
configKeyToPairMap map[string]hcl.KeyValuePair,
reason string,
) error {
localRetTimePair, hasLocalRetTimeAttr := configKeyToPairMap[localRetentionTimeAttr]
if !hasLocalRetTimeAttr {
return nil
}

msg := fmt.Sprintf(
"defining %s is misleading when tiered storage is disabled due to less than %d days retention: removing it...",
"defining %s is misleading when tiered storage is disabled due to %s: removing it...",
localRetentionTimeAttr,
tieredStorageThresholdInDays,
reason,
)
err := runner.EmitIssueWithFix(r, msg, localRetTimePair.Value.Range(),
func(f tflint.Fixer) error {
Expand Down Expand Up @@ -490,9 +528,10 @@ func (r *MSKTopicConfigRule) validateTieredStorageEnabled(
return nil
}

func (r *MSKTopicConfigRule) validateTieredStorageNotEnabled(
func (r *MSKTopicConfigRule) validateTieredStorageDisabled(
runner tflint.Runner,
configKeyToPairMap map[string]hcl.KeyValuePair,
reason string,
) error {
tieredStoragePair, hasTieredStorageAttr := configKeyToPairMap[tieredStorageEnableAttr]

Expand All @@ -511,8 +550,8 @@ func (r *MSKTopicConfigRule) validateTieredStorageNotEnabled(
}

msg := fmt.Sprintf(
"tiered storage is not supported for less than %d days retention: disabling it...",
tieredStorageThresholdInDays,
"tiered storage is not supported for %s: disabling it...",
reason,
)
err := runner.EmitIssueWithFix(r, msg, tieredStoragePair.Value.Range(),
func(f tflint.Fixer) error {
Expand Down Expand Up @@ -573,3 +612,32 @@ func (r *MSKTopicConfigRule) getAndValidateRetentionTime(
}
return &retTimeIntVal, nil
}

func (r *MSKTopicConfigRule) validateRetentionTimeNotDefined(
runner tflint.Runner,
configKeyToPairMap map[string]hcl.KeyValuePair,
reason string,
) error {
retTimePair, hasRetTime := configKeyToPairMap[retentionTimeAttr]
if !hasRetTime {
return nil
}
msg := fmt.Sprintf("defining %s is misleading for %s: removing it...", retentionTimeAttr, reason)
keyRange := retTimePair.Key.Range()

err := runner.EmitIssueWithFix(r, msg, keyRange,
func(f tflint.Fixer) error {
return f.Remove(
hcl.Range{
Filename: keyRange.Filename,
Start: keyRange.Start,
End: retTimePair.Value.Range().End,
},
)
},
)
if err != nil {
return fmt.Errorf("emitting issue: retention time defined for compacted topic: %w", err)
}
return nil
}
25 changes: 25 additions & 0 deletions rules/msk_topic_config.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ When cleanup policy is 'delete':
- for a retention period less than 3 days, tiered storage must be disabled and the local.retention.ms parameter must not be defined.
See the [AWS docs](https://docs.aws.amazon.com/msk/latest/developerguide/msk-tiered-storage.html#msk-tiered-storage-constraints).

When cleanup policy is 'compact':
- 'retention.ms' must not be specified in the config as it is misleading. It doesn't apply to compacted topics. See [definition](https://docs.confluent.io/platform/current/installation/configuration/topic-configs.html#retention-ms)
- tiered storage must not be enabled as it is not supported for compacted topics. See [limitations](https://docs.aws.amazon.com/msk/latest/developerguide/msk-tiered-storage.html#msk-tiered-storage-constraints).

## Example

### Good example
Expand Down Expand Up @@ -42,6 +46,16 @@ resource "kafka_topic" "good topic" {
"retention.ms" = "86400000"
}
}
# Good compacted topic
resource "kafka_topic" "good topic" {
name = "good_topic"
replication_factor = 3
config = {
"cleanup.policy" = "compact"
"compression.type" = "zstd"
}
}
```

### Bad examples
Expand Down Expand Up @@ -88,6 +102,17 @@ resource "kafka_topic" "topic_with_more_than_3_days_retention" {
"compression.type" = "zstd"
}
}
# compacted topic with tiered storage enabled
resource "kafka_topic" "topic_compacted_with_tiered_storage" {
name = "topic_compacted_with_tiered_storage"
replication_factor = 3
config = {
"remote.storage.enable" = "true"
"cleanup.policy" = "compact"
"compression.type" = "zstd"
}
}
```

## How To Fix
Expand Down
Loading

0 comments on commit 1d5969a

Please sign in to comment.