Skip to content

Commit

Permalink
More review feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
aweisberg committed Jul 5, 2024
1 parent 9aa647d commit 17b6f85
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 14 deletions.
13 changes: 7 additions & 6 deletions src/java/org/apache/cassandra/service/accord/AccordService.java
Original file line number Diff line number Diff line change
Expand Up @@ -420,7 +420,7 @@ public IVerbHandler<? extends Request> verbHandler()
}
catch (ExecutionException e)
{
Throwable cause = e.getCause();
Throwable cause = Throwables.getRootCause(e);
if (cause instanceof Timeout)
{
metrics.timeouts.mark();
Expand Down Expand Up @@ -653,6 +653,7 @@ public TopologyManager topology()
AsyncResult<Result> asyncResult = node.coordinate(txnId, txn);
AsyncPromise<TxnResult> resultFuture = new AsyncPromise<>();
asyncResult.addCallback((success, failure) -> {
Throwable cause = Throwables.getRootCause(failure);
long durationNanos = nanoTime() - queryStartNanos;
metrics.addNano(durationNanos);
if (success != null)
Expand All @@ -661,13 +662,13 @@ public TopologyManager topology()
return;
}

if (failure instanceof Timeout)
if (cause instanceof Timeout)
{
metrics.timeouts.mark();
resultFuture.tryFailure(newTimeout(txnId, txn.isWrite(), consistencyLevel));
return;
}
if (failure instanceof Preempted)
if (cause instanceof Preempted)
{
metrics.preempted.mark();
//TODO need to improve
Expand All @@ -676,14 +677,14 @@ public TopologyManager topology()
resultFuture.tryFailure(newPreempted(txnId, txn.isWrite(), consistencyLevel));
return;
}
if (failure instanceof TopologyMismatch)
if (cause instanceof TopologyMismatch)
{
metrics.topologyMismatches.mark();
resultFuture.tryFailure(RequestValidations.invalidRequest(failure.getMessage()));
resultFuture.tryFailure(RequestValidations.invalidRequest(cause.getMessage()));
return;
}
metrics.failures.mark();
resultFuture.tryFailure(new RuntimeException(failure));
resultFuture.tryFailure(new RuntimeException(cause));
});
return Pair.create(txnId, resultFuture);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.cassandra.db.WriteType;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.exceptions.CasWriteTimeoutException;
import org.apache.cassandra.exceptions.RetryOnDifferentSystemException;
import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
Expand Down Expand Up @@ -172,7 +173,7 @@ public void maybePerformAccordToPaxosKeyMigration(boolean isForWrite)

// TODO (desired): Better query start time
TableMigrationState tms = tableMigrationState;
repairKeyAccord(key, tms.keyspaceName, tms.tableId, tms.minMigrationEpoch(key.getToken()).getEpoch(), nanoTime(), false, isForWrite);
repairKeyAccord(key, tms.tableId, tms.minMigrationEpoch(key.getToken()).getEpoch(), nanoTime(), false, isForWrite);
}

private boolean paxosReadSatisfiedByKeyMigration()
Expand Down Expand Up @@ -263,7 +264,6 @@ public static KeyMigrationState getKeyMigrationState(TableId tableId, DecoratedK
* Trigger a distributed repair of Accord state for this key.
*/
static void repairKeyAccord(DecoratedKey key,
String keyspace,
TableId tableId,
long minEpoch,
long queryStartNanos,
Expand All @@ -282,7 +282,9 @@ static void repairKeyAccord(DecoratedKey key,
// will soon be ready to execute, but only waits for the local replica to be ready
// Local will only create a transaction if it can't find an existing one to wait on
BarrierType barrierType = global ? BarrierType.global_async : BarrierType.local;
AccordService.instance().barrier(Seekables.of(new PartitionKey(tableId, key)), minEpoch, queryStartNanos, DatabaseDescriptor.getTransactionTimeout(TimeUnit.NANOSECONDS), barrierType, isForWrite);
Seekables keysOrRanges = AccordService.instance().barrier(Seekables.of(new PartitionKey(tableId, key)), minEpoch, queryStartNanos, DatabaseDescriptor.getTransactionTimeout(TimeUnit.NANOSECONDS), barrierType, isForWrite);
if (keysOrRanges.isEmpty())
throw new RetryOnDifferentSystemException();
// We don't save the state to the cache here. Accord will notify the agent every time a barrier happens.
}
finally
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,11 @@

package org.apache.cassandra.service.consensus.migration;

import java.util.Optional;
import javax.annotation.Nonnull;

import com.google.common.annotations.VisibleForTesting;

import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.db.DecoratedKey;
Expand All @@ -37,9 +41,6 @@
import org.apache.cassandra.tcm.Epoch;
import org.apache.cassandra.utils.FBUtilities;

import javax.annotation.Nonnull;
import java.util.Optional;

import static com.google.common.base.Preconditions.checkState;
import static org.apache.cassandra.service.consensus.migration.ConsensusKeyMigrationState.getConsensusMigratedAt;
import static org.apache.cassandra.service.consensus.migration.ConsensusMigrationTarget.paxos;
Expand Down Expand Up @@ -216,7 +217,7 @@ private static ConsensusRoutingDecision pickBasedOnKeyMigrationStatus(ClusterMet
// barrier transactions to accomplish the migration
// They still might need to go through the fast local path for barrier txns
// at each replica, but they won't create their own txn since we created it here
ConsensusKeyMigrationState.repairKeyAccord(key, tms.keyspaceName, tms.tableId, tms.minMigrationEpoch(key.getToken()).getEpoch(), queryStartNanos, true, isForWrite);
ConsensusKeyMigrationState.repairKeyAccord(key, tms.tableId, tms.minMigrationEpoch(key.getToken()).getEpoch(), queryStartNanos, true, isForWrite);
return paxosV2;
}
// Fall through for repairKeyPaxos
Expand Down

0 comments on commit 17b6f85

Please sign in to comment.