Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce number of ZooCache instances and Watchers #5183

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.fate.zookeeper.ZooCache;
import org.apache.accumulo.core.fate.zookeeper.ZooCache.ZcStat;
import org.apache.accumulo.core.fate.zookeeper.ZooCacheFactory;
import org.apache.accumulo.core.fate.zookeeper.ZooReader;
import org.apache.accumulo.core.fate.zookeeper.ZooUtil;
import org.apache.accumulo.core.lock.ServiceLock;
Expand All @@ -106,6 +105,7 @@
import org.apache.accumulo.core.util.threads.ThreadPools;
import org.apache.accumulo.core.util.threads.Threads;
import org.apache.hadoop.conf.Configuration;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -127,9 +127,9 @@ public class ClientContext implements AccumuloClient {
private static final Logger log = LoggerFactory.getLogger(ClientContext.class);

private final ClientInfo info;
private InstanceId instanceId;
private final Supplier<InstanceId> instanceId;
private final ZooReader zooReader;
private final ZooCache zooCache;
private final Supplier<ZooCache> zooCache;

private Credentials creds;
private BatchWriterConfig batchWriterConfig;
Expand Down Expand Up @@ -229,8 +229,38 @@ public ClientContext(SingletonReservation reservation, ClientInfo info,
this.info = info;
this.hadoopConf = info.getHadoopConf();
zooReader = new ZooReader(info.getZooKeepers(), info.getZooKeepersSessionTimeOut());
zooCache =
new ZooCacheFactory().getZooCache(info.getZooKeepers(), info.getZooKeepersSessionTimeOut());

// Get the instanceID using ZooKeeper, not ZooCache as we will need
// the instanceID to create the ZooKeeper root path when creating
// ZooCache. If the instanceId cannot be found, this is a good
// time to find out.
final String instanceName = info.getInstanceName();
final String instanceNamePath = Constants.ZROOT + Constants.ZINSTANCES + "/" + instanceName;

instanceId = memoize(() -> {
try {
// Call getZooReader() instead of using the local variable because
// ServerContext overrides getZooReader() to return a connection
// that has been set with the secret.
byte[] data = getZooReader().getData(instanceNamePath);
if (data == null) {
throw new IllegalArgumentException("Instance name " + instanceName
+ " does not exist in zookeeper. "
+ "Run \"accumulo org.apache.accumulo.server.util.ListInstances\" to see a list.");
}
final String instanceIdString = new String(data, UTF_8);
// verify that the instanceId found via the instanceName actually exists as an instance
if (getZooReader().getData(Constants.ZROOT + "/" + instanceIdString) == null) {
throw new IllegalArgumentException("Instance id " + instanceIdString
+ (instanceName == null ? "" : " pointed to by the name " + instanceName)
+ " does not exist in zookeeper");
}
return InstanceId.of(instanceIdString);
} catch (KeeperException | InterruptedException e) {
throw new IllegalArgumentException("Unable to create client, instanceId not found", e);
}
});
zooCache = memoize(() -> new ZooCache(ZooUtil.getRoot(getInstanceID()), getZooReader(), null));
this.serverConf = serverConf;
timeoutSupplier = memoizeWithExpiration(
() -> getConfiguration().getTimeInMillis(Property.GENERAL_RPC_TIMEOUT), 100, MILLISECONDS);
Expand Down Expand Up @@ -484,26 +514,7 @@ public synchronized TCredentials rpcCreds() {
* @return a UUID
*/
public InstanceId getInstanceID() {
if (instanceId == null) {
// lookup by name
final String instanceName = info.getInstanceName();
String instanceNamePath = Constants.ZROOT + Constants.ZINSTANCES + "/" + instanceName;
byte[] data = zooCache.get(instanceNamePath);
if (data == null) {
throw new RuntimeException(
"Instance name " + instanceName + " does not exist in zookeeper. "
+ "Run \"accumulo org.apache.accumulo.server.util.ListInstances\" to see a list.");
}
String instanceIdString = new String(data, UTF_8);
// verify that the instanceId found via the instanceName actually exists as an instance
if (zooCache.get(Constants.ZROOT + "/" + instanceIdString) == null) {
throw new RuntimeException("Instance id " + instanceIdString
+ (instanceName == null ? "" : " pointed to by the name " + instanceName)
+ " does not exist in zookeeper");
}
instanceId = InstanceId.of(instanceIdString);
}
return instanceId;
return instanceId.get();
}

public String getZooKeeperRoot() {
Expand Down Expand Up @@ -541,7 +552,7 @@ public int getZooKeepersSessionTimeOut() {
}

public ZooCache getZooCache() {
return zooCache;
return zooCache.get();
}

private TableZooHelper tableZooHelper;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.accumulo.core.clientImpl;

import static com.google.common.base.Preconditions.checkArgument;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.accumulo.core.rpc.ThriftUtil.createClient;
import static org.apache.accumulo.core.rpc.ThriftUtil.createTransport;
import static org.apache.accumulo.core.rpc.ThriftUtil.getClient;
Expand Down Expand Up @@ -60,7 +59,6 @@
import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException;
import org.apache.accumulo.core.conf.DeprecatedPropertyUtil;
import org.apache.accumulo.core.data.InstanceId;
import org.apache.accumulo.core.fate.zookeeper.ZooCache;
import org.apache.accumulo.core.lock.ServiceLockData;
import org.apache.accumulo.core.lock.ServiceLockData.ThriftService;
import org.apache.accumulo.core.lock.ServiceLockPaths.AddressSelector;
Expand Down Expand Up @@ -467,22 +465,6 @@ public void waitForBalance() throws AccumuloException {

}

/**
* Given a zooCache and instanceId, look up the instance name.
*/
public static String lookupInstanceName(ZooCache zooCache, InstanceId instanceId) {
checkArgument(zooCache != null, "zooCache is null");
checkArgument(instanceId != null, "instanceId is null");
for (String name : zooCache.getChildren(Constants.ZROOT + Constants.ZINSTANCES)) {
var bytes = zooCache.get(Constants.ZROOT + Constants.ZINSTANCES + "/" + name);
InstanceId iid = InstanceId.of(new String(bytes, UTF_8));
if (iid.equals(instanceId)) {
return name;
}
}
return null;
}

@Override
public InstanceId getInstanceId() {
return context.getInstanceID();
Expand Down
Loading
Loading