Add the ability to have the indexer start from head (#656)
* Add the ability to have the indexer start from head

* Linting

* Reformat using Google Java styling

* Reformatted

* Feedback

* Formatting

* Fixed tests
kyle-sammons authored Sep 11, 2023
1 parent 360934a commit b4251df
Showing 6 changed files with 256 additions and 77 deletions.
2 changes: 2 additions & 0 deletions config/config.yaml
@@ -12,6 +12,8 @@ indexerConfig:
dataDirectory: ${INDEXER_DATA_DIR:-/tmp}
maxOffsetDelayMessages: ${INDEXER_MAX_OFFSET_DELAY_MESSAGES:-10000000}
defaultQueryTimeoutMs: ${KALDB_INDEX_DEFAULT_QUERY_TIMEOUT_MS:-2500}
readFromLocationOnStart: ${INDEXER_READ_FROM_LOCATION_ON_START:-LATEST}
createRecoveryTasksOnStart: ${INDEXER_CREATE_RECOVERY_TASKS_ON_START:-true}
serverConfig:
serverPort: ${KALDB_INDEX_SERVER_PORT:-8080}
serverAddress: ${KALDB_INDEX_SERVER_ADDRESS:-localhost}
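Taken together, the YAML defaults above mean a fresh indexer with no prior offset metadata starts at the head of the partition (LATEST) and backfills older data via recovery tasks (true). A minimal sketch of how these values map onto the generated proto config (defined in kaldb_configs.proto below); the env-var parsing here is illustrative, not KalDB's actual config loader:

import com.slack.kaldb.proto.config.KaldbConfigs;

public class IndexerStartupSettings {
  // Illustrative only: resolve the two new env vars with the same defaults as the
  // YAML above, then populate the generated proto config.
  public static KaldbConfigs.IndexerConfig resolve() {
    String location =
        System.getenv().getOrDefault("INDEXER_READ_FROM_LOCATION_ON_START", "LATEST");
    String createTasks =
        System.getenv().getOrDefault("INDEXER_CREATE_RECOVERY_TASKS_ON_START", "true");
    return KaldbConfigs.IndexerConfig.newBuilder()
        .setReadFromLocationOnStart(KaldbConfigs.KafkaOffsetLocation.valueOf(location))
        .setCreateRecoveryTasksOnStart(Boolean.parseBoolean(createTasks))
        .build();
  }
}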
9 changes: 7 additions & 2 deletions kaldb/src/main/java/com/slack/kaldb/server/KaldbIndexer.java
@@ -80,8 +80,10 @@ protected void startUp() throws Exception {
long startOffset = indexerPreStart();
// ensure the chunk manager is available to receive messages
chunkManager.awaitRunning(DEFAULT_START_STOP_DURATION);

// Set the Kafka offset and prep the consumer for consumption.
kafkaConsumer.prepConsumerForConsumption(startOffset);

LOG.info("Started Kaldb indexer.");
}

@@ -109,8 +111,11 @@ private long indexerPreStart() throws Exception {
maxMessagesPerRecoveryTask,
meterRegistry);

long currentHeadOffsetForPartition = kafkaConsumer.getEndOffSetForPartition();
long startOffset = recoveryTaskCreator.determineStartingOffset(currentHeadOffsetForPartition);
long currentEndOffsetForPartition = kafkaConsumer.getEndOffSetForPartition();
long currentBeginningOffsetForPartition = kafkaConsumer.getBeginningOffsetForPartition();
long startOffset =
recoveryTaskCreator.determineStartingOffset(
currentEndOffsetForPartition, currentBeginningOffsetForPartition, indexerConfig);

// Close these stores since we don't need them after preStart.
snapshotMetadataStore.close();
62 changes: 51 additions & 11 deletions kaldb/src/main/java/com/slack/kaldb/server/RecoveryTaskCreator.java
@@ -13,6 +13,7 @@
import com.slack.kaldb.metadata.recovery.RecoveryTaskMetadataStore;
import com.slack.kaldb.metadata.snapshot.SnapshotMetadata;
import com.slack.kaldb.metadata.snapshot.SnapshotMetadataStore;
import com.slack.kaldb.proto.config.KaldbConfigs;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import java.time.Instant;
@@ -141,10 +142,19 @@ public List<SnapshotMetadata> deleteStaleLiveSnapshots(List<SnapshotMetadata> sn
* recovery task offsets are [startOffset, endOffset]. If a recovery task is created, we start
* indexing at the offset after the recovery task.
*
* <p>When there is no offset data for a partition, return -1. In that case, the consumer would
* have to start indexing the data from the earliest offset.
* <p>When there is no offset data for a partition: if indexer.readFromLocationOnStart is set to
* LATEST and indexer.createRecoveryTasksOnStart is set to false, simply return the latest offset
* and start reading from there. This is useful when spinning up a new cluster on existing data
* where the data previously in the pipeline doesn't matter. If instead
* indexer.createRecoveryTasksOnStart is set to true, the latest offset is still returned, but
* recovery tasks are created to ingest everything from the beginning offset up to the latest.
* If instead indexer.readFromLocationOnStart is set to EARLIEST, return -1; in that case, the
* consumer has to start indexing the data from the earliest offset.
*/
public long determineStartingOffset(long currentHeadOffsetForPartition) {
public long determineStartingOffset(
long currentEndOffsetForPartition,
long currentBeginningOffsetForPartition,
KaldbConfigs.IndexerConfig indexerConfig) {
// Filter stale snapshots for partition.
if (partitionId == null) {
LOG.warn("PartitionId can't be null.");
@@ -184,19 +194,49 @@ public long determineStartingOffset(long currentHeadOffsetForPartition) {

if (highestDurableOffsetForPartition <= 0) {
LOG.info("There is no prior offset for this partition {}.", partitionId);
return highestDurableOffsetForPartition;

// If the user wants to start at the current offset in Kafka and _does not_ want to create
// recovery tasks to backfill, then we can just return the current offset.
// If the user wants to start at the current offset in Kafka and _does_ want to create
// recovery tasks to backfill, then we create the recovery tasks needed and then return
// the current offset for the indexer. And if the user does _not_ want to start at the
// current offset in Kafka, then we'll just default to the old behavior of starting from
// the very beginning.
if (!indexerConfig.getCreateRecoveryTasksOnStart()
&& indexerConfig.getReadFromLocationOnStart()
== KaldbConfigs.KafkaOffsetLocation.LATEST) {
LOG.info(
"CreateRecoveryTasksOnStart is set to false and ReadLocationOnStart is set to current. Reading from current and"
+ " NOT spinning up recovery tasks");
return currentEndOffsetForPartition;
} else if (indexerConfig.getCreateRecoveryTasksOnStart()
&& indexerConfig.getReadFromLocationOnStart()
== KaldbConfigs.KafkaOffsetLocation.LATEST) {
LOG.info(
"CreateRecoveryTasksOnStart is set and ReadLocationOnStart is set to current. Reading from current and"
+ " spinning up recovery tasks");
createRecoveryTasks(
partitionId,
currentBeginningOffsetForPartition,
currentEndOffsetForPartition,
indexerConfig.getMaxMessagesPerChunk());
return currentEndOffsetForPartition;
} else {
return highestDurableOffsetForPartition;
}
}

// The current head offset shouldn't be lower than the highest durable offset. If it is, it
// means that we indexed more data than the current head offset. This is either a bug in the
// offset handling mechanism or the Kafka partition has rolled over. We throw an exception
// for now, so we can investigate.
if (currentHeadOffsetForPartition < highestDurableOffsetForPartition) {
if (currentEndOffsetForPartition < highestDurableOffsetForPartition) {
final String message =
String.format(
"The current head for the partition %d can't "
+ "be lower than the highest durable offset for that partition %d",
currentHeadOffsetForPartition, highestDurableOffsetForPartition);
currentEndOffsetForPartition, highestDurableOffsetForPartition);
LOG.error(message);
throw new IllegalStateException(message);
}
@@ -208,25 +248,25 @@ public long determineStartingOffset(long currentHeadOffsetForPartition) {
long nextOffsetForPartition = highestDurableOffsetForPartition + 1;

// Create a recovery task if needed.
if (currentHeadOffsetForPartition - highestDurableOffsetForPartition > maxOffsetDelay) {
if (currentEndOffsetForPartition - highestDurableOffsetForPartition > maxOffsetDelay) {
LOG.info(
"Recovery task needed. The current position {} and head location {} are higher than max"
+ " offset {}",
highestDurableOffsetForPartition,
currentHeadOffsetForPartition,
currentEndOffsetForPartition,
maxOffsetDelay);
createRecoveryTasks(
partitionId,
nextOffsetForPartition,
currentHeadOffsetForPartition - 1,
currentEndOffsetForPartition - 1,
maxMessagesPerRecoveryTask);
return currentHeadOffsetForPartition;
return currentEndOffsetForPartition;
} else {
LOG.info(
"The difference between the last indexed position {} and head location {} is lower "
+ "than max offset {}. So, using {} position as the start offset",
highestDurableOffsetForPartition,
currentHeadOffsetForPartition,
currentEndOffsetForPartition,
maxOffsetDelay,
nextOffsetForPartition);
return nextOffsetForPartition;
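Condensed, the new no-prior-offset branch reduces to the following decision logic (a sketch, not the full method; partitionId, createRecoveryTasks, and the -1 sentinel for "seek to the earliest offset" come from the surrounding class and its Javadoc):

private long startOffsetWhenNoPriorData(
    long beginningOffset, long endOffset, KaldbConfigs.IndexerConfig indexerConfig) {
  boolean readLatest =
      indexerConfig.getReadFromLocationOnStart() == KaldbConfigs.KafkaOffsetLocation.LATEST;
  if (readLatest && !indexerConfig.getCreateRecoveryTasksOnStart()) {
    // Skip historical data entirely and index from the head of the partition.
    return endOffset;
  } else if (readLatest && indexerConfig.getCreateRecoveryTasksOnStart()) {
    // Backfill [beginningOffset, endOffset] with recovery tasks, then index from the head.
    createRecoveryTasks(
        partitionId, beginningOffset, endOffset, indexerConfig.getMaxMessagesPerChunk());
    return endOffset;
  } else {
    // EARLIEST: return a non-positive offset so the consumer seeks to the beginning.
    return -1;
  }
}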
KaldbKafkaConsumer.java
@@ -42,7 +42,6 @@ public class KaldbKafkaConsumer {
private static final Logger LOG = LoggerFactory.getLogger(KaldbKafkaConsumer.class);
public static final int KAFKA_POLL_TIMEOUT_MS = 250;
private final LogMessageWriterImpl logMessageWriterImpl;

private static final String[] REQUIRED_CONFIGS = {ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG};
private static final Set<String> OVERRIDABLE_CONFIGS =
Set.of(
@@ -177,7 +176,7 @@ public void prepConsumerForConsumption(long startOffset) {
kafkaConsumer.assign(Collections.singletonList(topicPartition));
LOG.info("Assigned to topicPartition: {}", topicPartition);
// Offset is negative when the partition was not consumed before, so start consumption from
// beginning of the stream. If the offset is positive, start consuming from there.
// there
if (startOffset > 0) {
kafkaConsumer.seek(topicPartition, startOffset);
} else {
@@ -192,6 +191,16 @@ public void close() {
LOG.info("Closed kafka consumer for partition:{}", topicPartition);
}

public long getBeginningOffsetForPartition() {
return getBeginningOffsetForPartition(topicPartition);
}

public long getBeginningOffsetForPartition(TopicPartition topicPartition) {
return kafkaConsumer
.beginningOffsets(Collections.singletonList(topicPartition))
.get(topicPartition);
}

public long getEndOffSetForPartition() {
return getEndOffSetForPartition(topicPartition);
}
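The new getBeginningOffsetForPartition mirrors the existing end-offset helper; both wrap standard KafkaConsumer metadata APIs that query the broker without consuming any records. A minimal standalone sketch of those calls (the bootstrap address and topic are placeholders):

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class PartitionOffsets {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
    props.put(
        ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
    props.put(
        ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
    try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
      TopicPartition tp = new TopicPartition("test-topic", 0); // placeholder topic
      // beginningOffsets/endOffsets query the broker without consuming records.
      long beginning = consumer.beginningOffsets(Collections.singletonList(tp)).get(tp);
      long end = consumer.endOffsets(Collections.singletonList(tp)).get(tp);
      System.out.printf("partition %s: beginning=%d, end=%d%n", tp, beginning, end);
    }
  }
}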
12 changes: 12 additions & 0 deletions kaldb/src/main/proto/kaldb_configs.proto
@@ -85,6 +85,11 @@ message QueryServiceConfig {
string managerConnectString = 3;
}

enum KafkaOffsetLocation {
EARLIEST = 0;
LATEST = 1;
}

// Configuration for the indexer.
message IndexerConfig {
// Chunk config
@@ -108,6 +113,13 @@ message IndexerConfig {
int32 default_query_timeout_ms = 9;

KafkaConfig kafka_config = 10;

// Determines the offset location to start reading from when the indexer starts up
KafkaOffsetLocation read_from_location_on_start = 11;

// Whether or not to create recovery tasks when the indexer boots up and
// is behind.
bool create_recovery_tasks_on_start = 12;
}

// A config object containing all the lucene configs.
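One subtlety of the proto3 additions: unset fields take their zero values, so a config that never sets these fields gets EARLIEST (enum value 0) and false, which preserves the pre-change behavior of reading from the earliest offset without extra recovery tasks. A quick check using the generated defaults:

KaldbConfigs.IndexerConfig defaults = KaldbConfigs.IndexerConfig.getDefaultInstance();
// EARLIEST = 0 is the enum's default in proto3; bool fields default to false.
assert defaults.getReadFromLocationOnStart() == KaldbConfigs.KafkaOffsetLocation.EARLIEST;
assert !defaults.getCreateRecoveryTasksOnStart();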