Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: move segment manifest cache to fetch module #484

Merged
merged 1 commit into from
Sep 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,6 @@
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

Expand All @@ -56,13 +54,14 @@
import io.aiven.kafka.tieredstorage.fetch.KeyNotFoundRuntimeException;
import io.aiven.kafka.tieredstorage.fetch.index.MemorySegmentIndexesCache;
import io.aiven.kafka.tieredstorage.fetch.index.SegmentIndexesCache;
import io.aiven.kafka.tieredstorage.fetch.manifest.MemorySegmentManifestCache;
import io.aiven.kafka.tieredstorage.fetch.manifest.SegmentManifestCache;
import io.aiven.kafka.tieredstorage.manifest.SegmentEncryptionMetadata;
import io.aiven.kafka.tieredstorage.manifest.SegmentEncryptionMetadataV1;
import io.aiven.kafka.tieredstorage.manifest.SegmentIndex;
import io.aiven.kafka.tieredstorage.manifest.SegmentIndexesV1;
import io.aiven.kafka.tieredstorage.manifest.SegmentIndexesV1Builder;
import io.aiven.kafka.tieredstorage.manifest.SegmentManifest;
import io.aiven.kafka.tieredstorage.manifest.SegmentManifestProvider;
import io.aiven.kafka.tieredstorage.manifest.SegmentManifestV1;
import io.aiven.kafka.tieredstorage.manifest.index.ChunkIndex;
import io.aiven.kafka.tieredstorage.manifest.serde.EncryptionSerdeModule;
Expand Down Expand Up @@ -111,8 +110,6 @@ public class RemoteStorageManager implements org.apache.kafka.server.log.remote.

private Metrics metrics;

private final Executor executor = new ForkJoinPool();

private ObjectFetcher fetcher;
private ObjectUploader uploader;
private ObjectDeleter deleter;
Expand All @@ -128,7 +125,7 @@ public class RemoteStorageManager implements org.apache.kafka.server.log.remote.
private SegmentCustomMetadataSerde customMetadataSerde;
private Set<SegmentCustomMetadataField> customMetadataFields;

private SegmentManifestProvider segmentManifestProvider;
private SegmentManifestCache segmentManifestCache;
private SegmentIndexesCache segmentIndexesCache;

private Bucket rateLimitingBucket;
Expand Down Expand Up @@ -171,12 +168,8 @@ public void configure(final Map<String, ?> configs) {

mapper = getObjectMapper();

segmentManifestProvider = new SegmentManifestProvider(
config.segmentManifestCacheSize(),
config.segmentManifestCacheRetention(),
fetcher,
mapper,
executor);
segmentManifestCache = new MemorySegmentManifestCache(fetcher, mapper);
segmentManifestCache.configure(config.segmentManifestCacheConfigs());

segmentIndexesCache = new MemorySegmentIndexesCache();
segmentIndexesCache.configure(config.fetchIndexesCacheConfigs());
Expand All @@ -196,8 +189,8 @@ void setStorage(final StorageBackend storage) {
}

// for testing
void setSegmentManifestProvider(final SegmentManifestProvider segmentManifestProvider) {
this.segmentManifestProvider = segmentManifestProvider;
void setSegmentManifestCache(final MemorySegmentManifestCache segmentManifestCache) {
this.segmentManifestCache = segmentManifestCache;
}

// for testing
Expand Down Expand Up @@ -602,7 +595,7 @@ private ObjectKey objectKey(final RemoteLogSegmentMetadata remoteLogSegmentMetad
private SegmentManifest fetchSegmentManifest(final RemoteLogSegmentMetadata remoteLogSegmentMetadata)
throws StorageBackendException, IOException {
final ObjectKey manifestKey = objectKeyFactory.key(remoteLogSegmentMetadata, ObjectKeyFactory.Suffix.MANIFEST);
return segmentManifestProvider.get(manifestKey);
return segmentManifestCache.get(manifestKey);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public class CacheConfig extends AbstractConfig {
+ "where \"-1\" represents infinite retention";
private static final long DEFAULT_CACHE_RETENTION_MS = 600_000;

private static ConfigDef configDef(final Object defaultSize) {
private static ConfigDef configDef(final Object defaultSize, final long defaultRetentionMs) {
final var configDef = new ConfigDef();
configDef.define(
CACHE_SIZE_CONFIG,
Expand All @@ -46,16 +46,20 @@ private static ConfigDef configDef(final Object defaultSize) {
configDef.define(
CACHE_RETENTION_CONFIG,
ConfigDef.Type.LONG,
DEFAULT_CACHE_RETENTION_MS,
defaultRetentionMs,
ConfigDef.Range.between(-1L, Long.MAX_VALUE),
ConfigDef.Importance.MEDIUM,
CACHE_RETENTION_DOC
);
return configDef;
}

private CacheConfig(final Map<String, ?> props, final Object defaultSize) {
super(configDef(defaultSize), props);
private CacheConfig(
final Map<String, ?> props,
final Object defaultSize,
final long defaultRetentionMs
) {
super(configDef(defaultSize, defaultRetentionMs), props);
}

public static Builder newBuilder(final Map<String, ?> configs) {
Expand All @@ -80,6 +84,7 @@ public Optional<Duration> cacheRetention() {

public static class Builder {
private final Map<String, ?> props;
private long defaultRetentionMs = DEFAULT_CACHE_RETENTION_MS;
private Object maybeDefaultSize = NO_DEFAULT_VALUE;

public Builder(final Map<String, ?> props) {
Expand All @@ -91,8 +96,13 @@ public Builder withDefaultSize(final long defaultSize) {
return this;
}

public Builder withDefaultRetentionMs(final long retentionMs) {
this.defaultRetentionMs = retentionMs;
return this;
}

public CacheConfig build() {
return new CacheConfig(props, maybeDefaultSize);
return new CacheConfig(props, maybeDefaultSize, defaultRetentionMs);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package io.aiven.kafka.tieredstorage.config;

import java.nio.file.Path;
import java.time.Duration;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -55,18 +54,6 @@ public class RemoteStorageManagerConfig extends AbstractConfig {
private static final String OBJECT_KEY_PREFIX_MASK_DOC = "Whether to mask path prefix in logs";

private static final String SEGMENT_MANIFEST_CACHE_PREFIX = "segment.manifest.cache.";
private static final String SEGMENT_MANIFEST_CACHE_SIZE_CONFIG = SEGMENT_MANIFEST_CACHE_PREFIX + "size";
private static final Long SEGMENT_MANIFEST_CACHE_SIZE_DEFAULT = 1000L; // TODO consider a better default
private static final String SEGMENT_MANIFEST_CACHE_SIZE_DOC =
"The size in items of the segment manifest cache. "
+ "Use -1 for \"unbounded\". The default is 1000.";

public static final String SEGMENT_MANIFEST_CACHE_RETENTION_MS_CONFIG = SEGMENT_MANIFEST_CACHE_PREFIX
+ "retention.ms";
public static final long SEGMENT_MANIFEST_CACHE_RETENTION_MS_DEFAULT = 3_600_000; // 1 hour
private static final String SEGMENT_MANIFEST_CACHE_RETENTION_MS_DOC =
"The retention time for the segment manifest cache. "
+ "Use -1 for \"forever\". The default is 3_600_000 (1 hour).";

private static final String CHUNK_SIZE_CONFIG = "chunk.size";
private static final String CHUNK_SIZE_DOC = "The chunk size of log files";
Expand Down Expand Up @@ -134,23 +121,6 @@ public class RemoteStorageManagerConfig extends AbstractConfig {
OBJECT_KEY_PREFIX_MASK_DOC
);

CONFIG.define(
SEGMENT_MANIFEST_CACHE_SIZE_CONFIG,
ConfigDef.Type.LONG,
SEGMENT_MANIFEST_CACHE_SIZE_DEFAULT,
ConfigDef.Range.atLeast(-1L),
ConfigDef.Importance.LOW,
SEGMENT_MANIFEST_CACHE_SIZE_DOC
);
CONFIG.define(
SEGMENT_MANIFEST_CACHE_RETENTION_MS_CONFIG,
ConfigDef.Type.LONG,
SEGMENT_MANIFEST_CACHE_RETENTION_MS_DEFAULT,
ConfigDef.Range.atLeast(-1L),
ConfigDef.Importance.LOW,
SEGMENT_MANIFEST_CACHE_RETENTION_MS_DOC
);

CONFIG.define(
CHUNK_SIZE_CONFIG,
ConfigDef.Type.INT,
Expand Down Expand Up @@ -350,22 +320,6 @@ public StorageBackend storage() {
return storage;
}

public Optional<Long> segmentManifestCacheSize() {
final long rawValue = getLong(SEGMENT_MANIFEST_CACHE_SIZE_CONFIG);
if (rawValue == -1) {
return Optional.empty();
}
return Optional.of(rawValue);
}

public Optional<Duration> segmentManifestCacheRetention() {
final long rawValue = getLong(SEGMENT_MANIFEST_CACHE_RETENTION_MS_CONFIG);
if (rawValue == -1) {
return Optional.empty();
}
return Optional.of(Duration.ofMillis(rawValue));
}

public String keyPrefix() {
return getString(OBJECT_KEY_PREFIX_CONFIG);
}
Expand Down Expand Up @@ -421,4 +375,8 @@ public Set<SegmentCustomMetadataField> customMetadataKeysIncluded() {
public Map<String, ?> fetchIndexesCacheConfigs() {
return originalsWithPrefix(FETCH_INDEXES_CACHE_PREFIX);
}

public Map<String, ?> segmentManifestCacheConfigs() {
return originalsWithPrefix(SEGMENT_MANIFEST_CACHE_PREFIX);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public abstract class ChunkCache<T> implements ChunkManager, Configurable {
private static final String METRIC_GROUP = "chunk-cache-metrics";

private final ChunkManager chunkManager;
private final Executor executor = new ForkJoinPool();
private Executor executor;

final CaffeineStatsCounter statsCounter;

Expand Down Expand Up @@ -133,6 +133,7 @@ public InputStream getChunk(final ObjectKey objectKey,
public abstract Weigher<ChunkKey, T> weigher();

protected AsyncCache<ChunkKey, T> buildCache(final ChunkCacheConfig config) {
this.executor = new ForkJoinPool();
this.prefetchingSize = config.cachePrefetchingSize();
final Caffeine<Object, Object> cacheBuilder = Caffeine.newBuilder();
config.cacheSize().ifPresent(maximumWeight -> cacheBuilder.maximumWeight(maximumWeight).weigher(weigher()));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
/*
* Copyright 2023 Aiven Oy
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not new code — this moves SegmentManifestProvider into the fetch module and renames it.

*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.aiven.kafka.tieredstorage.fetch.manifest;

import java.io.IOException;
import java.io.InputStream;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import io.aiven.kafka.tieredstorage.config.CacheConfig;
import io.aiven.kafka.tieredstorage.manifest.SegmentManifest;
import io.aiven.kafka.tieredstorage.metrics.CaffeineStatsCounter;
import io.aiven.kafka.tieredstorage.storage.ObjectFetcher;
import io.aiven.kafka.tieredstorage.storage.ObjectKey;
import io.aiven.kafka.tieredstorage.storage.StorageBackendException;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.RemovalListener;
import com.github.benmanes.caffeine.cache.Scheduler;
import com.github.benmanes.caffeine.cache.Weigher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * An in-memory, Caffeine-backed cache of {@link SegmentManifest}s keyed by the manifest's
 * {@link ObjectKey}. On a cache miss the manifest is fetched via the supplied
 * {@link ObjectFetcher} and deserialized with the supplied {@link ObjectMapper}.
 *
 * <p>The cache must be {@link #configure(Map) configured} before {@link #get(ObjectKey)} is
 * called; until then the internal cache reference is {@code null}.
 */
public class MemorySegmentManifestCache implements SegmentManifestCache {
    private static final Logger log = LoggerFactory.getLogger(MemorySegmentManifestCache.class);
    private static final String METRIC_GROUP = "segment-manifest-cache-metrics";
    // Upper bound on how long a single get() waits for the async load to complete.
    private static final long GET_TIMEOUT_SEC = 10;
    private static final long DEFAULT_MAX_SIZE = 1000L;
    private static final long DEFAULT_RETENTION_MS = 3_600_000; // 1 hour

    // Built lazily in configure(); null before configuration.
    private AsyncLoadingCache<ObjectKey, SegmentManifest> cache;
    final CaffeineStatsCounter statsCounter = new CaffeineStatsCounter(METRIC_GROUP);

    final ObjectFetcher fileFetcher;
    final ObjectMapper mapper;

    public MemorySegmentManifestCache(final ObjectFetcher fileFetcher, final ObjectMapper mapper) {
        this.fileFetcher = fileFetcher;
        this.mapper = mapper;
    }

    /**
     * Returns the manifest for the given key, fetching and caching it on a miss.
     *
     * @param manifestKey the object key of the manifest
     * @return the cached or freshly fetched manifest
     * @throws StorageBackendException if the backend fetch failed
     * @throws IOException if deserialization (or fetch I/O) failed
     * @throws RuntimeException on timeout, interruption, or any other unexpected failure
     */
    public SegmentManifest get(final ObjectKey manifestKey)
        throws StorageBackendException, IOException {
        try {
            return cache.get(manifestKey).get(GET_TIMEOUT_SEC, TimeUnit.SECONDS);
        } catch (final ExecutionException e) {
            // Unwrap previously wrapped exceptions if possible.
            final Throwable cause = e.getCause();

            // We don't really expect this case, but handle it nevertheless.
            if (cause == null) {
                throw new RuntimeException(e);
            }
            if (cause instanceof StorageBackendException) {
                throw (StorageBackendException) cause;
            }
            if (cause instanceof IOException) {
                throw (IOException) cause;
            }

            throw new RuntimeException(e);
        } catch (final InterruptedException e) {
            // Restore the interrupt flag so callers further up the stack can observe it.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (final TimeoutException e) {
            throw new RuntimeException(e);
        }
    }

    // for testing
    RemovalListener<ObjectKey, SegmentManifest> removalListener() {
        return (key, content, cause) -> log.debug("Deleted cached value for key {} from cache."
            + " The reason of the deletion is {}", key, cause);
    }

    // All manifests weigh the same: the size bound is a count of entries, not bytes.
    private static Weigher<ObjectKey, SegmentManifest> weigher() {
        return (key, value) -> 1;
    }

    /**
     * Builds the async loading cache from the given config: size bound (if any), retention
     * (expire-after-access, if any), stats, and a loader that fetches and deserializes the
     * manifest.
     */
    protected AsyncLoadingCache<ObjectKey, SegmentManifest> buildCache(final CacheConfig config) {
        final ExecutorService executor = new ForkJoinPool();
        final Caffeine<Object, Object> cacheBuilder = Caffeine.newBuilder();
        config.cacheSize().ifPresent(maximumWeight -> cacheBuilder.maximumWeight(maximumWeight).weigher(weigher()));
        config.cacheRetention().ifPresent(cacheBuilder::expireAfterAccess);
        final var cache = cacheBuilder.evictionListener(removalListener())
            .scheduler(Scheduler.systemScheduler())
            .executor(executor)
            .recordStats(() -> statsCounter)
            .buildAsync(key -> {
                // try-with-resources ensures the fetched stream is closed after deserialization.
                try (final InputStream is = fileFetcher.fetch(key)) {
                    return mapper.readValue(is, SegmentManifest.class);
                }
            });
        statsCounter.registerSizeMetric(cache.synchronous()::estimatedSize);
        return cache;
    }

    @Override
    public void configure(final Map<String, ?> configs) {
        // Defaults match the previous RemoteStorageManagerConfig values:
        // 1000 entries, 1 hour retention.
        final var config = CacheConfig.newBuilder(configs)
            .withDefaultSize(DEFAULT_MAX_SIZE)
            .withDefaultRetentionMs(DEFAULT_RETENTION_MS)
            .build();
        this.cache = buildCache(config);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright 2024 Aiven Oy
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.aiven.kafka.tieredstorage.fetch.manifest;

import java.io.IOException;

import org.apache.kafka.common.Configurable;

import io.aiven.kafka.tieredstorage.manifest.SegmentManifest;
import io.aiven.kafka.tieredstorage.storage.ObjectKey;
import io.aiven.kafka.tieredstorage.storage.StorageBackendException;

/**
 * A configurable cache of {@link SegmentManifest}s keyed by the manifest's {@link ObjectKey}.
 *
 * <p>Implementations receive their settings through {@link Configurable#configure(java.util.Map)}
 * before {@link #get(ObjectKey)} is used.
 */
public interface SegmentManifestCache extends Configurable {
    /**
     * Returns the manifest for the given key, loading it from backend storage on a cache miss.
     *
     * @param manifestKey the object key of the manifest
     * @return the manifest for the key
     * @throws StorageBackendException if fetching from the storage backend failed
     * @throws IOException if reading or deserializing the manifest failed
     */
    SegmentManifest get(final ObjectKey manifestKey)
        throws StorageBackendException, IOException;
}
Loading
Loading