Skip to content
This repository has been archived by the owner on Jul 27, 2023. It is now read-only.

Commit

Permalink
start cache from specific index
Browse files Browse the repository at this point in the history
  • Loading branch information
dzharikhin committed Aug 20, 2021
1 parent cb738bf commit 8ccb900
Show file tree
Hide file tree
Showing 8 changed files with 236 additions and 77 deletions.
107 changes: 52 additions & 55 deletions src/main/java/com/orbitz/consul/cache/ConsulCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import java.util.function.Supplier;

import javax.annotation.Nullable;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
Expand All @@ -49,7 +52,7 @@ enum State {latent, starting, started, stopped }

private final static Logger LOGGER = LoggerFactory.getLogger(ConsulCache.class);

private final AtomicReference<BigInteger> latestIndex = new AtomicReference<>(null);
private final AtomicReference<BigInteger> latestIndex;
private final AtomicLong lastContact = new AtomicLong();
private final AtomicBoolean isKnownLeader = new AtomicBoolean();
private final AtomicReference<ConsulResponse.CacheResponseInfo> lastCacheInfo = new AtomicReference<>(null);
Expand All @@ -72,9 +75,10 @@ protected ConsulCache(
CallbackConsumer<V> callbackConsumer,
CacheConfig cacheConfig,
ClientEventHandler eventHandler,
CacheDescriptor cacheDescriptor) {
CacheDescriptor cacheDescriptor,
@Nullable BigInteger initialIndex) {

this(keyConversion, callbackConsumer, cacheConfig, eventHandler, cacheDescriptor, createDefault());
this(keyConversion, callbackConsumer, cacheConfig, eventHandler, cacheDescriptor, createDefault(), initialIndex);
}

protected ConsulCache(
Expand All @@ -83,9 +87,12 @@ protected ConsulCache(
CacheConfig cacheConfig,
ClientEventHandler eventHandler,
CacheDescriptor cacheDescriptor,
ScheduledExecutorService callbackScheduleExecutorService) {
ScheduledExecutorService callbackScheduleExecutorService,
@Nullable BigInteger initialIndex) {

this(keyConversion, callbackConsumer, cacheConfig, eventHandler, cacheDescriptor, new ExternalScheduler(callbackScheduleExecutorService));
this(keyConversion, callbackConsumer, cacheConfig, eventHandler, cacheDescriptor,
new ExternalScheduler(callbackScheduleExecutorService), initialIndex
);
}

protected ConsulCache(
Expand All @@ -94,20 +101,14 @@ protected ConsulCache(
CacheConfig cacheConfig,
ClientEventHandler eventHandler,
CacheDescriptor cacheDescriptor,
Scheduler callbackScheduler) {
if (keyConversion == null) {
Validate.notNull(keyConversion, "keyConversion must not be null");
}
if (callbackConsumer == null) {
Validate.notNull(callbackConsumer, "callbackConsumer must not be null");
}
if (cacheConfig == null) {
Validate.notNull(cacheConfig, "cacheConfig must not be null");
}
if (eventHandler == null) {
Validate.notNull(eventHandler, "eventHandler must not be null");
}

Scheduler callbackScheduler,
@Nullable BigInteger initialIndex) {
Validate.notNull(keyConversion, "keyConversion must not be null");
Validate.notNull(callbackConsumer, "callbackConsumer must not be null");
Validate.notNull(cacheConfig, "cacheConfig must not be null");
Validate.notNull(eventHandler, "eventHandler must not be null");

latestIndex = new AtomicReference<>(initialIndex);
this.keyConversion = keyConversion;
this.callBackConsumer = callbackConsumer;
this.eventHandler = eventHandler;
Expand Down Expand Up @@ -138,28 +139,17 @@ public void onComplete(ConsulResponse<List<V>> consulResponse) {
// metadata changes
lastContact.set(consulResponse.getLastContact());
isKnownLeader.set(consulResponse.isKnownLeader());
}

if (changed) {
Boolean locked = false;
if (state.get() == State.starting) {
listenersStartingLock.lock();
locked = true;
}
try {
withStartingLock(() -> {
for (Listener<K, V> l : listeners) {
try {
l.notify(full);
} catch (RuntimeException e) {
LOGGER.warn("ConsulCache Listener's notify method threw an exception.", e);
}
}
}
finally {
if (locked) {
listenersStartingLock.unlock();
}
}
return null;
});
}

if (state.compareAndSet(State.starting, State.started)) {
Expand All @@ -173,8 +163,7 @@ public void onComplete(ConsulResponse<List<V>> consulResponse) {
}
timeToWait = timeToWait.minusMillis(elapsedTime);

scheduler.schedule(ConsulCache.this::runCallback,
timeToWait.toMillis(), TimeUnit.MILLISECONDS);
scheduler.schedule(ConsulCache.this::runCallback, timeToWait.toMillis(), TimeUnit.MILLISECONDS);

} else {
onFailure(new ConsulException("Consul cluster has no elected leader"));
Expand All @@ -198,6 +187,21 @@ public void onFailure(Throwable throwable) {
};
}

private <T> T withStartingLock(Supplier<T> action) {
boolean wasInStartingState = state.get() == State.starting;
if (wasInStartingState) {
listenersStartingLock.lock();
}
try {
return action.get();
}
finally {
if (wasInStartingState) {
listenersStartingLock.unlock();
}
}
}

static long computeBackOffDelayMs(CacheConfig cacheConfig) {
return cacheConfig.getMinimumBackOffDelay().toMillis() +
Math.round(Math.random() * (cacheConfig.getMaximumBackOffDelay().minus(cacheConfig.getMinimumBackOffDelay()).toMillis()));
Expand Down Expand Up @@ -238,7 +242,8 @@ private void runCallback() {
}

private boolean isRunning() {
return state.get() == State.started || state.get() == State.starting;
State currentState = this.state.get();
return currentState == State.started || currentState == State.starting;
}

public boolean awaitInitialized(long timeout, TimeUnit unit) throws InterruptedException {
Expand Down Expand Up @@ -276,7 +281,8 @@ ImmutableMap<K, V> convertToMap(final ConsulResponse<List<V>> response) {

private void updateIndex(ConsulResponse<List<V>> consulResponse) {
if (consulResponse != null && consulResponse.getIndex() != null) {
this.latestIndex.set(consulResponse.getIndex());
BigInteger previousIndex = this.latestIndex.getAndSet(consulResponse.getIndex());
LOGGER.trace("Updated cache index from {} to {}", previousIndex, latestIndex.get());
}
}

Expand Down Expand Up @@ -318,6 +324,7 @@ protected static Scheduler createExternal(ScheduledExecutorService executor) {
*
* @param <V>
*/
@FunctionalInterface
protected interface CallbackConsumer<V> {
void consume(BigInteger index, ConsulResponseCallback<List<V>> callback);
}
Expand All @@ -328,33 +335,23 @@ protected interface CallbackConsumer<V> {
*
* @param <V>
*/
@FunctionalInterface
public interface Listener<K, V> {
void notify(Map<K, V> newValues);
}

public boolean addListener(Listener<K, V> listener) {
Boolean locked = false;
boolean added;
if (state.get() == State.starting) {
listenersStartingLock.lock();
locked = true;
}
try {
added = listeners.add(listener);
if (state.get() == State.started) {
return withStartingLock(() -> {
boolean added = listeners.add(listener);
if (this.state.get() == State.started) {
try {
listener.notify(lastResponse.get());
} catch (RuntimeException e) {
LOGGER.warn("ConsulCache Listener's notify method threw an exception.", e);
}
}
}
finally {
if (locked) {
listenersStartingLock.unlock();
}
}
return added;
return added;
});
}

public List<Listener<K, V>> getListeners() {
Expand Down Expand Up @@ -396,9 +393,9 @@ public DefaultScheduler() {
}
}

private static class ExternalScheduler extends Scheduler {
private static final class ExternalScheduler extends Scheduler {

public ExternalScheduler(ScheduledExecutorService executor) {
private ExternalScheduler(ScheduledExecutorService executor) {
super(executor);
}

Expand Down
34 changes: 30 additions & 4 deletions src/main/java/com/orbitz/consul/cache/HealthCheckCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.orbitz.consul.model.health.HealthCheck;
import com.orbitz.consul.option.QueryOptions;

import java.math.BigInteger;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Function;

Expand All @@ -16,7 +17,8 @@ private HealthCheckCache(HealthClient healthClient,
int watchSeconds,
QueryOptions queryOptions,
Function<HealthCheck, String> keyExtractor,
Scheduler callbackScheduler) {
Scheduler callbackScheduler,
BigInteger initialIndex) {
super(keyExtractor,
(index, callback) -> {
checkWatch(healthClient.getNetworkTimeoutConfig().getClientReadTimeoutMillis(), watchSeconds);
Expand All @@ -26,7 +28,8 @@ private HealthCheckCache(HealthClient healthClient,
healthClient.getConfig().getCacheConfig(),
healthClient.getEventHandler(),
new CacheDescriptor("health.state", state.getName()),
callbackScheduler);
callbackScheduler,
initialIndex);
}

/**
Expand All @@ -47,7 +50,20 @@ public static HealthCheckCache newCache(
final ScheduledExecutorService callbackExecutorService) {

Scheduler callbackScheduler = createExternal(callbackExecutorService);
return new HealthCheckCache(healthClient, state, watchSeconds, queryOptions, keyExtractor, callbackScheduler);
return new HealthCheckCache(healthClient, state, watchSeconds, queryOptions, keyExtractor, callbackScheduler, null);
}

public static HealthCheckCache newCache(
HealthClient healthClient,
com.orbitz.consul.model.State state,
int watchSeconds,
QueryOptions queryOptions,
BigInteger initialIndex,
Function<HealthCheck, String> keyExtractor,
ScheduledExecutorService callbackExecutorService) {

Scheduler callbackScheduler = createExternal(callbackExecutorService);
return new HealthCheckCache(healthClient, state, watchSeconds, queryOptions, keyExtractor, callbackScheduler, initialIndex);
}

public static HealthCheckCache newCache(
Expand All @@ -57,7 +73,7 @@ public static HealthCheckCache newCache(
final QueryOptions queryOptions,
final Function<HealthCheck, String> keyExtractor) {

return new HealthCheckCache(healthClient, state, watchSeconds, queryOptions, keyExtractor, createDefault());
return new HealthCheckCache(healthClient, state, watchSeconds, queryOptions, keyExtractor, createDefault(), null);
}
public static HealthCheckCache newCache(
final HealthClient healthClient,
Expand All @@ -68,6 +84,15 @@ public static HealthCheckCache newCache(
return newCache(healthClient, state, watchSeconds, queryOptions, HealthCheck::getCheckId);
}

public static HealthCheckCache newCache(
HealthClient healthClient,
com.orbitz.consul.model.State state,
int watchSeconds,
QueryOptions queryOptions, BigInteger initialIndex) {

return new HealthCheckCache(healthClient, state, watchSeconds, queryOptions, HealthCheck::getCheckId, createDefault(), initialIndex);
}

public static HealthCheckCache newCache(
final HealthClient healthClient,
final com.orbitz.consul.model.State state,
Expand All @@ -76,6 +101,7 @@ public static HealthCheckCache newCache(
return newCache(healthClient, state, watchSeconds, QueryOptions.BLANK);
}

@Deprecated
public static HealthCheckCache newCache(final HealthClient healthClient, final com.orbitz.consul.model.State state) {
CacheConfig cacheConfig = healthClient.getConfig().getCacheConfig();
int watchSeconds = Ints.checkedCast(cacheConfig.getWatchDuration().getSeconds());
Expand Down
34 changes: 30 additions & 4 deletions src/main/java/com/orbitz/consul/cache/KVCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.orbitz.consul.model.kv.Value;
import com.orbitz.consul.option.QueryOptions;

import java.math.BigInteger;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Function;

Expand All @@ -18,7 +19,8 @@ private KVCache(KeyValueClient kvClient,
String keyPath,
int watchSeconds,
QueryOptions queryOptions,
Scheduler callbackScheduler) {
Scheduler callbackScheduler,
BigInteger initialIndex) {
super(getKeyExtractorFunction(keyPath),
(index, callback) -> {
checkWatch(kvClient.getNetworkTimeoutConfig().getClientReadTimeoutMillis(), watchSeconds);
Expand All @@ -28,7 +30,8 @@ private KVCache(KeyValueClient kvClient,
kvClient.getConfig().getCacheConfig(),
kvClient.getEventHandler(),
new CacheDescriptor("keyvalue", rootPath),
callbackScheduler);
callbackScheduler,
initialIndex);
}

@VisibleForTesting
Expand Down Expand Up @@ -56,15 +59,36 @@ public static KVCache newCache(
final ScheduledExecutorService callbackExecutorService) {

Scheduler scheduler = createExternal(callbackExecutorService);
return new KVCache(kvClient, rootPath, prepareRootPath(rootPath), watchSeconds, queryOptions, scheduler);
return new KVCache(kvClient, rootPath, prepareRootPath(rootPath), watchSeconds, queryOptions, scheduler, null);
}

public static KVCache newCache(
KeyValueClient kvClient,
String rootPath,
int watchSeconds,
QueryOptions queryOptions,
BigInteger initialIndex,
ScheduledExecutorService callbackExecutorService) {

Scheduler scheduler = createExternal(callbackExecutorService);
return new KVCache(kvClient, rootPath, prepareRootPath(rootPath), watchSeconds, queryOptions, scheduler, initialIndex);
}

public static KVCache newCache(
final KeyValueClient kvClient,
final String rootPath,
final int watchSeconds,
final QueryOptions queryOptions) {
return new KVCache(kvClient, rootPath, prepareRootPath(rootPath), watchSeconds, queryOptions, createDefault());
return new KVCache(kvClient, rootPath, prepareRootPath(rootPath), watchSeconds, queryOptions, createDefault(), null);
}

public static KVCache newCache(
KeyValueClient kvClient,
String rootPath,
int watchSeconds,
BigInteger initialIndex,
QueryOptions queryOptions) {
return new KVCache(kvClient, rootPath, prepareRootPath(rootPath), watchSeconds, queryOptions, createDefault(), initialIndex);
}

@VisibleForTesting
Expand All @@ -82,6 +106,7 @@ static String prepareRootPath(String rootPath) {
* to be increased as well)
* @return the cache object
*/
@Deprecated
public static KVCache newCache(
final KeyValueClient kvClient,
final String rootPath,
Expand All @@ -97,6 +122,7 @@ public static KVCache newCache(
* @param rootPath the root path
* @return the cache object
*/
@Deprecated
public static KVCache newCache(final KeyValueClient kvClient, final String rootPath) {
CacheConfig cacheConfig = kvClient.getConfig().getCacheConfig();
int watchSeconds = Ints.checkedCast(cacheConfig.getWatchDuration().getSeconds());
Expand Down
Loading

0 comments on commit 8ccb900

Please sign in to comment.