Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IGNITE-22218 Remove TableRaftService in favor of using RaftGroupService from Replica instances #3973

Merged
merged 92 commits into from
Aug 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
92 commits
Select commit Hold shift + click to select a range
b29aad4
IGNITE-22315 Init
JAkutenshi Jun 19, 2024
27e401a
IGNITE-22315 BuildIndexTest fix + checkstyle fix
JAkutenshi Jun 19, 2024
e467d39
IGNITE-22315 start replica with raft chaining
JAkutenshi Jun 19, 2024
9b84bb6
IGNITE-22315 solePartitionAssignmentsContain fix
JAkutenshi Jun 20, 2024
59a8291
IGNITE-22315 TableManager start replica chaining refactoring
JAkutenshi Jun 21, 2024
23ec8b6
IGNITE-22315 pmd fix
JAkutenshi Jun 21, 2024
9343f15
Merge branch 'refs/heads/main' into ignite-22315-redone
JAkutenshi Jun 21, 2024
09fab58
IGNITE-22315 Main merge with Kirill's changes
JAkutenshi Jun 21, 2024
29e22d9
IGNITE-22315 Remove unintentional repeated tests
JAkutenshi Jun 21, 2024
4afcdfc
IGNITE-22315 inMemoryNodeRestartNotLeader return test and fix assert …
JAkutenshi Jun 21, 2024
bb97122
Merge branch 'refs/heads/main' into ignite-22315-redone
JAkutenshi Jun 23, 2024
5f1341a
IGNITE-22218 removing TableViewInternal#leaderAssignment
JAkutenshi Jun 23, 2024
cf6b513
IGNITE-22218 `ItBuildIndexTest` is cleaned
JAkutenshi Jun 24, 2024
ac413ce
IGNITE-22218 `ItPrimaryReplicaChoiceTest` is cleaned
JAkutenshi Jun 24, 2024
4da946b
IGNITE-22218 replace `join()` with `get()` with 15s timeout in `ItPri…
JAkutenshi Jun 24, 2024
1faa636
IGNITE-22218 NPE on `getRaftClient()` in `ItBuildIndexTest` is fixed
JAkutenshi Jun 24, 2024
de9c0cd
IGNITE-22218 `ItIgniteInMemoryNodeRestartTest#solePartitionAssignment…
JAkutenshi Jun 24, 2024
9235b4b
IGNITE-22218 `ReplicaTestUtils` is introduced
JAkutenshi Jun 24, 2024
db58d53
IGNITE-22315 review fix
JAkutenshi Jun 24, 2024
7efeda5
IGNITE-22218 some `leaderAssignment` calls replacements
JAkutenshi Jun 24, 2024
d65ac90
IGNITE-22218 `ItRebalanceDistributedTest` is cleaned
JAkutenshi Jun 24, 2024
4dee45b
IGNITE-22218 `TableRaftService` is removed
JAkutenshi Jun 24, 2024
72273ee
IGNITE-22218 `updateTableRaftService` lambda is removed
JAkutenshi Jun 24, 2024
ad772fd
Merge branch 'refs/heads/main' into ignite-22315-redone
JAkutenshi Jun 26, 2024
4c46c98
Merge branch 'refs/heads/ignite-22315-redone' into ignite-22218
JAkutenshi Jun 26, 2024
4f27636
IGNITE-22567 review fixes
JAkutenshi Jul 1, 2024
3353ee5
IGNITE-22567 Temporal `isReplicaStarted` check on `handleChangePendin…
JAkutenshi Jul 1, 2024
7ddd40a
IGNITE-22567 using `pendingAssignments` check before `changePeersOnRe…
JAkutenshi Jul 1, 2024
601c00e
IGNITE-22315 double-check asserts
JAkutenshi Jul 1, 2024
b754d08
IGNITE-22315 assert fix
JAkutenshi Jul 1, 2024
0eb4061
IGNITE-22315 return `isReplicaStarted` for `handleChangePendingAssign…
JAkutenshi Jul 1, 2024
7fbd1c0
IGNITE-22315 remove assert inside `updatePartitionClients` non-local …
JAkutenshi Jul 1, 2024
f0eb3f8
IGNITE-22315 `isReplicaStarted` isn't using now
JAkutenshi Jul 2, 2024
e068e55
IGNITE-22315 PMD fix
JAkutenshi Jul 2, 2024
b3036df
IGNITE-22315 add additional log information in the assertion
JAkutenshi Jul 2, 2024
54c137d
IGNITE-22315 add additional log information in both assertions and ex…
JAkutenshi Jul 3, 2024
b681255
IGNITE-22315 `isNodeInReducedStableAndPendingAssignmentsUnionSet` con…
JAkutenshi Jul 3, 2024
3c4e4c5
IGNITE-22218 Make DI with `PlacementDriver` for `DummyInternalTableImpl`
JAkutenshi Jul 3, 2024
bdecfb0
IGNITE-22218 Checkstyle fix
JAkutenshi Jul 4, 2024
4dedc25
IGNITE-22315 Add busylock on update
JAkutenshi Jul 4, 2024
5663d9d
IGNITE-22315 Added a note about busy lock in `stopAsync`
JAkutenshi Jul 4, 2024
1d982ad
IGNITE-22315 Checks that local mode may be a primary replica inside `…
JAkutenshi Jul 4, 2024
532bf5c
IGNITE-22315 fix timestamp for `getPrimaryReplica` using
JAkutenshi Jul 4, 2024
4171117
IGNITE-22315 check primary replica on pending assignments handling
JAkutenshi Jul 4, 2024
797ce61
IGNITE-22315 more certain assertions' logs
JAkutenshi Jul 4, 2024
accf73e
IGNITE-22315 add busy lock on `updatePartitionClients`
JAkutenshi Jul 4, 2024
b387feb
Merge branch 'refs/heads/main' into ignite-22315-redone
Jul 8, 2024
c7bacd6
IGNITE-22315 After merge compilation fixes
JAkutenshi Jul 8, 2024
23d42c7
IGNITE-22315 Debug LOG.info are removed
JAkutenshi Jul 8, 2024
968e8f7
Merge branch 'refs/heads/main' into ignite-22315-redone
JAkutenshi Jul 8, 2024
7152315
Merge branch 'refs/heads/ignite-22315-redone' into ignite-22218
JAkutenshi Jul 8, 2024
92884e6
IGNITE-22218 After-merge fix
JAkutenshi Jul 8, 2024
c2a2d7b
Merge branch 'refs/heads/main' into ignite-22218
JAkutenshi Jul 9, 2024
bb1a3eb
IGNITE-22218 After-merge fixes
JAkutenshi Jul 9, 2024
4744424
Merge branch 'refs/heads/main' into ignite-22218
JAkutenshi Jul 9, 2024
0c16054
IGNITE-22218 `PartitionReplicaLifecycleManager` now has no `isReplica…
JAkutenshi Jul 10, 2024
e3bc9d1
Merge branch 'refs/heads/main' into ignite-22218
JAkutenshi Jul 10, 2024
dbab29d
IGNITE-22218 blank line
JAkutenshi Jul 10, 2024
f3b8432
IGNITE-22218 assert on negative safetime case
JAkutenshi Jul 10, 2024
8cbf59a
IGNITE-22218 assert's message is made better
JAkutenshi Jul 10, 2024
0d02a72
IGNITE-22218 safetime fix
JAkutenshi Jul 10, 2024
93858e2
IGNITE-22218 Just curious with negative safetime
JAkutenshi Jul 10, 2024
4dd7746
IGNITE-22218 comparing sign and exception type fix
JAkutenshi Jul 10, 2024
df6e3aa
IGNITE-22218 log more
JAkutenshi Jul 10, 2024
b120097
IGNITE-22218 proper method usage
JAkutenshi Jul 10, 2024
af521c4
IGNITE-22218 back `IllegalArgumentException`
JAkutenshi Jul 10, 2024
04c2d47
IGNITE-22218 throw `AssertionError` if there still time problems
JAkutenshi Jul 10, 2024
fac4c1c
IGNITE-22218 Correct catch type
JAkutenshi Jul 11, 2024
324308b
IGNITE-22218 safetime fix for the case where there wasn't any metasto…
JAkutenshi Jul 11, 2024
0a94b98
IGNITE-22218 fixing race in `PartitionReplicaLifecycleManager#stopAnd…
JAkutenshi Jul 12, 2024
f7ce7b3
Merge branch 'refs/heads/main' into ignite-22218
JAkutenshi Jul 15, 2024
b327439
Merge branch 'refs/heads/main' into ignite-22218
JAkutenshi Jul 18, 2024
4e8baed
IGNITE-22218 aftermerge fix
JAkutenshi Jul 18, 2024
f6cb46e
Merge branch 'refs/heads/main' into ignite-22218
JAkutenshi Jul 22, 2024
7396fea
IGNITE-22218 after merge fix
JAkutenshi Jul 22, 2024
1493647
IGNITE-22218 after review fixes
JAkutenshi Jul 22, 2024
437b262
Merge branch 'refs/heads/main' into ignite-22218
JAkutenshi Jul 23, 2024
cf31bf8
Merge branch 'refs/heads/main' into ignite-22218
JAkutenshi Jul 24, 2024
a20cb9e
IGNITE-22218 after merge fixes
JAkutenshi Jul 24, 2024
0713ecb
Merge branch 'refs/heads/main' into ignite-22218
JAkutenshi Jul 25, 2024
9ac58dd
Merge branch 'refs/heads/main' into ignite-22218
JAkutenshi Jul 25, 2024
90c83d4
Merge branch 'refs/heads/main' into ignite-22218
JAkutenshi Jul 26, 2024
144b115
Merge branch 'refs/heads/main' into ignite-22218
JAkutenshi Jul 29, 2024
583ffee
Merge branch 'refs/heads/main' into ignite-22218
JAkutenshi Jul 30, 2024
8622db4
IGNITE-22218 `getLeaseholderId` to `getLeaseholder` replacement insid…
JAkutenshi Jul 31, 2024
a91d6e9
IGNITE-22218 `ItRebalanceDistributedTest` tests are fixed with primar…
JAkutenshi Aug 1, 2024
bfb871a
Merge branch 'refs/heads/main' into ignite-22218
JAkutenshi Aug 1, 2024
bf7e3b1
IGNITE-22218 `PartitionReplicaLifecycleManager#isLocalNodeIsPrimary` …
JAkutenshi Aug 1, 2024
f694a61
IGNITE-22218 `testZoneReplicaListener` is disabled
JAkutenshi Aug 1, 2024
565f102
IGNITE-22218 `testZoneReplicaListener` is disabled (with colocation e…
JAkutenshi Aug 1, 2024
5f6e52d
Merge branch 'refs/heads/main' into ignite-22218
JAkutenshi Aug 5, 2024
af264ce
IGNITE-22218 `testZoneReplicaListener` is disabled with proper ticket
JAkutenshi Aug 5, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@
import org.apache.ignite.internal.storage.engine.MvTableStorage;
import org.apache.ignite.internal.table.InternalTable;
import org.apache.ignite.internal.table.StreamerReceiverRunner;
import org.apache.ignite.internal.table.TableRaftService;
import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.internal.tx.storage.state.TxStateTableStorage;
import org.apache.ignite.internal.util.PendingComparableValuesTracker;
Expand Down Expand Up @@ -461,12 +460,6 @@ public Publisher<BinaryRow> lookup(
throw new IgniteInternalException(new OperationNotSupportedException());
}

@Override
public TableRaftService tableRaftService() {
throw new IgniteInternalException(new OperationNotSupportedException());
}


@Override public TxStateTableStorage txStateStorage() {
return null;
}
Expand Down
1 change: 1 addition & 0 deletions modules/index/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ dependencies {
integrationTestImplementation testFixtures(project(':ignite-table'))
integrationTestImplementation testFixtures(project(':ignite-storage-api'))
integrationTestImplementation testFixtures(project(':ignite-catalog'))
integrationTestImplementation testFixtures(project(':ignite-replicator'))
sanpwc marked this conversation as resolved.
Show resolved Hide resolved
integrationTestImplementation libs.jetbrains.annotations
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@
import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
import org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage;
import org.apache.ignite.internal.metrics.NoOpMetricManager;
import org.apache.ignite.internal.network.ClusterNodeImpl;
import org.apache.ignite.internal.network.ClusterService;
import org.apache.ignite.internal.network.StaticNodeFinder;
import org.apache.ignite.internal.network.configuration.NetworkConfiguration;
Expand Down Expand Up @@ -176,6 +177,7 @@
import org.apache.ignite.internal.tx.message.WriteIntentSwitchReplicaRequest;
import org.apache.ignite.internal.tx.test.TestLocalRwTxCounter;
import org.apache.ignite.internal.vault.VaultManager;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupEventsClientListener;
import org.apache.ignite.sql.IgniteSql;
Expand Down Expand Up @@ -304,6 +306,8 @@ private void startNodes(TestInfo testInfo, int amount) throws NodeStoppingExcept

node0.cmgManager.initCluster(List.of(node0.name), List.of(node0.name), "cluster");

placementDriver.setPrimary(node0.toClusterNode());

nodes.values().forEach(Node::waitWatches);

assertThat(
Expand Down Expand Up @@ -346,6 +350,7 @@ private void stopNode(int idx) {
}

@Test
@Disabled("https://issues.apache.org/jira/browse/IGNITE-22928")
public void testZoneReplicaListener(TestInfo testInfo) throws Exception {
startNodes(testInfo, 3);

Expand Down Expand Up @@ -1125,8 +1130,9 @@ public CompletableFuture<Boolean> invoke(
lowWatermark,
threadPoolsManager.tableIoExecutor(),
rebalanceScheduler,
threadPoolsManager.partitionOperationsExecutor()
);
threadPoolsManager.partitionOperationsExecutor(),
clockService,
placementDriver);

StorageUpdateConfiguration storageUpdateConfiguration = clusterConfigRegistry.getConfiguration(StorageUpdateConfiguration.KEY);

Expand Down Expand Up @@ -1280,6 +1286,10 @@ void stop() {
nodeCfgGenerator.close();
clusterCfgGenerator.close();
}

ClusterNode toClusterNode() {
return new ClusterNodeImpl(name, name, networkAddress);
}
}

@FunctionalInterface
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEvent;
import org.apache.ignite.internal.raft.Peer;
import org.apache.ignite.internal.raft.service.RaftGroupService;
import org.apache.ignite.internal.replicator.ReplicaTestUtils;
import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.BinaryTuple;
Expand Down Expand Up @@ -154,13 +155,14 @@ public void testPrimaryChangeSubscription() throws Exception {

@Test
public void testPrimaryChangeLongHandling() throws Exception {
TableViewInternal tbl = unwrapTableImpl(node(0).tables().table(TABLE_NAME));
IgniteImpl node = node(0);
TableViewInternal tbl = unwrapTableImpl(node.tables().table(TABLE_NAME));

var tblReplicationGrp = new TablePartitionId(tbl.tableId(), PART_ID);

CompletableFuture<ReplicaMeta> primaryReplicaFut = node(0).placementDriver().awaitPrimaryReplica(
CompletableFuture<ReplicaMeta> primaryReplicaFut = node.placementDriver().awaitPrimaryReplica(
tblReplicationGrp,
node(0).clock().now(),
node.clock().now(),
AWAIT_PRIMARY_REPLICA_TIMEOUT,
SECONDS
);
Expand All @@ -184,7 +186,7 @@ public void testPrimaryChangeLongHandling() throws Exception {
CompletableFuture<String> primaryChangeTask =
IgniteTestUtils.runAsync(() -> NodeUtils.transferPrimary(nodes, tblReplicationGrp, primary));

waitingForLeaderCache(tbl, primary);
waitingForLeaderCache(node, tbl);

assertFalse(primaryChangeTask.isDone());

Expand Down Expand Up @@ -399,12 +401,13 @@ private int getIndexId(String idxName) {
/**
* Waits when the leader would be a different with the current primary replica.
*
* @param node Ignite node.
* @param tbl Table.
* @param primary Current primary replica name.
* @throws InterruptedException If fail.
*/
private static void waitingForLeaderCache(TableViewInternal tbl, String primary) throws InterruptedException {
RaftGroupService raftSrvc = tbl.internalTable().tableRaftService().partitionRaftGroupService(0);
private static void waitingForLeaderCache(IgniteImpl node, TableViewInternal tbl) throws InterruptedException {
RaftGroupService raftSrvc = ReplicaTestUtils.getRaftClient(node, tbl.tableId(), 0)
.orElseThrow(AssertionError::new);

assertTrue(waitForCondition(() -> {
raftSrvc.refreshLeader();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -528,7 +528,6 @@ public RaftCommandRunner raftClient() {
return replicaManager.startReplica(
groupId,
newConfiguration,
(unused) -> { },
(unused) -> listener,
new PendingComparableValuesTracker<>(Long.MAX_VALUE),
completedFuture(raftClient));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
Expand Down Expand Up @@ -578,7 +577,6 @@ private CompletableFuture<Replica> startReplicaInternal(
RaftGroupListener raftGroupListener,
boolean isVolatileStorage,
SnapshotStorageFactory snapshotStorageFactory,
Consumer<RaftGroupService> updateTableRaftService,
Function<RaftGroupService, ReplicaListener> createListener,
PendingComparableValuesTracker<Long, Void> storageIndexTracker,
TablePartitionId replicaGrpId,
Expand All @@ -604,7 +602,6 @@ private CompletableFuture<Replica> startReplicaInternal(
return startReplica(
replicaGrpId,
newConfiguration,
updateTableRaftService,
createListener,
storageIndexTracker,
newRaftClientFut
Expand All @@ -618,8 +615,6 @@ private CompletableFuture<Replica> startReplicaInternal(
* @param raftGroupListener Raft group listener for raft group starting.
* @param isVolatileStorage is table storage volatile?
* @param snapshotStorageFactory Snapshot storage factory for raft group option's parameterization.
* @param updateTableRaftService Temporal consumer while TableRaftService wouldn't be removed in
* TODO: https://issues.apache.org/jira/browse/IGNITE-22218.
* @param createListener Due to creation of ReplicaListener in TableManager, the function returns desired listener by created
* raft-client inside {@link #startReplica} method.
* @param replicaGrpId Replication group id.
Expand All @@ -633,7 +628,6 @@ public CompletableFuture<Replica> startReplica(
RaftGroupListener raftGroupListener,
boolean isVolatileStorage,
SnapshotStorageFactory snapshotStorageFactory,
Consumer<RaftGroupService> updateTableRaftService,
Function<RaftGroupService, ReplicaListener> createListener,
PendingComparableValuesTracker<Long, Void> storageIndexTracker,
TablePartitionId replicaGrpId,
Expand All @@ -649,7 +643,6 @@ public CompletableFuture<Replica> startReplica(
raftGroupListener,
isVolatileStorage,
snapshotStorageFactory,
updateTableRaftService,
createListener,
storageIndexTracker,
replicaGrpId,
Expand Down Expand Up @@ -745,8 +738,6 @@ public CompletableFuture<Replica> startReplica(
*
* @param replicaGrpId Replication group id.
* @param newConfiguration Peers and Learners of the Raft group.
* @param updateTableRaftService A temporal clojure that updates table raft service with new raft-client, but
* TODO: will be removed https://issues.apache.org/jira/browse/IGNITE-22218
* @param createListener A clojure that returns done {@link ReplicaListener} by given raft-client {@link RaftGroupService}.
* @param storageIndexTracker Storage index tracker.
* @param newRaftClientFut A future that returns created raft-client.
Expand All @@ -758,19 +749,14 @@ public CompletableFuture<Replica> startReplica(
public CompletableFuture<Replica> startReplica(
ReplicationGroupId replicaGrpId,
PeersAndLearners newConfiguration,
Consumer<RaftGroupService> updateTableRaftService,
Function<RaftGroupService, ReplicaListener> createListener,
PendingComparableValuesTracker<Long, Void> storageIndexTracker,
CompletableFuture<TopologyAwareRaftGroupService> newRaftClientFut
) throws NodeStoppingException {
LOG.info("Replica is about to start [replicationGroupId={}].", replicaGrpId);

return newRaftClientFut
.thenApplyAsync(raftClient -> {
// TODO: will be removed in https://issues.apache.org/jira/browse/IGNITE-22218
updateTableRaftService.accept(raftClient);
return createListener.apply(raftClient);
}, replicasCreationExecutor)
.thenApplyAsync(createListener, replicasCreationExecutor)
.thenCompose(replicaListener -> startReplica(replicaGrpId, storageIndexTracker, completedFuture(replicaListener)));
}

Expand Down Expand Up @@ -1147,6 +1133,9 @@ private void sendSafeTimeSyncIfReplicaReady(CompletableFuture<Replica> replicaFu
* @param replicaGrpId Replication group id.
* @return True if the replica is started.
*/
@TestOnly
@VisibleForTesting
@Deprecated
public boolean isReplicaStarted(ReplicationGroupId replicaGrpId) {
CompletableFuture<Replica> replicaFuture = replicas.get(replicaGrpId);
return replicaFuture != null && isCompletedSuccessfully(replicaFuture);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,6 @@ void testReplicaEvents(
CompletableFuture<Replica> startReplicaFuture = replicaManager.startReplica(
groupId,
newConfiguration,
(unused) -> { },
(unused) -> replicaListener,
new PendingComparableValuesTracker<>(0L),
completedFuture(raftGroupService)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.replicator;

import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.ignite.Ignite;
import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.network.ClusterService;
import org.apache.ignite.internal.network.TopologyService;
import org.apache.ignite.internal.raft.service.RaftGroupService;
import org.apache.ignite.internal.testframework.IgniteTestUtils;
import org.apache.ignite.network.ClusterNode;
import org.jetbrains.annotations.TestOnly;

/** Utilities for working with replicas and replicas manager in tests. */
public final class ReplicaTestUtils {
/**
* Returns raft-client if exists.
*
* @param node Ignite node that hosts the raft-client.
* @param tableId Desired table's ID.
* @param partId Desired partition's ID.
*
* @return Optional with raft-client if exists on the node by given identifiers.
*/
@TestOnly
public static Optional<RaftGroupService> getRaftClient(Ignite node, int tableId, int partId) {
return getRaftClient(getReplicaManager(node), tableId, partId);
}

/**
* Returns raft-client if exists.
*
* @param replicaManager Ignite node's replica manager with replica that should contains a raft client.
* @param tableId Desired table's ID.
* @param partId Desired partition's ID.
*
* @return Optional with raft-client if exists on the node by given identifiers.
*/
@TestOnly
public static Optional<RaftGroupService> getRaftClient(ReplicaManager replicaManager, int tableId, int partId) {
CompletableFuture<Replica> replicaFut = replicaManager
.replica(new TablePartitionId(tableId, partId));

if (replicaFut == null) {
return Optional.empty();
}

try {
return Optional.of(replicaFut.get(15, TimeUnit.SECONDS).raftClient());
} catch (ExecutionException | InterruptedException | TimeoutException e) {
return Optional.empty();
}
}

/**
* Extracts {@link ReplicaManager} from the given {@link Ignite} node.
*
* @param node The given node with desired replica manager.
*
* @return Replica manager component from given node.
*/
@TestOnly
public static ReplicaManager getReplicaManager(Ignite node) {
return IgniteTestUtils.getFieldValue(node, "replicaMgr");
}

/**
* Extracts {@link TopologyService} from the given {@link Ignite} node.
*
* @param node The given node with desired topology service.
*
* @return Topology service component from given node.
*/
@TestOnly
private static TopologyService getTopologyService(Ignite node) {
ClusterService clusterService = IgniteTestUtils.getFieldValue(node, "clusterSvc");
return clusterService.topologyService();
}

/**
* Returns cluster node that is the leader of the corresponding partition group or throws an exception if it cannot be found.
*
* @param node Ignite node with raft client.
* @param tableId Table identifier.
* @param partId Partition number.
*
* @return Leader node of the partition group corresponding to the partition
*/
@TestOnly
public static ClusterNode leaderAssignment(Ignite node, int tableId, int partId) {
return leaderAssignment(getReplicaManager(node), getTopologyService(node), tableId, partId);
}

/**
* Returns cluster node that is the leader of the corresponding partition group or throws an exception if it cannot be found.
*
* @param replicaManager Ignite node's replica manager with replica that should contains a raft client.
* @param topologyService Ignite node's topology service that should find and return leader cluster node.
* @param tableId Table identifier.
* @param partId Partition number.
*
* @return Leader node of the partition group corresponding to the partition
*/
@TestOnly
public static ClusterNode leaderAssignment(ReplicaManager replicaManager, TopologyService topologyService, int tableId, int partId) {
RaftGroupService raftClient = getRaftClient(replicaManager, tableId, partId)
.orElseThrow(() -> new IgniteInternalException("No such partition " + partId + " in table " + tableId));

if (raftClient.leader() == null) {
try {
raftClient.refreshLeader().get(15, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
throw new IgniteInternalException("Couldn't get a leader for partition " + partId + " in table " + tableId, e);
}
}

return topologyService.getByConsistentId(raftClient.leader().consistentId());
}
}
1 change: 1 addition & 0 deletions modules/runner/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ dependencies {
integrationTestImplementation testFixtures(project(':ignite-failure-handler'))
integrationTestImplementation testFixtures(project(':ignite-metrics:'))
integrationTestImplementation testFixtures(project(':ignite-raft'))
integrationTestImplementation testFixtures(project(':ignite-replicator'))
integrationTestImplementation libs.jetbrains.annotations
integrationTestImplementation libs.awaitility
integrationTestImplementation libs.rocksdb.jni
Expand Down
Loading