Skip to content

Commit

Permalink
Reduce number of ZooCache instances and Watchers
Browse files Browse the repository at this point in the history
This commit removes most of the places where ZooCache
instances were being created in favor of re-using the
ZooCache from the ClientContext. Additionally, this
commit does not place a Watcher on each node that is
cached and instead places a single persistent
recursive Watcher at the paths in which the caching
is taking place.

This change roughly reduces the Watchers reported in
WatchTheWatchCountIT by 50%. While reducing the number
of Watchers, this commit could reduce ZooKeeper server
performance in two ways:

  1. There is a note in the ZooKeeper javadoc for the
     AddWatchMode enum that states there is a small
     performance decrease when using recursive watchers
     as all of the segments of ZNode paths need to be
     checked for watch triggering.

  2. Because a Watcher is not set on each node this
     commit modified the ZooCache.ZCacheWatcher to
     remove the parent of the triggered node, the
     triggered node, and all of its siblings from the
     cache. This overmatching may mean increased
     lookups in ZooKeeper.

Related to apache#5134
Closes apache#5154, apache#5157
  • Loading branch information
dlmarion committed Dec 13, 2024
1 parent f4ef6ff commit 5d23c12
Show file tree
Hide file tree
Showing 24 changed files with 457 additions and 623 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.fate.zookeeper.ZooCache;
import org.apache.accumulo.core.fate.zookeeper.ZooCache.ZcStat;
import org.apache.accumulo.core.fate.zookeeper.ZooCacheFactory;
import org.apache.accumulo.core.fate.zookeeper.ZooReader;
import org.apache.accumulo.core.fate.zookeeper.ZooUtil;
import org.apache.accumulo.core.lock.ServiceLock;
Expand All @@ -106,6 +105,7 @@
import org.apache.accumulo.core.util.threads.ThreadPools;
import org.apache.accumulo.core.util.threads.Threads;
import org.apache.hadoop.conf.Configuration;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -127,9 +127,9 @@ public class ClientContext implements AccumuloClient {
private static final Logger log = LoggerFactory.getLogger(ClientContext.class);

private final ClientInfo info;
private InstanceId instanceId;
private final Supplier<InstanceId> instanceId;
private final ZooReader zooReader;
private final ZooCache zooCache;
private final Supplier<ZooCache> zooCache;

private Credentials creds;
private BatchWriterConfig batchWriterConfig;
Expand Down Expand Up @@ -229,8 +229,38 @@ public ClientContext(SingletonReservation reservation, ClientInfo info,
this.info = info;
this.hadoopConf = info.getHadoopConf();
zooReader = new ZooReader(info.getZooKeepers(), info.getZooKeepersSessionTimeOut());
zooCache =
new ZooCacheFactory().getZooCache(info.getZooKeepers(), info.getZooKeepersSessionTimeOut());

// Get the instanceID using ZooKeeper, not ZooCache as we will need
// the instanceID to create the ZooKeeper root path when creating
// ZooCache. If the instanceId cannot be found, this is a good
// time to find out.
final String instanceName = info.getInstanceName();
final String instanceNamePath = Constants.ZROOT + Constants.ZINSTANCES + "/" + instanceName;

instanceId = memoize(() -> {
try {
// Call getZooReader() instead of using the local variable because
// ServerContext overrides getZooReader() to return a connection
// that has been set with the secret.
byte[] data = getZooReader().getData(instanceNamePath);
if (data == null) {
throw new IllegalArgumentException("Instance name " + instanceName
+ " does not exist in zookeeper. "
+ "Run \"accumulo org.apache.accumulo.server.util.ListInstances\" to see a list.");
}
final String instanceIdString = new String(data, UTF_8);
// verify that the instanceId found via the instanceName actually exists as an instance
if (getZooReader().getData(Constants.ZROOT + "/" + instanceIdString) == null) {
throw new IllegalArgumentException("Instance id " + instanceIdString
+ (instanceName == null ? "" : " pointed to by the name " + instanceName)
+ " does not exist in zookeeper");
}
return InstanceId.of(instanceIdString);
} catch (KeeperException | InterruptedException e) {
throw new IllegalArgumentException("Unable to create client, instanceId not found", e);
}
});
zooCache = memoize(() -> new ZooCache(ZooUtil.getRoot(getInstanceID()), getZooReader(), null));
this.serverConf = serverConf;
timeoutSupplier = memoizeWithExpiration(
() -> getConfiguration().getTimeInMillis(Property.GENERAL_RPC_TIMEOUT), 100, MILLISECONDS);
Expand Down Expand Up @@ -484,26 +514,7 @@ public synchronized TCredentials rpcCreds() {
* @return a UUID
*/
public InstanceId getInstanceID() {
if (instanceId == null) {
// lookup by name
final String instanceName = info.getInstanceName();
String instanceNamePath = Constants.ZROOT + Constants.ZINSTANCES + "/" + instanceName;
byte[] data = zooCache.get(instanceNamePath);
if (data == null) {
throw new RuntimeException(
"Instance name " + instanceName + " does not exist in zookeeper. "
+ "Run \"accumulo org.apache.accumulo.server.util.ListInstances\" to see a list.");
}
String instanceIdString = new String(data, UTF_8);
// verify that the instanceId found via the instanceName actually exists as an instance
if (zooCache.get(Constants.ZROOT + "/" + instanceIdString) == null) {
throw new RuntimeException("Instance id " + instanceIdString
+ (instanceName == null ? "" : " pointed to by the name " + instanceName)
+ " does not exist in zookeeper");
}
instanceId = InstanceId.of(instanceIdString);
}
return instanceId;
return instanceId.get();
}

public String getZooKeeperRoot() {
Expand Down Expand Up @@ -541,7 +552,7 @@ public int getZooKeepersSessionTimeOut() {
}

public ZooCache getZooCache() {
return zooCache;
return zooCache.get();
}

private TableZooHelper tableZooHelper;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.accumulo.core.clientImpl;

import static com.google.common.base.Preconditions.checkArgument;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.accumulo.core.rpc.ThriftUtil.createClient;
import static org.apache.accumulo.core.rpc.ThriftUtil.createTransport;
import static org.apache.accumulo.core.rpc.ThriftUtil.getClient;
Expand Down Expand Up @@ -60,7 +59,6 @@
import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException;
import org.apache.accumulo.core.conf.DeprecatedPropertyUtil;
import org.apache.accumulo.core.data.InstanceId;
import org.apache.accumulo.core.fate.zookeeper.ZooCache;
import org.apache.accumulo.core.lock.ServiceLockData;
import org.apache.accumulo.core.lock.ServiceLockData.ThriftService;
import org.apache.accumulo.core.lock.ServiceLockPaths.AddressSelector;
Expand Down Expand Up @@ -467,22 +465,6 @@ public void waitForBalance() throws AccumuloException {

}

/**
* Given a zooCache and instanceId, look up the instance name.
*/
public static String lookupInstanceName(ZooCache zooCache, InstanceId instanceId) {
checkArgument(zooCache != null, "zooCache is null");
checkArgument(instanceId != null, "instanceId is null");
for (String name : zooCache.getChildren(Constants.ZROOT + Constants.ZINSTANCES)) {
var bytes = zooCache.get(Constants.ZROOT + Constants.ZINSTANCES + "/" + name);
InstanceId iid = InstanceId.of(new String(bytes, UTF_8));
if (iid.equals(instanceId)) {
return name;
}
}
return null;
}

@Override
public InstanceId getInstanceId() {
return context.getInstanceID();
Expand Down
Loading

0 comments on commit 5d23c12

Please sign in to comment.