Skip to content

Commit

Permalink
More review feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
aweisberg committed Jul 3, 2024
1 parent 9aa647d commit a4fcbd8
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 10 deletions.
13 changes: 7 additions & 6 deletions src/java/org/apache/cassandra/service/accord/AccordService.java
Original file line number Diff line number Diff line change
Expand Up @@ -420,7 +420,7 @@ public IVerbHandler<? extends Request> verbHandler()
}
catch (ExecutionException e)
{
Throwable cause = e.getCause();
Throwable cause = Throwables.getRootCause(e);
if (cause instanceof Timeout)
{
metrics.timeouts.mark();
Expand Down Expand Up @@ -653,6 +653,7 @@ public TopologyManager topology()
AsyncResult<Result> asyncResult = node.coordinate(txnId, txn);
AsyncPromise<TxnResult> resultFuture = new AsyncPromise<>();
asyncResult.addCallback((success, failure) -> {
Throwable cause = Throwables.getRootCause(failure);
long durationNanos = nanoTime() - queryStartNanos;
metrics.addNano(durationNanos);
if (success != null)
Expand All @@ -661,13 +662,13 @@ public TopologyManager topology()
return;
}

if (failure instanceof Timeout)
if (cause instanceof Timeout)
{
metrics.timeouts.mark();
resultFuture.tryFailure(newTimeout(txnId, txn.isWrite(), consistencyLevel));
return;
}
if (failure instanceof Preempted)
if (cause instanceof Preempted)
{
metrics.preempted.mark();
//TODO need to improve
Expand All @@ -676,14 +677,14 @@ public TopologyManager topology()
resultFuture.tryFailure(newPreempted(txnId, txn.isWrite(), consistencyLevel));
return;
}
if (failure instanceof TopologyMismatch)
if (cause instanceof TopologyMismatch)
{
metrics.topologyMismatches.mark();
resultFuture.tryFailure(RequestValidations.invalidRequest(failure.getMessage()));
resultFuture.tryFailure(RequestValidations.invalidRequest(cause.getMessage()));
return;
}
metrics.failures.mark();
resultFuture.tryFailure(new RuntimeException(failure));
resultFuture.tryFailure(new RuntimeException(cause));
});
return Pair.create(txnId, resultFuture);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.cassandra.db.WriteType;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.exceptions.CasWriteTimeoutException;
import org.apache.cassandra.exceptions.RetryOnDifferentSystemException;
import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
Expand Down Expand Up @@ -172,7 +173,7 @@ public void maybePerformAccordToPaxosKeyMigration(boolean isForWrite)

// TODO (desired): Better query start time
TableMigrationState tms = tableMigrationState;
repairKeyAccord(key, tms.keyspaceName, tms.tableId, tms.minMigrationEpoch(key.getToken()).getEpoch(), nanoTime(), false, isForWrite);
repairKeyAccord(key, tms.tableId, tms.minMigrationEpoch(key.getToken()).getEpoch(), nanoTime(), false, isForWrite);
}

private boolean paxosReadSatisfiedByKeyMigration()
Expand Down Expand Up @@ -263,7 +264,6 @@ public static KeyMigrationState getKeyMigrationState(TableId tableId, DecoratedK
* Trigger a distributed repair of Accord state for this key.
*/
static void repairKeyAccord(DecoratedKey key,
String keyspace,
TableId tableId,
long minEpoch,
long queryStartNanos,
Expand All @@ -282,7 +282,9 @@ static void repairKeyAccord(DecoratedKey key,
// will soon be ready to execute, but only waits for the local replica to be ready
// Local will only create a transaction if it can't find an existing one to wait on
BarrierType barrierType = global ? BarrierType.global_async : BarrierType.local;
AccordService.instance().barrier(Seekables.of(new PartitionKey(tableId, key)), minEpoch, queryStartNanos, DatabaseDescriptor.getTransactionTimeout(TimeUnit.NANOSECONDS), barrierType, isForWrite);
Seekables keysOrRanges = AccordService.instance().barrier(Seekables.of(new PartitionKey(tableId, key)), minEpoch, queryStartNanos, DatabaseDescriptor.getTransactionTimeout(TimeUnit.NANOSECONDS), barrierType, isForWrite);
if (keysOrRanges.isEmpty())
throw new RetryOnDifferentSystemException();
// We don't save the state to the cache here. Accord will notify the agent every time a barrier happens.
}
finally
Expand Down

0 comments on commit a4fcbd8

Please sign in to comment.