Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IGNITE-22629 Fixed the test ItReplicaStateManagerTest.testReplicaStatesManagement #4040

Merged
merged 2 commits into from
Jul 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -261,11 +261,11 @@ private void countDownPartitionsFromZone(Set<Assignment> stable) {

Set<Integer> counter = fromBytes(counterEntry.value());

assert !counter.isEmpty();

if (!counter.contains(tablePartitionId.tableId())) {
// Count down for this table has already been processed, just skip.
// For example, this can happen when leader re-election happened during the rebalance process.
LOG.info("Counter count down skipped, because the counter doesn't contain the tableId=" + tablePartitionId.tableId());

return;
}

Expand Down Expand Up @@ -312,6 +312,11 @@ private void countDownPartitionsFromZone(Set<Assignment> stable) {
} catch (InterruptedException | ExecutionException e) {
// TODO: IGNITE-14693
LOG.warn("Unable to count down partitions counter in metastore: " + tablePartitionId, e);
} catch (Throwable e) {
// TODO: IGNITE-14693
LOG.error("Unable to count down partitions counter in metastore: " + tablePartitionId, e);

throw e;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1363,6 +1363,8 @@ CompletableFuture<Boolean> weakStartReplica(
} else if (state == ReplicaState.PRIMARY_ONLY) {
context.replicaState = ReplicaState.ASSIGNED;

LOG.debug("Weak replica start complete [state={}].", context.replicaState);

return trueCompletedFuture();
} // else no-op.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,8 @@
package org.apache.ignite.internal.runner.app;

import static java.util.Collections.emptySet;
import static java.util.stream.Collectors.toSet;
import static org.apache.ignite.internal.TestWrappers.unwrapTableImpl;
import static org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_PROFILE;
import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.pendingPartAssignmentsKey;
import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.stablePartAssignmentsKey;
import static org.apache.ignite.internal.table.NodeUtils.transferPrimary;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
Expand Down Expand Up @@ -53,6 +51,19 @@
* Test for replica lifecycle.
*/
public class ItReplicaStateManagerTest extends BaseIgniteRestartTest {
private static final String[] ATTRIBUTES = {
"{region:{attribute:\"REG0\"}}",
"{region:{attribute:\"REG1\"}}",
"{region:{attribute:\"REG2\"}}"
};

private static final String ZONE_NAME = "TEST_ZONE";

@Override
protected String configurationString(int idx) {
return configurationString(idx, ATTRIBUTES[idx]);
}

@Test
@Disabled("https://issues.apache.org/jira/browse/IGNITE-22629")
public void testReplicaStatesManagement() throws InterruptedException {
Expand All @@ -61,15 +72,17 @@ public void testReplicaStatesManagement() throws InterruptedException {

IgniteImpl node0 = nodes.get(0);

String zone = "TEST_ZONE";
String tableName = "TEST";

node0.sql().execute(null,
String.format("CREATE ZONE IF NOT EXISTS %s WITH REPLICAS=%d, PARTITIONS=%d, STORAGE_PROFILES='%s'",
zone, 3, 1, DEFAULT_STORAGE_PROFILE));
ZONE_NAME, 3, 1, DEFAULT_STORAGE_PROFILE));

node0.sql().execute(null,
String.format("CREATE TABLE IF NOT EXISTS %s (id INT PRIMARY KEY, name VARCHAR) WITH PRIMARY_ZONE='%s'", tableName, zone));
String.format("CREATE TABLE IF NOT EXISTS %s (id INT PRIMARY KEY, name VARCHAR) WITH PRIMARY_ZONE='%s'", tableName,
ZONE_NAME
)
);

TableImpl tbl = unwrapTableImpl(node0.tables().table("TEST"));
int tableId = tbl.tableId();
Expand All @@ -84,18 +97,10 @@ public void testReplicaStatesManagement() throws InterruptedException {

log.info("Test: primary replica is " + replicaMeta);

// This will be the pending assignments excluding the primary replica node.
Set<Assignment> newPendingAssignments = nodes.stream()
.filter(n -> !n.id().equals(replicaMeta.getLeaseholderId()))
.map(n -> Assignment.forPeer(n.name()))
.collect(toSet());

ByteArray pendingAssignmentsKey = pendingPartAssignmentsKey(partId);

log.info("Test: Excluding the current primary from assignments. The replica should stay alive.");

// Excluding the current primary from assignments. The replica should stay alive.
node0.metaStorageManager().put(pendingAssignmentsKey, Assignments.toBytes(newPendingAssignments));
node0.sql().execute(null, alterZoneSql(filterForNodes(nodes, replicaMeta.getLeaseholderId())));

ByteArray stableAssignmentsKey = stablePartAssignmentsKey(partId);

Expand All @@ -106,11 +111,7 @@ public void testReplicaStatesManagement() throws InterruptedException {
log.info("Test: Including it back.");

// Including it back.
Set<Assignment> pendingAssignmentsAllNodes = nodes.stream()
.map(n -> Assignment.forPeer(n.name()))
.collect(toSet());

node0.metaStorageManager().put(pendingAssignmentsKey, Assignments.toBytes(pendingAssignmentsAllNodes));
node0.sql().execute(null, alterZoneSql(filterForNodes(nodes, null)));

waitForStableAssignments(node0.metaStorageManager(), stableAssignmentsKey.bytes(), nodesCount);

Expand All @@ -119,7 +120,7 @@ public void testReplicaStatesManagement() throws InterruptedException {
log.info("Test: Excluding again.");

// Excluding again.
node0.metaStorageManager().put(pendingAssignmentsKey, Assignments.toBytes(newPendingAssignments));
node0.sql().execute(null, alterZoneSql(filterForNodes(nodes, replicaMeta.getLeaseholderId())));

waitForStableAssignments(node0.metaStorageManager(), stableAssignmentsKey.bytes(), nodesCount - 1);

Expand Down Expand Up @@ -149,6 +150,30 @@ public void testReplicaStatesManagement() throws InterruptedException {
assertTrue(success);
}

private static String alterZoneSql(String filter) {
return String.format("ALTER ZONE \"%s\" SET \"DATA_NODES_FILTER\" = '%s'", ZONE_NAME, filter);
}

private static String filterForNodes(List<IgniteImpl> nodes, @Nullable String excludeId) {
StringBuilder attrs = new StringBuilder();

for (int idx = 0; idx < nodes.size(); idx++) {
IgniteImpl node = nodes.get(idx);

if (excludeId != null && node.id().equals(excludeId)) {
continue;
}

if (!attrs.toString().isEmpty()) {
attrs.append(" || ");
}

attrs.append("@.region == \"REG" + idx + "\"");
}

return "$[?(" + attrs + ")]";
}

@Nullable
private static MvPartitionStorage storage(IgniteImpl node) {
TableImpl t = unwrapTableImpl(node.tables().table("TEST"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ public static <T extends IgniteComponent> T findComponent(List<IgniteComponent>
* @param idx Node index.
* @return Configuration string.
*/
protected static String configurationString(int idx) {
protected String configurationString(int idx) {
return configurationString(idx, "{}");
}

Expand Down