Fix peer sync replication test check #131

Open

wants to merge 12 commits into master
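
In broad strokes, the change replaces the ReplicationHandler request-count comparison with a check on the replica's replication.properties file: if PeerSync succeeds, the replica never falls back to replication, so that file is either absent or left unchanged. Below is a minimal sketch of the idea; the class and method names, the local dataDir handling, and the null handling are illustrative assumptions, not the test's exact wiring.

import java.nio.file.Files;
import java.nio.file.Path;
import org.apache.commons.codec.digest.DigestUtils;

class ReplicationCheckSketch {
    // Returns true if a replication run appears to have happened since md5Before
    // was taken; md5Before may be null if the file did not exist at that point.
    static boolean replicationRan(Path dataDir, String md5Before) throws Exception {
        Path props = dataDir.resolve("replication.properties");
        if (Files.notExists(props)) {
            return false; // no replication.properties at all => no replication
        }
        String md5After = DigestUtils.md5Hex(Files.readAllBytes(props));
        return md5Before == null || !md5After.equals(md5Before); // file created or changed
    }
}

The diff applies the same idea in two flavors: an md5 comparison of replication.properties where the file may legitimately already exist, and a plain Files.notExists check where no replication should have produced the file at all.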
@@ -19,13 +19,16 @@

import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.lucene.util.LuceneTestCase.Slow;
import org.apache.solr.client.solrj.SolrServerException;
@@ -38,7 +41,6 @@
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.handler.ReplicationHandler;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -127,52 +129,53 @@ public void test() throws Exception {
waitForThingsToLevelOut(30);

checkShardConsistency(false, true);

// bring down the other node and index a few docs so that the leader's and the other node's segments diverge
forceNodeFailures(singletonList(secondNode));
for (int i = 0; i < 10; i++) {
indexDoc(id, docId, i1, 50, tlong, 50, t1,
"document number " + docId++);
if(i % 2 == 0) {
commit();
}
}
commit();
restartNodes(singletonList(secondNode));

// start the freshNode
ChaosMonkey.start(freshNode.jetty);
nodesDown.remove(freshNode);

waitTillNodesActive();
waitForThingsToLevelOut(30);

// TODO check how to see if the fresh node went into recovery (maybe check the replication handler request count on the new leader)

long numRequestsBefore = (Long) secondNode.jetty
.getCoreContainer()
.getCores()
.iterator()
.next()
.getRequestHandler(ReplicationHandler.PATH)
.getStatistics().get("requests");
restartNodes(singletonList(freshNode));

String replicationProperties = (String) freshNode.jetty.getSolrHome() + "/cores/" + DEFAULT_TEST_COLLECTION_NAME + "/data/replication.properties";
String md5 = DigestUtils.md5Hex(Files.readAllBytes(Paths.get(replicationProperties)));

// shutdown the original leader
log.info("Now shutting down initial leader");
forceNodeFailures(singletonList(initialLeaderJetty));
waitForNewLeader(cloudClient, "shard1", (Replica) initialLeaderJetty.client.info, 15);
waitTillNodesActive();
log.info("Updating mappings from zk");
updateMappingsFromZk(jettys, clients, true);

long numRequestsAfter = (Long) secondNode.jetty
.getCoreContainer()
.getCores()
.iterator()
.next()
.getRequestHandler(ReplicationHandler.PATH)
.getStatistics().get("requests");

assertEquals("Node went into replication", numRequestsBefore, numRequestsAfter);
assertEquals("Node went into replication", md5, DigestUtils.md5Hex(Files.readAllBytes(Paths.get(replicationProperties))));

success = true;
} finally {
System.clearProperty("solr.disableFingerprint");
}
}

private void restartNodes(List<CloudJettyRunner> nodesToRestart) throws Exception {
for (CloudJettyRunner node : nodesToRestart) {
chaosMonkey.start(node.jetty);
nodesDown.remove(node);
}
waitTillNodesActive();
checkShardConsistency(false, true);
}


private void forceNodeFailures(List<CloudJettyRunner> replicasToShutDown) throws Exception {
for (CloudJettyRunner replicaToShutDown : replicasToShutDown) {
chaosMonkey.killJetty(replicaToShutDown);
waitForNoShardInconsistency();
}

int totalDown = 0;
@@ -205,8 +208,10 @@ private void waitTillNodesActive() throws Exception {
Collection<Replica> replicas = slice.getReplicas();
boolean allActive = true;

Collection<String> nodesDownNames = nodesDown.stream().map(n -> n.coreNodeName).collect(Collectors.toList());

Collection<Replica> replicasToCheck = null;
replicasToCheck = replicas.stream().filter(r -> nodesDown.contains(r.getName()))
replicasToCheck = replicas.stream().filter(r -> !nodesDownNames.contains(r.getName()))
.collect(Collectors.toList());

for (Replica replica : replicasToCheck) {
@@ -20,6 +20,8 @@

import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -41,15 +43,14 @@
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.handler.ReplicationHandler;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static java.util.Collections.singletonList;

/**
* Test sync peer sync when a node restarts and documents are indexed when node was down.
* Test PeerSync when a node restarts and documents are indexed while the node was down.
*
* This test is modeled after SyncSliceTest
*/
@@ -121,7 +122,7 @@ public void test() throws Exception {
List<CloudJettyRunner> otherJetties = getOtherAvailableJetties(initialLeaderJetty);
CloudJettyRunner neverLeader = otherJetties.get(otherJetties.size() - 1);
otherJetties.remove(neverLeader);

// first shutdown a node that will never be a leader
forceNodeFailures(singletonList(neverLeader));

@@ -199,7 +200,6 @@ private void indexInBackground(int numDocs) {
private void forceNodeFailures(List<CloudJettyRunner> replicasToShutDown) throws Exception {
for (CloudJettyRunner replicaToShutDown : replicasToShutDown) {
chaosMonkey.killJetty(replicaToShutDown);
waitForNoShardInconsistency();
}

int totalDown = 0;
@@ -218,8 +218,6 @@ private void forceNodeFailures(List<CloudJettyRunner> replicasToShutDown) throws
assertEquals(getShardCount() - totalDown, jetties.size());

nodesDown.addAll(replicasToShutDown);

Thread.sleep(3000);
}


@@ -253,14 +251,6 @@ private void bringUpDeadNodeAndEnsureNoReplication(CloudJettyRunner leaderJetty,
// disable fingerprint check if needed
System.setProperty("solr.disableFingerprint", String.valueOf(disableFingerprint));

long numRequestsBefore = (Long) leaderJetty.jetty
.getCoreContainer()
.getCores()
.iterator()
.next()
.getRequestHandler(ReplicationHandler.PATH)
.getStatistics().get("requests");

indexInBackground(50);

// bring back dead node and ensure it recovers
@@ -279,15 +269,9 @@
long cloudClientDocs = cloudClient.query(new SolrQuery("*:*")).getResults().getNumFound();
assertEquals(docId, cloudClientDocs);

long numRequestsAfter = (Long) leaderJetty.jetty
.getCoreContainer()
.getCores()
.iterator()
.next()
.getRequestHandler(ReplicationHandler.PATH)
.getStatistics().get("requests");

assertEquals("PeerSync failed. Had to fail back to replication", numRequestsBefore, numRequestsAfter);
// if there was no replication, the replication.properties file should not exist
String replicationProperties = (String) nodeToBringUp.jetty.getSolrHome() + "/cores/" + DEFAULT_TEST_COLLECTION_NAME + "/data/replication.properties";
assertTrue("PeerSync failed. Had to fall back to replication", Files.notExists(Paths.get(replicationProperties)));
}


@@ -302,9 +286,11 @@ private void waitTillNodesActive() throws Exception {
Collection<Replica> replicas = slice.getReplicas();
boolean allActive = true;

Collection<String> nodesDownNames = nodesDown.stream().map(n -> n.coreNodeName).collect(Collectors.toList());

Collection<Replica> replicasToCheck = null;
replicasToCheck = replicas.stream().filter(r -> nodesDown.contains(r.getName()))
.collect(Collectors.toList());
replicasToCheck = replicas.stream().filter(r -> !nodesDownNames.contains(r.getName()))
.collect(Collectors.toList());

for (Replica replica : replicasToCheck) {
if (!clusterState.liveNodesContain(replica.getNodeName()) || replica.getState() != Replica.State.ACTIVE) {
@@ -238,8 +238,8 @@ static void waitForNewLeader(CloudSolrClient cloudClient, String shardName, Repl
ClusterState clusterState = zkStateReader.getClusterState();
DocCollection coll = clusterState.getCollection("collection1");
Slice slice = coll.getSlice(shardName);
if(slice.getLeader() != oldLeader && slice.getState() == State.ACTIVE) {
log.info("New leader got elected in {} secs", i);
if(slice.getLeader() != null && !slice.getLeader().equals(oldLeader) && slice.getState() == State.ACTIVE) {
log.info("Old leader {}, new leader. New leader got elected in {} secs", oldLeader, slice.getLeader(), i);
break;
}
