diff --git a/docs/blocks-storage/compactor.md b/docs/blocks-storage/compactor.md index 04a08cfa64..030035f2ac 100644 --- a/docs/blocks-storage/compactor.md +++ b/docs/blocks-storage/compactor.md @@ -285,6 +285,10 @@ compactor: # CLI flag: -compactor.ring.wait-active-instance-timeout [wait_active_instance_timeout: <duration> | default = 10m] + # The compaction strategy to use. Supported values are: default, partitioning. + # CLI flag: -compactor.compaction-mode + [compaction_mode: <string> | default = "default"] + # How long block visit marker file should be considered as expired and able to # be picked up by compactor again. # CLI flag: -compactor.block-visit-marker-timeout diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index b0330fd454..0144dbaf31 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -2216,6 +2216,10 @@ sharding_ring: # CLI flag: -compactor.ring.wait-active-instance-timeout [wait_active_instance_timeout: <duration> | default = 10m] +# The compaction strategy to use. Supported values are: default, partitioning. +# CLI flag: -compactor.compaction-mode +[compaction_mode: <string> | default = "default"] + # How long block visit marker file should be considered as expired and able to # be picked up by compactor again. 
# CLI flag: -compactor.block-visit-marker-timeout diff --git a/pkg/compactor/compactor.go b/pkg/compactor/compactor.go index ff7907fe2c..817f93572b 100644 --- a/pkg/compactor/compactor.go +++ b/pkg/compactor/compactor.go @@ -53,9 +53,12 @@ var ( errInvalidBlockRanges = "compactor block range periods should be divisible by the previous one, but %s is not divisible by %s" RingOp = ring.NewOp([]ring.InstanceState{ring.ACTIVE}, nil) - supportedShardingStrategies = []string{util.ShardingStrategyDefault, util.ShardingStrategyShuffle} - errInvalidShardingStrategy = errors.New("invalid sharding strategy") - errInvalidTenantShardSize = errors.New("invalid tenant shard size, the value must be greater than 0") + supportedShardingStrategies = []string{util.ShardingStrategyDefault, util.ShardingStrategyShuffle} + errInvalidShardingStrategy = errors.New("invalid sharding strategy") + errInvalidTenantShardSize = errors.New("invalid tenant shard size, the value must be greater than 0") + supportedCompactionStrategies = []string{util.CompactionStrategyDefault, util.CompactionStrategyPartitioning} + errInvalidCompactionStrategy = errors.New("invalid compaction strategy") + errInvalidCompactionStrategyPartitioning = errors.New("compaction strategy partitioning can only be enabled when shuffle sharding is enabled") DefaultBlocksGrouperFactory = func(ctx context.Context, cfg Config, bkt objstore.InstrumentedBucket, logger log.Logger, blocksMarkedForNoCompaction prometheus.Counter, _ prometheus.Counter, _ prometheus.Counter, syncerMetrics *compact.SyncerMetrics, compactorMetrics *compactorMetrics, _ *ring.Ring, _ *ring.Lifecycler, _ Limits, _ string, _ *compact.GatherNoCompactionMarkFilter) compact.Grouper { return compact.NewDefaultGrouperWithMetrics( @@ -77,29 +80,33 @@ var ( } ShuffleShardingGrouperFactory = func(ctx context.Context, cfg Config, bkt objstore.InstrumentedBucket, logger log.Logger, blocksMarkedForNoCompaction prometheus.Counter, blockVisitMarkerReadFailed 
prometheus.Counter, blockVisitMarkerWriteFailed prometheus.Counter, syncerMetrics *compact.SyncerMetrics, compactorMetrics *compactorMetrics, ring *ring.Ring, ringLifecycle *ring.Lifecycler, limits Limits, userID string, noCompactionMarkFilter *compact.GatherNoCompactionMarkFilter) compact.Grouper { - return NewShuffleShardingGrouper( - ctx, - logger, - bkt, - cfg.AcceptMalformedIndex, - true, // Enable vertical compaction - blocksMarkedForNoCompaction, - metadata.NoneFunc, - syncerMetrics, - compactorMetrics, - cfg, - ring, - ringLifecycle.Addr, - ringLifecycle.ID, - limits, - userID, - cfg.BlockFilesConcurrency, - cfg.BlocksFetchConcurrency, - cfg.CompactionConcurrency, - cfg.BlockVisitMarkerTimeout, - blockVisitMarkerReadFailed, - blockVisitMarkerWriteFailed, - noCompactionMarkFilter.NoCompactMarkedBlocks) + if cfg.CompactionStrategy == util.CompactionStrategyPartitioning { + return NewPartitionCompactionGrouper(ctx, logger, bkt) + } else { + return NewShuffleShardingGrouper( + ctx, + logger, + bkt, + cfg.AcceptMalformedIndex, + true, // Enable vertical compaction + blocksMarkedForNoCompaction, + metadata.NoneFunc, + syncerMetrics, + compactorMetrics, + cfg, + ring, + ringLifecycle.Addr, + ringLifecycle.ID, + limits, + userID, + cfg.BlockFilesConcurrency, + cfg.BlocksFetchConcurrency, + cfg.CompactionConcurrency, + cfg.BlockVisitMarkerTimeout, + blockVisitMarkerReadFailed, + blockVisitMarkerWriteFailed, + noCompactionMarkFilter.NoCompactMarkedBlocks) + } } DefaultBlocksCompactorFactory = func(ctx context.Context, cfg Config, logger log.Logger, reg prometheus.Registerer) (compact.Compactor, PlannerFactory, error) { @@ -123,7 +130,11 @@ var ( plannerFactory := func(ctx context.Context, bkt objstore.InstrumentedBucket, logger log.Logger, cfg Config, noCompactionMarkFilter *compact.GatherNoCompactionMarkFilter, ringLifecycle *ring.Lifecycler, userID string, blockVisitMarkerReadFailed prometheus.Counter, blockVisitMarkerWriteFailed prometheus.Counter, 
compactorMetrics *compactorMetrics) compact.Planner { - return NewShuffleShardingPlanner(ctx, bkt, logger, cfg.BlockRanges.ToMilliseconds(), noCompactionMarkFilter.NoCompactMarkedBlocks, ringLifecycle.ID, cfg.BlockVisitMarkerTimeout, cfg.BlockVisitMarkerFileUpdateInterval, blockVisitMarkerReadFailed, blockVisitMarkerWriteFailed) + if cfg.CompactionStrategy == util.CompactionStrategyPartitioning { + return NewPartitionCompactionPlanner(ctx, bkt, logger) + } else { + return NewShuffleShardingPlanner(ctx, bkt, logger, cfg.BlockRanges.ToMilliseconds(), noCompactionMarkFilter.NoCompactMarkedBlocks, ringLifecycle.ID, cfg.BlockVisitMarkerTimeout, cfg.BlockVisitMarkerFileUpdateInterval, blockVisitMarkerReadFailed, blockVisitMarkerWriteFailed) + } } return compactor, plannerFactory, nil } @@ -202,6 +213,9 @@ type Config struct { ShardingStrategy string `yaml:"sharding_strategy"` ShardingRing RingConfig `yaml:"sharding_ring"` + // Compaction mode. + CompactionStrategy string `yaml:"compaction_mode"` + // No need to add options to customize the retry backoff, // given the defaults should be fine, but allow to override // it in tests. @@ -244,6 +258,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.IntVar(&cfg.CleanupConcurrency, "compactor.cleanup-concurrency", 20, "Max number of tenants for which blocks cleanup and maintenance should run concurrently.") f.BoolVar(&cfg.ShardingEnabled, "compactor.sharding-enabled", false, "Shard tenants across multiple compactor instances. Sharding is required if you run multiple compactor instances, in order to coordinate compactions and avoid race conditions leading to the same tenant blocks simultaneously compacted by different instances.") f.StringVar(&cfg.ShardingStrategy, "compactor.sharding-strategy", util.ShardingStrategyDefault, fmt.Sprintf("The sharding strategy to use. 
Supported values are: %s.", strings.Join(supportedShardingStrategies, ", "))) + f.StringVar(&cfg.CompactionStrategy, "compactor.compaction-mode", util.CompactionStrategyDefault, fmt.Sprintf("The compaction strategy to use. Supported values are: %s.", strings.Join(supportedCompactionStrategies, ", "))) f.DurationVar(&cfg.DeletionDelay, "compactor.deletion-delay", 12*time.Hour, "Time before a block marked for deletion is deleted from bucket. "+ "If not 0, blocks will be marked for deletion and compactor component will permanently delete blocks marked for deletion from the bucket. "+ "If 0, blocks will be deleted straight away. Note that deleting blocks immediately can cause query failures.") @@ -290,6 +305,15 @@ func (cfg *Config) Validate(limits validation.Limits) error { } } + // Make sure a valid compaction mode is being used + if !util.StringsContain(supportedCompactionStrategies, cfg.CompactionStrategy) { + return errInvalidCompactionStrategy + } + + if !cfg.ShardingEnabled && cfg.CompactionStrategy == util.CompactionStrategyPartitioning { + return errInvalidCompactionStrategyPartitioning + } + return nil } diff --git a/pkg/compactor/partition_compaction_grouper.go b/pkg/compactor/partition_compaction_grouper.go new file mode 100644 index 0000000000..c3687f7e6a --- /dev/null +++ b/pkg/compactor/partition_compaction_grouper.go @@ -0,0 +1,38 @@ +package compactor + +import ( + "context" + + "github.com/go-kit/log" + "github.com/oklog/ulid" + "github.com/thanos-io/objstore" + "github.com/thanos-io/thanos/pkg/block/metadata" + "github.com/thanos-io/thanos/pkg/compact" +) + +type PartitionCompactionGrouper struct { + ctx context.Context + logger log.Logger + bkt objstore.InstrumentedBucket +} + +func NewPartitionCompactionGrouper( + ctx context.Context, + logger log.Logger, + bkt objstore.InstrumentedBucket, +) *PartitionCompactionGrouper { + if logger == nil { + logger = log.NewNopLogger() + } + + return &PartitionCompactionGrouper{ + ctx: ctx, + logger: logger, + 
bkt: bkt, + } +} + +// Groups function modified from https://github.com/cortexproject/cortex/pull/2616 +func (g *PartitionCompactionGrouper) Groups(blocks map[ulid.ULID]*metadata.Meta) (res []*compact.Group, err error) { + panic("PartitionCompactionGrouper not implemented") +} diff --git a/pkg/compactor/partition_compaction_planner.go b/pkg/compactor/partition_compaction_planner.go new file mode 100644 index 0000000000..963771aa6d --- /dev/null +++ b/pkg/compactor/partition_compaction_planner.go @@ -0,0 +1,31 @@ +package compactor + +import ( + "context" + + "github.com/go-kit/log" + "github.com/thanos-io/objstore" + "github.com/thanos-io/thanos/pkg/block/metadata" +) + +type PartitionCompactionPlanner struct { + ctx context.Context + bkt objstore.InstrumentedBucket + logger log.Logger +} + +func NewPartitionCompactionPlanner( + ctx context.Context, + bkt objstore.InstrumentedBucket, + logger log.Logger, +) *PartitionCompactionPlanner { + return &PartitionCompactionPlanner{ + ctx: ctx, + bkt: bkt, + logger: logger, + } +} + +func (p *PartitionCompactionPlanner) Plan(ctx context.Context, metasByMinTime []*metadata.Meta, errChan chan error, extensions any) ([]*metadata.Meta, error) { + panic("PartitionCompactionPlanner not implemented") +} diff --git a/pkg/util/shard.go b/pkg/util/shard.go index 82392b3a1a..5d3de01cc4 100644 --- a/pkg/util/shard.go +++ b/pkg/util/shard.go @@ -10,6 +10,10 @@ const ( // Sharding strategies. ShardingStrategyDefault = "default" ShardingStrategyShuffle = "shuffle-sharding" + + // Compaction mode + CompactionStrategyDefault = "default" + CompactionStrategyPartitioning = "partitioning" ) var (