Skip to content

Commit

Permalink
Migrating away from accord test routing failure as an actual routing …
Browse files Browse the repository at this point in the history
…failure, various doc and bug fixes:
  • Loading branch information
aweisberg committed Jul 9, 2024
1 parent 0d9b748 commit 7245fc8
Show file tree
Hide file tree
Showing 12 changed files with 140 additions and 46 deletions.
7 changes: 5 additions & 2 deletions src/java/org/apache/cassandra/batchlog/BatchlogManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@

import accord.primitives.TxnId;
import org.apache.cassandra.concurrent.ScheduledExecutorPlus;
import org.apache.cassandra.config.CassandraRelevantProperties;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.cql3.UntypedResultSet;
import org.apache.cassandra.cql3.UntypedResultSet.Row;
Expand Down Expand Up @@ -103,7 +104,6 @@
public class BatchlogManager implements BatchlogManagerMBean
{
public static final String MBEAN_NAME = "org.apache.cassandra.db:type=BatchlogManager";
private static final long REPLAY_INTERVAL = 10 * 1000; // milliseconds
static final int DEFAULT_PAGE_SIZE = 128;

private static final Logger logger = LoggerFactory.getLogger(BatchlogManager.class);
Expand Down Expand Up @@ -131,7 +131,7 @@ public void start()

batchlogTasks.scheduleWithFixedDelay(this::replayFailedBatches,
StorageService.RING_DELAY_MILLIS,
REPLAY_INTERVAL,
CassandraRelevantProperties.BATCHLOG_REPLAY_INTERVAL_MS.getLong(),
MILLISECONDS);
}

Expand Down Expand Up @@ -200,7 +200,9 @@ public long getTotalBatchesReplayed()

public void forceBatchlogReplay() throws Exception
{
logger.debug("Forcing batchlog replay");
startBatchlogReplay().get();
logger.debug("Finished forcing batchlog replay");
}

public Future<?> startBatchlogReplay()
Expand Down Expand Up @@ -482,6 +484,7 @@ public void finish(Set<UUID> hintedNodes)
}
catch (Exception e)
{
logger.debug("Unexpected batchlog replay exception", e);
failure = Throwables.merge(failure, e);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ public enum CassandraRelevantProperties
AUTOCOMPACTION_ON_STARTUP_ENABLED("cassandra.autocompaction_on_startup_enabled", "true"),
AUTO_BOOTSTRAP("cassandra.auto_bootstrap"),
AUTO_REPAIR_FREQUENCY_SECONDS("cassandra.auto_repair_frequency_seconds", convertToString(TimeUnit.MINUTES.toSeconds(5))),
BATCHLOG_REPLAY_INTERVAL_MS("cassandra.batchlog.replay_interval_ms", "10000"),
BATCHLOG_REPLAY_TIMEOUT_IN_MS("cassandra.batchlog.replay_timeout_in_ms"),
BATCH_COMMIT_LOG_SYNC_INTERVAL("cassandra.batch_commitlog_sync_interval_millis", "1000"),
/**
Expand Down
3 changes: 2 additions & 1 deletion src/java/org/apache/cassandra/hints/HintsDispatcher.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.exceptions.RequestFailure;
import org.apache.cassandra.exceptions.RetryOnDifferentSystemException;
import org.apache.cassandra.exceptions.WriteTimeoutException;
import org.apache.cassandra.io.util.File;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.locator.Replica;
Expand Down Expand Up @@ -478,7 +479,7 @@ public void run()
}
catch (Exception e)
{
accordOutcome = FAILURE;
accordOutcome = e instanceof WriteTimeoutException ? TIMEOUT : FAILURE;
String msg = "Accord hint delivery transaction failed";
if (noSpamLogger.getStatement(msg).shouldLog(Clock.Global.nanoTime()))
logger.error(msg, e);
Expand Down
2 changes: 1 addition & 1 deletion src/java/org/apache/cassandra/hints/HintsService.java
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ public void pauseDispatch()

public void resumeDispatch()
{
logger.info("Resumed hints dispatch");
logger.debug("Resumed hints dispatch");
isDispatchPaused.set(false);

HintsServiceDiagnostics.dispatchingResumed(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ public long preAcceptTimeout()
@Override
public Txn emptyTxn(Kind kind, Seekables<?, ?> seekables)
{
return new Txn.InMemory(kind, seekables, TxnRead.EMPTY, TxnQuery.EMPTY, null);
return new Txn.InMemory(kind, seekables, TxnRead.EMPTY, TxnQuery.UNSAFE_EMPTY, null);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,6 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;

import accord.messages.ReadTxnData;
import accord.primitives.Ballot;
import org.apache.cassandra.schema.TableId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -43,6 +40,8 @@
import accord.local.Node.Id;
import accord.messages.Commit;
import accord.messages.Commit.Kind;
import accord.messages.ReadTxnData;
import accord.primitives.Ballot;
import accord.primitives.Deps;
import accord.primitives.FullRoute;
import accord.primitives.Participants;
Expand Down Expand Up @@ -74,6 +73,7 @@
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.RequestCallback;
import org.apache.cassandra.schema.KeyspaceMetadata;
import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.service.StorageProxy;
import org.apache.cassandra.service.accord.AccordEndpointMapper;
import org.apache.cassandra.service.accord.TokenRange;
Expand Down Expand Up @@ -260,8 +260,8 @@ private AsyncChain<Data> readChains()

// This should only rarely occur when coordinators start a transaction in a migrating range
// because they haven't yet updated their cluster metadata.
// It would be harmless to do the read, but we can respond faster skipping it
// and getting the transaction on the correct protocol
// It would be harmless to do the read, because it will be rejected in `TxnQuery` anyways,
// but it's faster to skip the read
TableMigrationState tms = ConsensusTableMigration.getTableMigrationState(command.metadata().id);
AccordClientRequestMetrics metrics = txn.kind().isWrite() ? accordWriteMetrics : accordReadMetrics;
if (ConsensusRequestRouter.instance.isKeyInMigratingOrMigratedRangeFromAccord(command.metadata(), tms, command.partitionKey()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import javax.annotation.Nonnull;

import accord.coordinate.Timeout;
import accord.local.Node;
import accord.messages.Callback;
import accord.messages.ReadData.ReadOk;
Expand Down Expand Up @@ -78,7 +79,12 @@ else if (reply == Insufficient)

public void onFailure(Node.Id from, Throwable failure)
{
wrapped.onFailure(endpoint, RequestFailure.UNKNOWN);
RequestFailure requestFailure = RequestFailure.UNKNOWN;
// Convert from Accord's timeout exception to our failure reason because timeout is something
// that is useful for metrics and can be handled differently
if (failure instanceof Timeout)
requestFailure = RequestFailure.TIMEOUT;
wrapped.onFailure(endpoint, requestFailure);
}

public void onCallbackFailure(Node.Id from, Throwable failure)
Expand Down
26 changes: 22 additions & 4 deletions src/java/org/apache/cassandra/service/accord/txn/TxnQuery.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@

public abstract class TxnQuery implements Query
{
/**
* Used by transaction statements which will have Accord pass back to the C* coordinator code all the data that is
* read even if it is not returned as part of the result to the client. TxnDataName.returning() will fetch the data
* that is returned from TxnData.
*/
public static final TxnQuery ALL = new TxnQuery()
{
@Override
Expand All @@ -65,6 +70,10 @@ public Result doCompute(TxnId txnId, Timestamp executeAt, Seekables<?, ?> keys,
}
};

/**
* For transactions that return no results but do still care that they don't apply if the tokens/ranges
* are not owned/managed by Accord
*/
public static final TxnQuery NONE = new TxnQuery()
{
@Override
Expand All @@ -80,6 +89,9 @@ public Result doCompute(TxnId txnId, Timestamp executeAt, Seekables<?, ?> keys,
}
};

/**
* For supporting CQL CAS compatible transactions
*/
public static final TxnQuery CONDITION = new TxnQuery()
{
@Override
Expand Down Expand Up @@ -114,7 +126,13 @@ else if (txnData.isEmpty())
}
};

public static final TxnQuery EMPTY = new TxnQuery()
/**
* UNSAFE_EMPTY doesn't validate that the range is owned by Accord so you want to be careful and use NONE
* if your transaction simply doesn't have results because that will validate that Accord owns the range
* for things like blind writes. Empty is used by Accord for things like sync points which it may want to do
* for ranges it no longer owns.
*/
public static final TxnQuery UNSAFE_EMPTY = new TxnQuery()
{

@Override
Expand Down Expand Up @@ -171,7 +189,7 @@ public long estimatedSizeOnHeap()
@Override
public void serialize(TxnQuery query, DataOutputPlus out, int version) throws IOException
{
Preconditions.checkArgument(query == null | query == ALL | query == NONE | query == CONDITION | query == EMPTY);
Preconditions.checkArgument(query == null | query == ALL | query == NONE | query == CONDITION | query == UNSAFE_EMPTY);
out.writeByte(query == null ? 0 : query.type());
}

Expand All @@ -185,14 +203,14 @@ public TxnQuery deserialize(DataInputPlus in, int version) throws IOException
case 1: return ALL;
case 2: return NONE;
case 3: return CONDITION;
case 4: return EMPTY;
case 4: return UNSAFE_EMPTY;
}
}

@Override
public long serializedSize(TxnQuery query, int version)
{
Preconditions.checkArgument(query == null | query == ALL | query == NONE | query == CONDITION | query == EMPTY);
Preconditions.checkArgument(query == null | query == ALL | query == NONE | query == CONDITION | query == UNSAFE_EMPTY);
return TypeSizes.sizeof((byte)2);
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,10 @@ public ConsistencyLevel commitCLForStrategy(ConsistencyLevel consistencyLevel)
return consistencyLevel;
}

// TODO (required): This won't work for migration directly from none to full because there is no safe system to read from
// during the first phase (repair). Accord won't read correctly beacuse it won't honor the CL and miss non-transactional writes that haven't been repaired and non-transactional
// reads will miss all the writes being routed through Accord since they occur asynchronously. Something has to give here where either writes routed through are Accord are synchronous at CL
// or reads are routed through Accord and read at quorum as long as the range has not completed the first phase (repair).
public ConsistencyLevel readCLForStrategy(ConsistencyLevel consistencyLevel)
{
if (ignoresSuppliedConsistencyLevel)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ public static Pair<TxnId, Future<TxnResult>> mutateWithAccordAsync(Collection<?
// Potentially ignore commit consistency level if the strategy specifies accord and not migration
ConsistencyLevel clForCommit = consistencyLevelForCommit(mutations, consistencyLevel);
TxnUpdate update = new TxnUpdate(fragments, TxnCondition.none(), clForCommit, true);
Txn.InMemory txn = new Txn.InMemory(Keys.of(partitionKeys), TxnRead.EMPTY, TxnQuery.EMPTY, update);
Txn.InMemory txn = new Txn.InMemory(Keys.of(partitionKeys), TxnRead.EMPTY, TxnQuery.NONE, update);
IAccordService accordService = AccordService.instance();
return accordService.coordinateAsync(txn, consistencyLevel, queryStartNanoTime);
}
Expand Down Expand Up @@ -372,12 +372,27 @@ public static boolean tokenShouldBeWrittenThroughAccord(@Nonnull ClusterMetadata
return migrationFromWritesThroughAccord;
}

// Anything migrating or migrated should be running on Accord
// This logic is driven by the fact that Paxos is not picky about how data is written since it's txn recovery
// is deterministic in the face of non-deterministic reads because consensus is agreeing on the writes that will be done to the database
// Accord agrees on what computation will produce those writes and then asynchronously executes those computations, potentially multiple times
// with different results if Accord reads non-transactionally written data that could be seen differently by different coordinators

// If the current mode writes through Accord then we should always write though Accord for ranges managed by Accord.
// Accord needs to do synchronous commit and respect the consistency level so that Accord will later be able to
// read its own writes
if (transactionalModeWritesThroughAccord)
return isInNormalizedRanges(token, tms.migratingAndMigratedRanges);
// Migrated ranges are on Paxos because if current mode and migration from are on Accord we exit early at the start

// If we are migrating from a mode that used to write to Accord then any range that isn't migrating/migrated
// should continue to write through Accord.
// It's not completely symmetrical because Paxos is able to read Accord's writes by performing a single key barrier
// and regular mutations will be able to do the same thing (needs to be added along with non-transactional reads)
// This means that migrating ranges don't need to be written through Accord because we are running Paxos now
// and not Accord. When migrating to Accord we need to do all the writes through Accord even if we aren't
// reading through Accord so that repair + Accord metadata is sufficient for Accord to be able to read
// safely and deterministically from any coordinator
if (migrationFromWritesThroughAccord)
return isInNormalizedRanges(token, tms.migratingRanges);
return !isInNormalizedRanges(token, tms.migratingAndMigratedRanges);
}
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -454,6 +454,7 @@ protected static Callable<Void> pauseBeforeEnacting(IInvokableInstance instance,
{
logger.info("EpochPause Waiting before enacting epoch {}", epoch);
promise.get(wait, waitUnit);
logger.info("EpochPause stopped waiting before enacting epoch {}", epoch);
return null;
}
catch (Throwable e)
Expand Down Expand Up @@ -484,6 +485,7 @@ protected static Callable<Void> pauseAfterEnacting(IInvokableInstance instance,
{
logger.info("EpochPause Waiting after enacting epoch {}", epoch);
promise.get(wait, waitUnit);
logger.info("EpochPause done waiting after enacting epoch {}", epoch);
return null;
}
catch (Throwable e)
Expand Down
Loading

0 comments on commit 7245fc8

Please sign in to comment.