Skip to content

Commit

Permalink
#93 Election context close speed buff.
Browse files (browse the repository at this point in the history)
  • Loading branch information
markrmiller committed Jul 12, 2020
1 parent 797eccf commit 770f3a7
Show file tree
Hide file tree
Showing 5 changed files with 92 additions and 95 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,38 +63,49 @@ public Overseer getOverseer() {

// Cancels the election and closes the overseer: calls super.cancelElection() and
// overseer.doClose(). An exception from either call is handed to
// ParWork.propegateInterrupt and logged instead of being rethrown.
// NOTE(review): this text appears to be a diff rendering in which removed and added lines are
// interleaved without +/- markers - the sequential try/catch pair directly below looks like the
// old body and the ParWork-based section after it like the replacement. Confirm against the
// real file before editing.
@Override
public void cancelElection() throws InterruptedException, KeeperException {

try {
super.cancelElection();
} catch (Exception e) {
ParWork.propegateInterrupt(e);
// NOTE(review): this handler guards super.cancelElection() but logs the same
// "closing Overseer" text as the doClose() handler below - possibly copy/paste; confirm intent.
log.error("Exception closing Overseer", e);
}
try {
overseer.doClose();
} catch (Exception e) {
ParWork.propegateInterrupt(e);
log.error("Exception closing Overseer", e);
// The same two steps, each registered as its own task on a single ParWork instance that the
// try-with-resources closes at the end of the block.
// NOTE(review): presumably ParWork runs the collected tasks concurrently - TODO confirm
// against ParWork's collect/addCollect/close contract.
try (ParWork closer = new ParWork(this, true)) {
closer.collect(() -> {
try {
super.cancelElection();
} catch (Exception e) {
ParWork.propegateInterrupt(e);
log.error("Exception closing Overseer", e);
}
});
closer.collect(() -> {
try {
overseer.doClose();
} catch (Exception e) {
ParWork.propegateInterrupt(e);
log.error("Exception closing Overseer", e);
}
});
// The string passed here is a label for this group of collected tasks.
closer.addCollect("overseerElectionContextCancel");
}
}

// Closes this election context: calls super.close() and overseer.doClose(), and sets
// isClosed to true. An exception from either call is handed to ParWork.propegateInterrupt
// and logged instead of being rethrown.
// NOTE(review): this text appears to be a diff rendering in which removed and added lines are
// interleaved without +/- markers - the sequential try/catch pair directly below looks like the
// old body and the part starting at "this.isClosed = true" like the replacement. Confirm
// against the real file before editing.
@Override
public void close() {
try {
super.close();
} catch (Exception e) {
ParWork.propegateInterrupt(e);
// NOTE(review): this handler guards super.close(), yet the message talks about canceling the
// election - confirm whether the wording is intended.
log.error("Exception canceling election", e);
}

try {
overseer.doClose();
} catch (Exception e) {
ParWork.propegateInterrupt(e);
log.error("Exception closing Overseer", e);
}
// The closed flag is set before the ParWork-based closing below is started.
this.isClosed = true;

// Each step is registered as its own task on a single ParWork instance that the
// try-with-resources closes at the end of the block.
// NOTE(review): presumably ParWork runs the collected tasks concurrently - TODO confirm
// against ParWork's collect/addCollect/close contract.
try (ParWork closer = new ParWork(this, true)) {
closer.collect(() -> {
try {
super.close();
} catch (Exception e) {
ParWork.propegateInterrupt(e);
log.error("Exception canceling election", e);
}
});
closer.collect(() -> {
try {
overseer.doClose();
} catch (Exception e) {
ParWork.propegateInterrupt(e);
log.error("Exception closing Overseer", e);
}
});
// The string passed here is a label for this group of collected tasks.
closer.addCollect("overseerElectionContextClose");
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,18 +83,18 @@ public ShardLeaderElectionContext(LeaderElector leaderElector,

@Override
public void close() {
super.close();
try {
cancelElection();
} catch (Exception e) {
ParWork.propegateInterrupt(e);
log.error("Exception canceling election", e);
}
try {
syncStrategy.close();
} catch (Exception e) {
ParWork.propegateInterrupt(e);
log.error("Exception closing SyncStrategy", e);
try (ParWork closer = new ParWork(this, true)) {
closer.collect(() -> super.close());
closer.collect(() -> {
try {
cancelElection();
} catch (Exception e) {
ParWork.propegateInterrupt(e);
log.error("Exception canceling election", e);
}
});
closer.collect(syncStrategy);
closer.addCollect("shardLeaderElectionContextClose");
}

this.isClosed = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,6 @@ class ShardLeaderElectionContextBase extends ElectionContext {

private volatile Integer leaderZkNodeParentVersion;

// Prevents a race between cancelling and becoming leader.
private final Object lock = new Object();

public ShardLeaderElectionContextBase(final String coreNodeName, String electionPath, String leaderPath,
ZkNodeProps props, SolrZkClient zkClient) {
super(coreNodeName, electionPath, leaderPath, props);
Expand All @@ -65,62 +62,56 @@ public void close() {
ParWork.propegateInterrupt(e);
log.error("Exception canceling election", e);
}
try {
cancelElection();
} catch (Exception e) {
ParWork.propegateInterrupt(e);
log.error("Exception canceling election", e);
}
}

// Cancels the election via super.cancelElection() and then, when leaderZkNodeParentVersion is
// non-null, issues one ZooKeeper multi that checks the version of leaderPath's parent znode,
// checks electionPath, and deletes leaderPath. When the version is null it only logs that the
// previous leader registration will not be removed.
//
// Failure handling visible below: NoNodeException -> return quietly; SessionExpiredException ->
// log a warning and rethrow; other KeeperException -> inspect each multi result and throw
// SolrException(SERVER_ERROR) unless the error code is -101; InterruptedException or
// AlreadyClosedException -> ParWork.propegateInterrupt and return; any other Exception ->
// SolrException(SERVER_ERROR).
//
// NOTE(review): this text appears to be a diff rendering in which removed and added lines are
// interleaved without +/- markers, so most statements show up twice. The first copy is wrapped
// in synchronized (lock); the second copy (from the second super.cancelElection() on) is not.
// Confirm against the real file before editing.
@Override
public void cancelElection() throws InterruptedException, KeeperException {
synchronized (lock) {
super.cancelElection();

Integer version = leaderZkNodeParentVersion;
if (version != null) {
try {
// We need to be careful and make sure we *only* delete our own leader registration node.
// We do this by using a multi and ensuring the parent znode of the leader registration node
// matches the version we expect - there is a setData call that increments the parent's znode
// version whenever a leader registers.
log.debug("Removing leader registration node on cancel: {} {}", leaderPath, version);
List<Op> ops = new ArrayList<>(2);
ops.add(Op.check(Paths.get(leaderPath).getParent().toString(), version));
ops.add(Op.check(electionPath, -1));
ops.add(Op.delete(leaderPath, -1));
zkClient.multi(ops, true);
} catch (KeeperException e) {
if (e instanceof NoNodeException) {
// okay
return;
}
if (e instanceof KeeperException.SessionExpiredException) {
log.warn("ZooKeeper session expired");
throw e;
}
super.cancelElection();
// NOTE(review): this branch only logs; there is no return, so the removal below is still
// attempted when the client reports it is not connected - confirm that is intended.
if (!zkClient.isConnected()) {
log.info("Can't cancel, zkClient is not connected");
}
// Take a local snapshot of the version; everything below works on this local copy.
Integer version = leaderZkNodeParentVersion;
if (version != null) {
try {
// We need to be careful and make sure we *only* delete our own leader registration node.
// We do this by using a multi and ensuring the parent znode of the leader registration node
// matches the version we expect - there is a setData call that increments the parent's znode
// version whenever a leader registers.
log.debug("Removing leader registration node on cancel: {} {}", leaderPath, version);
// NOTE(review): three ops are added but the initial capacity is 2; harmless (the list
// grows) but the hint does not match.
List<Op> ops = new ArrayList<>(2);
ops.add(Op.check(Paths.get(leaderPath).getParent().toString(), version));
ops.add(Op.check(electionPath, -1));
ops.add(Op.delete(leaderPath, -1));
zkClient.multi(ops, true);
} catch (KeeperException e) {
if (e instanceof NoNodeException) {
// okay
return;
}
if (e instanceof KeeperException.SessionExpiredException) {
log.warn("ZooKeeper session expired");
throw e;
}

List<OpResult> results = e.getResults();
for (OpResult result : results) {
if (((OpResult.ErrorResult) result).getErr() == -101) {
// no node, fine
} else {
throw new SolrException(ErrorCode.SERVER_ERROR, "Exception canceling election", e);
}
// NOTE(review): presumably getResults() is only populated for failed multi calls - confirm
// it cannot be null here, otherwise the loop below throws NullPointerException.
List<OpResult> results = e.getResults();
for (OpResult result : results) {
// NOTE(review): every result is cast to OpResult.ErrorResult unconditionally; confirm a
// failed multi never reports another OpResult subtype (would be a ClassCastException).
// -101 is treated as "no node" per the comment below - presumably
// KeeperException.Code.NONODE; TODO confirm and prefer the named constant.
if (((OpResult.ErrorResult) result).getErr() == -101) {
// no node, fine
} else {
throw new SolrException(ErrorCode.SERVER_ERROR, "Exception canceling election", e);
}

} catch (InterruptedException | AlreadyClosedException e) {
ParWork.propegateInterrupt(e);
return;
} catch (Exception e) {
throw new SolrException(ErrorCode.SERVER_ERROR, "Exception canceling election", e);
} finally {
version = null;
}
} else {
log.info("No version found for ephemeral leader parent node, won't remove previous leader registration.");

} catch (InterruptedException | AlreadyClosedException e) {
ParWork.propegateInterrupt(e);
return;
} catch (Exception e) {
throw new SolrException(ErrorCode.SERVER_ERROR, "Exception canceling election", e);
} finally {
// NOTE(review): this clears only the local variable `version`, which is not read again;
// leaderZkNodeParentVersion itself is not modified here - confirm whether resetting
// leaderZkNodeParentVersion was the intent.
version = null;
}
} else {
log.info("No version found for ephemeral leader parent node, won't remove previous leader registration.");
}
}

Expand All @@ -132,7 +123,6 @@ void runLeaderProcess(ElectionContext context, boolean weAreReplacement, int pau
String parent = Paths.get(leaderPath).getParent().toString();
List<String> errors = new ArrayList<>();
try {
synchronized (lock) {
log.info("Creating leader registration node {} after winning as {}", leaderPath, leaderSeqPath);
//zkClient.printLayout();
List<Op> ops = new ArrayList<>(3);
Expand Down Expand Up @@ -164,7 +154,6 @@ void runLeaderProcess(ElectionContext context, boolean weAreReplacement, int pau

}
assert leaderZkNodeParentVersion != null;
}

} catch (Throwable t) {
ParWork.propegateInterrupt(t);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -567,7 +567,6 @@ public static LinkedHashMapWriter testForResponseElement(RestTestHarness harness
try {
m = testServerBaseUrl == null ? getRespMap(uri, harness) : TestSolrConfigHandlerConcurrent.getAsMap(testServerBaseUrl + uri, cloudSolrClient);
} catch (Exception e) {
Thread.sleep(100);
continue;

}
Expand All @@ -586,8 +585,6 @@ public static LinkedHashMapWriter testForResponseElement(RestTestHarness harness
break;
}
}
Thread.sleep(100);

}
assertTrue(StrUtils.formatString("Could not get expected value ''{0}'' for path ''{1}'' full output: {2}, from server: {3}", expected, StrUtils.join(jsonPath, '/'), m.toString(), testServerBaseUrl), success);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ public static void afterSolrTestCase() throws Exception {
// if (closeTime.getClazz() == SolrCore.class) {
// continue;
// }
if (closeTime.getElapsedMS() > 8000) {
if (closeTime.getElapsedMS() > 1000) {
tooLongTime = closeTime.getElapsedMS();
clazz = closeTime.getClazz();
}
Expand Down

0 comments on commit 770f3a7

Please sign in to comment.