From 5d23c12b00bbb681de290808b51a27d070b45f25 Mon Sep 17 00:00:00 2001 From: Dave Marion Date: Fri, 13 Dec 2024 23:37:52 +0000 Subject: [PATCH] Reduce number of ZooCache instances and Watchers This commit removes most of the places where ZooCache instances were being created in favor of re-using the ZooCache from the ClientContext. Additionally, this commit does not place a Watcher on each node that is cached and instead places a single persistent recursive Watcher at the paths in which the caching is taking place. This change roughly reduces the Watchers reported in WatchTheWatchCountIT by 50%. While reducing the number of Watchers, this commit could reduce ZooKeeper server performance in two ways: 1. There is a note in the ZooKeeper javadoc for the AddWatchMode enum that states there is a small performance decrease when using recursive watchers as all of the segments of ZNode paths need to be checked for watch triggering. 2. Because a Watcher is not set on each node this commit modified the ZooCache.ZCacheWatcher to remove the parent of the triggered node, the triggered node, and all of its siblings from the cache. This overmatching may mean increased lookups in ZooKeeper. Related to #5134 Closes #5154, #5157 --- .../core/clientImpl/ClientContext.java | 63 ++++--- .../clientImpl/InstanceOperationsImpl.java | 18 -- .../core/fate/zookeeper/ZooCache.java | 156 ++++++++++------- .../core/fate/zookeeper/ZooCacheFactory.java | 128 -------------- .../fate/zookeeper/ZooCacheFactoryTest.java | 83 --------- .../core/fate/zookeeper/ZooCacheTest.java | 91 ++++++---- .../apache/accumulo/server/ServerInfo.java | 63 ++++--- .../server/manager/LiveTServerSet.java | 14 +- .../server/security/SecurityOperation.java | 6 +- .../handler/KerberosAuthenticator.java | 40 ++--- .../security/handler/ZKAuthenticator.java | 54 +++--- .../server/security/handler/ZKAuthorizor.java | 21 +-- .../security/handler/ZKPermHandler.java | 123 +++++++------- .../accumulo/server/tables/TableManager.java | 10 +- .../accumulo/server/util/ListInstances.java | 2 +- .../security/handler/ZKAuthenticatorTest.java | 11 +- .../manager/recovery/RecoveryManager.java | 5 +- .../apache/accumulo/tserver/ScanServer.java | 10 -- .../accumulo/tserver/TabletClientHandler.java | 8 +- .../accumulo/tserver/TabletHostingServer.java | 3 - .../apache/accumulo/tserver/TabletServer.java | 9 - .../test/BadDeleteMarkersCreatedIT.java | 1 - .../test/functional/CacheTestReader.java | 2 +- .../accumulo/test/zookeeper/ZooCacheIT.java | 159 +++++++++++------- 24 files changed, 457 insertions(+), 623 deletions(-) delete mode 100644 core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ZooCacheFactory.java delete mode 100644 core/src/test/java/org/apache/accumulo/core/fate/zookeeper/ZooCacheFactoryTest.java diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java index 3c93a6d39e2..2c8260c6efa 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java @@ -79,7 +79,6 @@ import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.fate.zookeeper.ZooCache; import org.apache.accumulo.core.fate.zookeeper.ZooCache.ZcStat; -import org.apache.accumulo.core.fate.zookeeper.ZooCacheFactory; import org.apache.accumulo.core.fate.zookeeper.ZooReader; import org.apache.accumulo.core.fate.zookeeper.ZooUtil; import org.apache.accumulo.core.lock.ServiceLock; @@ -106,6 +105,7 @@ import org.apache.accumulo.core.util.threads.ThreadPools; import org.apache.accumulo.core.util.threads.Threads; import org.apache.hadoop.conf.Configuration; +import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -127,9 +127,9 @@ public class ClientContext implements AccumuloClient { private static final Logger log = LoggerFactory.getLogger(ClientContext.class); private final ClientInfo info; - private InstanceId instanceId; + private final Supplier instanceId; private final ZooReader zooReader; - private final ZooCache zooCache; + private final Supplier zooCache; private Credentials creds; private BatchWriterConfig batchWriterConfig; @@ -229,8 +229,38 @@ public ClientContext(SingletonReservation reservation, ClientInfo info, this.info = info; this.hadoopConf = info.getHadoopConf(); zooReader = new ZooReader(info.getZooKeepers(), info.getZooKeepersSessionTimeOut()); - zooCache = - new ZooCacheFactory().getZooCache(info.getZooKeepers(), info.getZooKeepersSessionTimeOut()); + + // Get the instanceID using ZooKeeper, not ZooCache as we will need + // the instanceID to create the ZooKeeper root path when creating + // ZooCache. If the instanceId cannot be found, this is a good + // time to find out. + final String instanceName = info.getInstanceName(); + final String instanceNamePath = Constants.ZROOT + Constants.ZINSTANCES + "/" + instanceName; + + instanceId = memoize(() -> { + try { + // Call getZooReader() instead of using the local variable because + // ServerContext overrides getZooReader() to return a connection + // that has been set with the secret. + byte[] data = getZooReader().getData(instanceNamePath); + if (data == null) { + throw new IllegalArgumentException("Instance name " + instanceName + + " does not exist in zookeeper. " + + "Run \"accumulo org.apache.accumulo.server.util.ListInstances\" to see a list."); + } + final String instanceIdString = new String(data, UTF_8); + // verify that the instanceId found via the instanceName actually exists as an instance + if (getZooReader().getData(Constants.ZROOT + "/" + instanceIdString) == null) { + throw new IllegalArgumentException("Instance id " + instanceIdString + + (instanceName == null ? "" : " pointed to by the name " + instanceName) + + " does not exist in zookeeper"); + } + return InstanceId.of(instanceIdString); + } catch (KeeperException | InterruptedException e) { + throw new IllegalArgumentException("Unable to create client, instanceId not found", e); + } + }); + zooCache = memoize(() -> new ZooCache(ZooUtil.getRoot(getInstanceID()), getZooReader(), null)); this.serverConf = serverConf; timeoutSupplier = memoizeWithExpiration( () -> getConfiguration().getTimeInMillis(Property.GENERAL_RPC_TIMEOUT), 100, MILLISECONDS); @@ -484,26 +514,7 @@ public synchronized TCredentials rpcCreds() { * @return a UUID */ public InstanceId getInstanceID() { - if (instanceId == null) { - // lookup by name - final String instanceName = info.getInstanceName(); - String instanceNamePath = Constants.ZROOT + Constants.ZINSTANCES + "/" + instanceName; - byte[] data = zooCache.get(instanceNamePath); - if (data == null) { - throw new RuntimeException( - "Instance name " + instanceName + " does not exist in zookeeper. " - + "Run \"accumulo org.apache.accumulo.server.util.ListInstances\" to see a list."); - } - String instanceIdString = new String(data, UTF_8); - // verify that the instanceId found via the instanceName actually exists as an instance - if (zooCache.get(Constants.ZROOT + "/" + instanceIdString) == null) { - throw new RuntimeException("Instance id " + instanceIdString - + (instanceName == null ? "" : " pointed to by the name " + instanceName) - + " does not exist in zookeeper"); - } - instanceId = InstanceId.of(instanceIdString); - } - return instanceId; + return instanceId.get(); } public String getZooKeeperRoot() { @@ -541,7 +552,7 @@ public int getZooKeepersSessionTimeOut() { } public ZooCache getZooCache() { - return zooCache; + return zooCache.get(); } private TableZooHelper tableZooHelper; diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/InstanceOperationsImpl.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/InstanceOperationsImpl.java index 9b260d2ba05..7c22b7d8a87 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/InstanceOperationsImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/InstanceOperationsImpl.java @@ -19,7 +19,6 @@ package org.apache.accumulo.core.clientImpl; import static com.google.common.base.Preconditions.checkArgument; -import static java.nio.charset.StandardCharsets.UTF_8; import static org.apache.accumulo.core.rpc.ThriftUtil.createClient; import static org.apache.accumulo.core.rpc.ThriftUtil.createTransport; import static org.apache.accumulo.core.rpc.ThriftUtil.getClient; @@ -60,7 +59,6 @@ import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException; import org.apache.accumulo.core.conf.DeprecatedPropertyUtil; import org.apache.accumulo.core.data.InstanceId; -import org.apache.accumulo.core.fate.zookeeper.ZooCache; import org.apache.accumulo.core.lock.ServiceLockData; import org.apache.accumulo.core.lock.ServiceLockData.ThriftService; import org.apache.accumulo.core.lock.ServiceLockPaths.AddressSelector; @@ -467,22 +465,6 @@ public void waitForBalance() throws AccumuloException { } - /** - * Given a zooCache and instanceId, look up the instance name. - */ - public static String lookupInstanceName(ZooCache zooCache, InstanceId instanceId) { - checkArgument(zooCache != null, "zooCache is null"); - checkArgument(instanceId != null, "instanceId is null"); - for (String name : zooCache.getChildren(Constants.ZROOT + Constants.ZINSTANCES)) { - var bytes = zooCache.get(Constants.ZROOT + Constants.ZINSTANCES + "/" + name); - InstanceId iid = InstanceId.of(new String(bytes, UTF_8)); - if (iid.equals(instanceId)) { - return name; - } - } - return null; - } - @Override public InstanceId getInstanceId() { return context.getInstanceID(); diff --git a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ZooCache.java b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ZooCache.java index 596037c1de1..38c12439dcf 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ZooCache.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ZooCache.java @@ -25,15 +25,19 @@ import java.util.ConcurrentModificationException; import java.util.List; import java.util.Optional; +import java.util.TreeSet; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.LockSupport; import java.util.function.Predicate; +import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.lock.ServiceLock; import org.apache.accumulo.core.lock.ServiceLockData; import org.apache.accumulo.core.lock.ServiceLockPaths.ServiceLockPath; +import org.apache.accumulo.core.metadata.RootTable; import org.apache.accumulo.core.util.cache.Caches; +import org.apache.zookeeper.AddWatchMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException.Code; import org.apache.zookeeper.WatchedEvent; @@ -44,26 +48,50 @@ import org.slf4j.LoggerFactory; import com.github.benmanes.caffeine.cache.Cache; -import com.github.benmanes.caffeine.cache.RemovalListener; +import com.github.benmanes.caffeine.cache.Ticker; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; + /** * A cache for values stored in ZooKeeper. Values are kept up to date as they change. */ public class ZooCache { private static final Logger log = LoggerFactory.getLogger(ZooCache.class); - private final ZCacheWatcher watcher = new ZCacheWatcher(); + protected static final String[] ALLOWED_PATHS = new String[] {Constants.ZCOMPACTORS, + Constants.ZDEADTSERVERS, Constants.ZGC_LOCK, Constants.ZMANAGER_LOCK, Constants.ZMONITOR_LOCK, + Constants.ZNAMESPACES, Constants.ZRECOVERY, Constants.ZSSERVERS, Constants.ZTABLES, + Constants.ZTSERVERS, Constants.ZUSERS, RootTable.ZROOT_TABLET}; + + protected final TreeSet watchedPaths = new TreeSet<>(); + // visible for tests + protected final ZCacheWatcher watcher = new ZCacheWatcher(); private final Watcher externalWatcher; private static final AtomicLong nextCacheId = new AtomicLong(0); private final String cacheId = "ZC" + nextCacheId.incrementAndGet(); - // The concurrent map returned by Caffiene will only allow one thread to run at a time for a given + public static final Duration CACHE_DURATION = Duration.ofMinutes(30); + + // public and non-final because this is being set + // in tests to test the eviction + @SuppressFBWarnings(value = "MS_SHOULD_BE_FINAL", + justification = "being set in tests for eviction test") + public static Ticker ticker = Ticker.systemTicker(); + + // Construct this here, otherwise end up with NPE in some cases + // when the Watcher tries to access nodeCache. Alternative would + // be to mark nodeCache as volatile. + private final Cache cache = + Caches.getInstance().createNewBuilder(Caches.CacheName.ZOO_CACHE, false).ticker(ticker) + .expireAfterAccess(CACHE_DURATION).build(); + + // The concurrent map returned by Caffeine will only allow one thread to run at a time for a given // key and ZooCache relies on that. Not all concurrent map implementations have this behavior for // their compute functions. - private final ConcurrentMap nodeCache; + private final ConcurrentMap nodeCache = cache.asMap(); private final ZooReader zReader; @@ -114,21 +142,28 @@ private ZooKeeper getZooKeeper() { return zReader.getZooKeeper(); } - private class ZCacheWatcher implements Watcher { + public class ZCacheWatcher implements Watcher { @Override public void process(WatchedEvent event) { if (log.isTraceEnabled()) { - log.trace("{}: {}", cacheId, event); + log.trace("Watcher Event {} {}: {}", cacheId, event.getType(), event); } switch (event.getType()) { - case NodeDataChanged: - case NodeChildrenChanged: case NodeCreated: + case NodeChildrenChanged: + case NodeDataChanged: case NodeDeleted: case ChildWatchRemoved: case DataWatchRemoved: - remove(event.getPath()); + // This code use to call remove(path), but that was when a Watcher was set + // on each node. With the Watcher being set at a higher level we need to remove + // the parent of the affected node and all of its children from the cache + // so that the parent and children node can be re-cached. If we only remove the + // affected node, then the cached children in the parent could be incorrect. + int lastSlash = event.getPath().lastIndexOf('/'); + String parent = lastSlash == 0 ? "/" : event.getPath().substring(0, lastSlash); + clear((path) -> path.startsWith(parent)); break; case None: switch (event.getState()) { @@ -172,35 +207,45 @@ public void process(WatchedEvent event) { * @param reader ZooKeeper reader * @param watcher watcher object */ - public ZooCache(ZooReader reader, Watcher watcher) { - this(reader, watcher, Duration.ofMinutes(3)); - } - - public ZooCache(ZooReader reader, Watcher watcher, Duration timeout) { + public ZooCache(String zooRoot, ZooReader reader, Watcher watcher) { this.zReader = reader; this.externalWatcher = watcher; - RemovalListener removalListerner = (path, zcNode, reason) -> { - try { - log.trace("{} removing watches for {} because {} accesses {}", cacheId, path, reason, - zcNode == null ? -1 : zcNode.getAccessCount()); - reader.getZooKeeper().removeWatches(path, ZooCache.this.watcher, Watcher.WatcherType.Any, - false); - } catch (InterruptedException | KeeperException | RuntimeException e) { - log.warn("{} failed to remove watches on path {} in zookeeper", cacheId, path, e); - } - }; - // Must register the removal listener using evictionListener inorder for removal to be mutually - // exclusive with any other operations on the same path. This is important for watcher - // consistency, concurrently adding and removing watches for the same path would leave zoocache - // in a really bad state. The cache builder has another way to register a removal listener that - // is not mutually exclusive. - Cache cache = - Caches.getInstance().createNewBuilder(Caches.CacheName.ZOO_CACHE, false) - .expireAfterAccess(timeout).evictionListener(removalListerner).build(); - nodeCache = cache.asMap(); + setupWatchers(zooRoot); log.trace("{} created new cache", cacheId, new Exception()); } + // Visible for testing + protected void setupWatchers(String zooRoot) { + try { + for (String path : ALLOWED_PATHS) { + final String zPath = zooRoot + path; + watchedPaths.add(zPath); + zReader.getZooKeeper().addWatch(zPath, this.watcher, AddWatchMode.PERSISTENT_RECURSIVE); + log.trace("Added persistent recursive watcher at {}", zPath); + } + } catch (KeeperException | InterruptedException e) { + throw new RuntimeException("Error setting up persistent recursive watcher", e); + } + } + + private boolean isWatchedPath(String path) { + // Check that the path is equal to, or a descendant of, a watched path + for (String watchedPath : watchedPaths) { + if (path.startsWith(watchedPath)) { + return true; + } + } + return false; + } + + // Use this instead of Preconditions.checkState(isWatchedPath, String) + // so that we are not creating String unnecessarily. + private void ensureWatched(String path) { + if (!isWatchedPath(path)) { + throw new IllegalStateException("Supplied path " + path + " is not watched by this ZooCache"); + } + } + private abstract static class ZooRunnable { /** * Runs an operation against ZooKeeper. Retries are performed by the retry method when @@ -298,6 +343,7 @@ private ZcInterruptedException(InterruptedException e) { */ public List getChildren(final String zPath) { Preconditions.checkState(!closed); + ensureWatched(zPath); ZooRunnable> zr = new ZooRunnable<>() { @@ -324,12 +370,12 @@ public List run() throws KeeperException, InterruptedException { // That is ok because the compute() call on the map has a lock and processing the event // will block until compute() returns. After compute() returns the event processing // would clear the map entry. - Stat stat = zooKeeper.exists(zPath, watcher); + Stat stat = zooKeeper.exists(zPath, null); if (stat == null) { log.trace("{} getChildren saw that {} does not exists", cacheId, zPath); return ZcNode.NON_EXISTENT; } - List children = zooKeeper.getChildren(zPath, watcher); + List children = zooKeeper.getChildren(zPath, null); log.trace("{} adding {} children of {} to cache", cacheId, children.size(), zPath); return new ZcNode(children, zcn); } catch (KeeperException.NoNodeException nne) { @@ -372,6 +418,7 @@ public byte[] get(final String zPath) { */ public byte[] get(final String zPath, final ZcStat status) { Preconditions.checkState(!closed); + ensureWatched(zPath); ZooRunnable zr = new ZooRunnable<>() { @Override @@ -403,7 +450,7 @@ public byte[] run() throws KeeperException, InterruptedException { */ try { final ZooKeeper zooKeeper = getZooKeeper(); - Stat stat = zooKeeper.exists(zPath, watcher); + Stat stat = zooKeeper.exists(zPath, null); if (stat == null) { if (log.isTraceEnabled()) { log.trace("{} zookeeper did not contain {}", cacheId, zPath); @@ -413,7 +460,7 @@ public byte[] run() throws KeeperException, InterruptedException { byte[] data = null; ZcStat zstat = null; try { - data = zooKeeper.getData(zPath, watcher, stat); + data = zooKeeper.getData(zPath, null, stat); zstat = new ZcStat(stat); } catch (KeeperException.BadVersionException | KeeperException.NoNodeException e1) { throw new ConcurrentModificationException(e1); @@ -459,16 +506,10 @@ protected void copyStats(ZcStat userStat, ZcStat cachedStat) { } } - private void remove(String zPath) { - nodeCache.remove(zPath); - log.trace("{} removed {} from cache", cacheId, zPath); - updateCount.incrementAndGet(); - } - /** * Clears this cache. */ - public void clear() { + private void clear() { Preconditions.checkState(!closed); nodeCache.clear(); updateCount.incrementAndGet(); @@ -496,6 +537,7 @@ public long getUpdateCount() { */ @VisibleForTesting public boolean dataCached(String zPath) { + ensureWatched(zPath); var zcn = nodeCache.get(zPath); return zcn != null && zcn.cachedData(); } @@ -508,6 +550,7 @@ public boolean dataCached(String zPath) { */ @VisibleForTesting public boolean childrenCached(String zPath) { + ensureWatched(zPath); var zcn = nodeCache.get(zPath); return zcn != null && zcn.cachedChildren(); } @@ -518,20 +561,15 @@ public boolean childrenCached(String zPath) { public void clear(Predicate pathPredicate) { Preconditions.checkState(!closed); - Predicate pathPredicateToUse; - if (log.isTraceEnabled()) { - pathPredicateToUse = path -> { - boolean testResult = pathPredicate.test(path); - if (testResult) { - log.trace("{} removing {} from cache", cacheId, path); - } - return testResult; - }; - } else { - pathPredicateToUse = pathPredicate; - } - nodeCache.keySet().removeIf(pathPredicateToUse); - updateCount.incrementAndGet(); + Predicate pathPredicateWrapper = path -> { + boolean testResult = isWatchedPath(path) && pathPredicate.test(path); + if (testResult) { + updateCount.incrementAndGet(); + log.trace("{} removing {} from cache", cacheId, path); + } + return testResult; + }; + nodeCache.keySet().removeIf(pathPredicateWrapper); } /** @@ -540,10 +578,12 @@ public void clear(Predicate pathPredicate) { * @param zPath path of top node */ public void clear(String zPath) { + ensureWatched(zPath); clear(path -> path.startsWith(zPath)); } public Optional getLockData(ServiceLockPath path) { + ensureWatched(path.toString()); List children = ServiceLock.validateAndSort(path, getChildren(path.toString())); if (children == null || children.isEmpty()) { return Optional.empty(); diff --git a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ZooCacheFactory.java b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ZooCacheFactory.java deleted file mode 100644 index 2e1987af344..00000000000 --- a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ZooCacheFactory.java +++ /dev/null @@ -1,128 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.accumulo.core.fate.zookeeper; - -import java.util.HashMap; -import java.util.Map; - -import org.apache.accumulo.core.singletons.SingletonManager; -import org.apache.accumulo.core.singletons.SingletonService; - -/** - * A factory for {@link ZooCache} instances. - *

- * Implementation note: We were using the instances map to track all the instances that have been - * created, so we could explicitly close them when the SingletonManager detected that the last - * legacy client (using Connector/ZooKeeperInstance) has gone away. This class may no longer be - * needed, since the legacy client code has been removed, so long as the ZooCache instances it is - * tracking are managed as resources within ClientContext or ServerContext, and explicitly closed - * when those are closed. - */ -public class ZooCacheFactory { - - private static final Map instances = new HashMap<>(); - private static boolean enabled = true; - - public ZooCacheFactory() {} - - private static boolean isEnabled() { - synchronized (instances) { - return enabled; - } - } - - private static void enable() { - synchronized (instances) { - enabled = true; - } - } - - private static void disable() { - synchronized (instances) { - try { - instances.values().forEach(ZooCache::close); - } finally { - instances.clear(); - enabled = false; - } - } - } - - static { - // important because of ZOOKEEPER-2368.. when zookeeper client is closed it does not generate an - // event! - SingletonManager.register(new SingletonService() { - - @Override - public synchronized boolean isEnabled() { - return ZooCacheFactory.isEnabled(); - } - - @Override - public synchronized void enable() { - ZooCacheFactory.enable(); - } - - @Override - public synchronized void disable() { - ZooCacheFactory.disable(); - } - }); - - } - - /** - * Gets a {@link ZooCache}. The same object may be returned for multiple calls with the same - * arguments. - * - * @param zooKeepers comma-separated list of ZooKeeper host[:port]s - * @param sessionTimeout session timeout - * @return cache object - */ - public ZooCache getZooCache(String zooKeepers, int sessionTimeout) { - String key = zooKeepers + ":" + sessionTimeout; - synchronized (instances) { - if (!isEnabled()) { - throw new IllegalStateException("The Accumulo singleton for zookeeper caching is " - + "disabled. This is likely caused by all AccumuloClients being closed"); - } - return instances.computeIfAbsent(key, k -> getNewZooCache(zooKeepers, sessionTimeout)); - } - } - - /** - * Always return a new {@link ZooCache}. - * - * @param zooKeepers comma-separated list of ZooKeeper host[:port]s - * @param sessionTimeout session timeout - * @return a new instance - */ - public ZooCache getNewZooCache(String zooKeepers, int sessionTimeout) { - return new ZooCache(new ZooReader(zooKeepers, sessionTimeout), null); - } - - /** - * Resets the factory. All cached objects are flushed. - */ - void reset() { - synchronized (instances) { - instances.clear(); - } - } -} diff --git a/core/src/test/java/org/apache/accumulo/core/fate/zookeeper/ZooCacheFactoryTest.java b/core/src/test/java/org/apache/accumulo/core/fate/zookeeper/ZooCacheFactoryTest.java deleted file mode 100644 index 7db2dd9df2e..00000000000 --- a/core/src/test/java/org/apache/accumulo/core/fate/zookeeper/ZooCacheFactoryTest.java +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.accumulo.core.fate.zookeeper; - -import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertNotSame; -import static org.junit.jupiter.api.Assertions.assertSame; - -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; - -public class ZooCacheFactoryTest { - private ZooCacheFactory zcf; - - @BeforeEach - public void setUp() { - zcf = new ZooCacheFactory(); - } - - @AfterEach - public void tearDown() { - zcf.reset(); - } - - @Test - public void testGetZooCache() { - String zks1 = "zk1"; - int timeout1 = 1000; - ZooCache zc1 = zcf.getZooCache(zks1, timeout1); - ZooCache zc1a = zcf.getZooCache(zks1, timeout1); - assertSame(zc1, zc1a); - - String zks2 = "zk2"; - int timeout2 = 1000; - ZooCache zc2 = zcf.getZooCache(zks2, timeout2); - assertNotSame(zc1, zc2); - - String zks3 = "zk1"; - int timeout3 = 2000; - ZooCache zc3 = zcf.getZooCache(zks3, timeout3); - assertNotSame(zc1, zc3); - } - - @Test - public void testGetNewZooCache() { - String zks1 = "zk1"; - int timeout1 = 1000; - ZooCache zc1 = zcf.getNewZooCache(zks1, timeout1); - assertNotNull(zc1); - ZooCache zc1a = zcf.getZooCache(zks1, timeout1); - assertNotSame(zc1, zc1a); - ZooCache zc1b = zcf.getNewZooCache(zks1, timeout1); - assertNotSame(zc1, zc1b); - assertNotSame(zc1a, zc1b); - } - - @Test - public void testReset() { - String zks1 = "zk1"; - int timeout1 = 1000; - ZooCache zc1 = zcf.getZooCache(zks1, timeout1); - zcf.reset(); - ZooCache zc1a = zcf.getZooCache(zks1, timeout1); - assertNotSame(zc1, zc1a); - } -} diff --git a/core/src/test/java/org/apache/accumulo/core/fate/zookeeper/ZooCacheTest.java b/core/src/test/java/org/apache/accumulo/core/fate/zookeeper/ZooCacheTest.java index 3aced2e103e..43366ea73e7 100644 --- a/core/src/test/java/org/apache/accumulo/core/fate/zookeeper/ZooCacheTest.java +++ b/core/src/test/java/org/apache/accumulo/core/fate/zookeeper/ZooCacheTest.java @@ -19,12 +19,10 @@ package org.apache.accumulo.core.fate.zookeeper; import static org.easymock.EasyMock.anyObject; -import static org.easymock.EasyMock.capture; import static org.easymock.EasyMock.createMock; import static org.easymock.EasyMock.createStrictMock; import static org.easymock.EasyMock.eq; import static org.easymock.EasyMock.expect; -import static org.easymock.EasyMock.expectLastCall; import static org.easymock.EasyMock.replay; import static org.easymock.EasyMock.verify; import static org.junit.jupiter.api.Assertions.assertArrayEquals; @@ -35,36 +33,62 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import java.util.List; +import java.util.UUID; +import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.fate.zookeeper.ZooCache.ZcStat; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat; -import org.easymock.Capture; -import org.easymock.EasyMock; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; public class ZooCacheTest { - private static final String ZPATH = "/some/path/in/zk"; + + private static class TestZooCache extends ZooCache { + + /** + * Test class that extends ZooCache to suppress the creation of the persistent recursive + * watchers that are created in the constructor and to provide access to the watcher. + */ + public TestZooCache(String zooRoot, ZooReader reader, Watcher watcher) { + super(zooRoot, reader, watcher); + } + + @Override + protected void setupWatchers(String zooRoot) { + for (String path : ALLOWED_PATHS) { + final String zPath = zooRoot + path; + watchedPaths.add(zPath); + } + } + + public void executeWatcher(WatchedEvent event) { + // simulate ZooKeeper calling our Watcher + watcher.process(event); + } + + } + + private static final String instancePath = Constants.ZROOT + "/" + UUID.randomUUID().toString(); + private static final String root = instancePath + Constants.ZTSERVERS; + private static final String ZPATH = root + "/testPath"; private static final byte[] DATA = {(byte) 1, (byte) 2, (byte) 3, (byte) 4}; private static final List CHILDREN = java.util.Arrays.asList("huey", "dewey", "louie"); private ZooReader zr; private ZooKeeper zk; - private ZooCache zc; + private TestZooCache zc; @BeforeEach - public void setUp() { + public void setUp() throws KeeperException, InterruptedException { zr = createMock(ZooReader.class); zk = createStrictMock(ZooKeeper.class); - expect(zr.getZooKeeper()).andReturn(zk); - expectLastCall().anyTimes(); + expect(zr.getZooKeeper()).andReturn(zk).anyTimes(); replay(zr); - - zc = new ZooCache(zr, null); + zc = new TestZooCache(instancePath, zr, null); } @Test @@ -85,8 +109,8 @@ private void testGet(boolean fillStat) throws Exception { final long ephemeralOwner = 123456789L; Stat existsStat = new Stat(); existsStat.setEphemeralOwner(ephemeralOwner); - expect(zk.exists(eq(ZPATH), anyObject(Watcher.class))).andReturn(existsStat); - expect(zk.getData(eq(ZPATH), anyObject(Watcher.class), eq(existsStat))).andReturn(DATA); + expect(zk.exists(eq(ZPATH), eq(null))).andReturn(existsStat); + expect(zk.getData(eq(ZPATH), eq(null), eq(existsStat))).andReturn(DATA); replay(zk); assertFalse(zc.dataCached(ZPATH)); @@ -294,29 +318,25 @@ private void testWatchDataNode(byte[] initialData, Watcher.Event.EventType event WatchedEvent event = new WatchedEvent(eventType, Watcher.Event.KeeperState.SyncConnected, ZPATH); TestWatcher exw = new TestWatcher(event); - zc = new ZooCache(zr, exw); + zc = new TestZooCache(instancePath, zr, exw); - Watcher w = watchData(initialData); - w.process(event); + watchData(initialData); + zc.executeWatcher(event); assertTrue(exw.wasCalled()); assertEquals(stillCached, zc.dataCached(ZPATH)); } - private Watcher watchData(byte[] initialData) throws Exception { - Capture cw = EasyMock.newCapture(); + private void watchData(byte[] initialData) throws Exception { Stat existsStat = new Stat(); if (initialData != null) { - expect(zk.exists(eq(ZPATH), capture(cw))).andReturn(existsStat); - expect(zk.getData(eq(ZPATH), anyObject(Watcher.class), eq(existsStat))) - .andReturn(initialData); + expect(zk.exists(eq(ZPATH), eq(null))).andReturn(existsStat); + expect(zk.getData(eq(ZPATH), eq(null), eq(existsStat))).andReturn(initialData); } else { - expect(zk.exists(eq(ZPATH), capture(cw))).andReturn(null); + expect(zk.exists(eq(ZPATH), eq(null))).andReturn(null); } replay(zk); zc.get(ZPATH); assertTrue(zc.dataCached(ZPATH)); - - return cw.getValue(); } @Test @@ -408,11 +428,11 @@ private void testGetBoth(boolean getDataFirst) throws Exception { private void testWatchDataNode_Clear(Watcher.Event.KeeperState state) throws Exception { WatchedEvent event = new WatchedEvent(Watcher.Event.EventType.None, state, null); TestWatcher exw = new TestWatcher(event); - zc = new ZooCache(zr, exw); + zc = new TestZooCache(instancePath, zr, exw); - Watcher w = watchData(DATA); + watchData(DATA); assertTrue(zc.dataCached(ZPATH)); - w.process(event); + zc.executeWatcher(event); assertTrue(exw.wasCalled()); assertFalse(zc.dataCached(ZPATH)); } @@ -442,27 +462,24 @@ private void testWatchChildrenNode(List initialChildren, WatchedEvent event = new WatchedEvent(eventType, Watcher.Event.KeeperState.SyncConnected, ZPATH); TestWatcher exw = new TestWatcher(event); - zc = new ZooCache(zr, exw); + zc = new TestZooCache(instancePath, zr, exw); - Watcher w = watchChildren(initialChildren); - w.process(event); + watchChildren(initialChildren); + zc.executeWatcher(event); assertTrue(exw.wasCalled()); assertEquals(stillCached, zc.childrenCached(ZPATH)); } - private Watcher watchChildren(List initialChildren) throws Exception { - Capture cw = EasyMock.newCapture(); + private void watchChildren(List initialChildren) throws Exception { if (initialChildren == null) { - expect(zk.exists(eq(ZPATH), capture(cw))).andReturn(null); + expect(zk.exists(eq(ZPATH), eq(null))).andReturn(null); } else { Stat existsStat = new Stat(); - expect(zk.exists(eq(ZPATH), anyObject(Watcher.class))).andReturn(existsStat); - expect(zk.getChildren(eq(ZPATH), capture(cw))).andReturn(initialChildren); + expect(zk.exists(eq(ZPATH), eq(null))).andReturn(existsStat); + expect(zk.getChildren(eq(ZPATH), eq(null))).andReturn(initialChildren); } replay(zk); zc.getChildren(ZPATH); assertTrue(zc.childrenCached(ZPATH)); - - return cw.getValue(); } } diff --git a/server/base/src/main/java/org/apache/accumulo/server/ServerInfo.java b/server/base/src/main/java/org/apache/accumulo/server/ServerInfo.java index aa7d2149dde..ee269a868e0 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/ServerInfo.java +++ b/server/base/src/main/java/org/apache/accumulo/server/ServerInfo.java @@ -18,6 +18,7 @@ */ package org.apache.accumulo.server; +import static com.google.common.base.Preconditions.checkArgument; import static java.nio.charset.StandardCharsets.UTF_8; import java.io.IOException; @@ -29,13 +30,11 @@ import org.apache.accumulo.core.clientImpl.ClientConfConverter; import org.apache.accumulo.core.clientImpl.ClientInfo; import org.apache.accumulo.core.clientImpl.Credentials; -import org.apache.accumulo.core.clientImpl.InstanceOperationsImpl; import org.apache.accumulo.core.conf.ClientProperty; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.conf.SiteConfiguration; import org.apache.accumulo.core.data.InstanceId; -import org.apache.accumulo.core.fate.zookeeper.ZooCache; -import org.apache.accumulo.core.fate.zookeeper.ZooCacheFactory; +import org.apache.accumulo.core.fate.zookeeper.ZooReader; import org.apache.accumulo.core.fate.zookeeper.ZooUtil; import org.apache.accumulo.core.singletons.SingletonManager; import org.apache.accumulo.core.singletons.SingletonManager.Mode; @@ -44,6 +43,7 @@ import org.apache.accumulo.server.security.SystemCredentials; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.apache.zookeeper.KeeperException; public class ServerInfo implements ClientInfo { @@ -54,7 +54,6 @@ public class ServerInfo implements ClientInfo { private final String zooKeepers; private final int zooKeepersSessionTimeOut; private final VolumeManager volumeManager; - private final ZooCache zooCache; private final ServerDirs serverDirs; private final Credentials credentials; @@ -71,22 +70,26 @@ public class ServerInfo implements ClientInfo { } catch (IOException e) { throw new UncheckedIOException(e); } - zooCache = new ZooCacheFactory().getZooCache(zooKeepers, zooKeepersSessionTimeOut); - String instanceNamePath = Constants.ZROOT + Constants.ZINSTANCES + "/" + instanceName; - byte[] iidb = zooCache.get(instanceNamePath); - if (iidb == null) { - throw new IllegalStateException( - "Instance name " + instanceName + " does not exist in zookeeper. " - + "Run \"accumulo org.apache.accumulo.server.util.ListInstances\" to see a list."); - } - instanceID = InstanceId.of(new String(iidb, UTF_8)); - if (zooCache.get(ZooUtil.getRoot(instanceID)) == null) { - if (instanceName == null) { + final ZooReader zooReader = new ZooReader(this.zooKeepers, this.zooKeepersSessionTimeOut); + final String instanceNamePath = Constants.ZROOT + Constants.ZINSTANCES + "/" + instanceName; + try { + byte[] iidb = zooReader.getData(instanceNamePath); + if (iidb == null) { throw new IllegalStateException( - "Instance id " + instanceID + " does not exist in zookeeper"); + "Instance name " + instanceName + " does not exist in zookeeper. " + + "Run \"accumulo org.apache.accumulo.server.util.ListInstances\" to see a list."); + } + instanceID = InstanceId.of(new String(iidb, UTF_8)); + if (zooReader.getData(ZooUtil.getRoot(instanceID)) == null) { + if (instanceName == null) { + throw new IllegalStateException( + "Instance id " + instanceID + " does not exist in zookeeper"); + } + throw new IllegalStateException("Instance id " + instanceID + " pointed to by the name " + + instanceName + " does not exist in zookeeper"); } - throw new IllegalStateException("Instance id " + instanceID + " pointed to by the name " - + instanceName + " does not exist in zookeeper"); + } catch (KeeperException | InterruptedException e) { + throw new IllegalArgumentException("Unabled to create client, instanceId not found", e); } serverDirs = new ServerDirs(siteConfig, hadoopConf); credentials = SystemCredentials.get(instanceID, siteConfig); @@ -106,9 +109,14 @@ public class ServerInfo implements ClientInfo { instanceID = VolumeManager.getInstanceIDFromHdfs(instanceIdPath, hadoopConf); zooKeepers = config.get(Property.INSTANCE_ZK_HOST); zooKeepersSessionTimeOut = (int) config.getTimeInMillis(Property.INSTANCE_ZK_TIMEOUT); - zooCache = new ZooCacheFactory().getZooCache(zooKeepers, zooKeepersSessionTimeOut); - instanceName = InstanceOperationsImpl.lookupInstanceName(zooCache, instanceID); credentials = SystemCredentials.get(instanceID, siteConfig); + final ZooReader zooReader = new ZooReader(this.zooKeepers, this.zooKeepersSessionTimeOut); + try { + instanceName = lookupInstanceName(zooReader, instanceID); + } catch (KeeperException | InterruptedException e) { + throw new IllegalStateException("Unable to lookup instanceName for instanceID: " + instanceID, + e); + } } ServerInfo(SiteConfiguration config, String instanceName, InstanceId instanceID) { @@ -123,12 +131,25 @@ public class ServerInfo implements ClientInfo { this.instanceID = instanceID; zooKeepers = config.get(Property.INSTANCE_ZK_HOST); zooKeepersSessionTimeOut = (int) config.getTimeInMillis(Property.INSTANCE_ZK_TIMEOUT); - zooCache = new ZooCacheFactory().getZooCache(zooKeepers, zooKeepersSessionTimeOut); this.instanceName = instanceName; serverDirs = new ServerDirs(siteConfig, hadoopConf); credentials = SystemCredentials.get(instanceID, siteConfig); } + private String lookupInstanceName(ZooReader zr, InstanceId instanceId) + throws KeeperException, InterruptedException { + checkArgument(zr != null, "zooReader is null"); + checkArgument(instanceId != null, "instanceId is null"); + for (String name : zr.getChildren(Constants.ZROOT + Constants.ZINSTANCES)) { + var bytes = zr.getData(Constants.ZROOT + Constants.ZINSTANCES + "/" + name); + InstanceId iid = InstanceId.of(new String(bytes, UTF_8)); + if (iid.equals(instanceId)) { + return name; + } + } + return null; + } + public SiteConfiguration getSiteConfiguration() { return siteConfig; } diff --git a/server/base/src/main/java/org/apache/accumulo/server/manager/LiveTServerSet.java b/server/base/src/main/java/org/apache/accumulo/server/manager/LiveTServerSet.java index b6e51b412f7..51d94833510 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/manager/LiveTServerSet.java +++ b/server/base/src/main/java/org/apache/accumulo/server/manager/LiveTServerSet.java @@ -35,7 +35,6 @@ import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException; import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.dataImpl.KeyExtent; -import org.apache.accumulo.core.fate.zookeeper.ZooCache; import org.apache.accumulo.core.fate.zookeeper.ZooCache.ZcStat; import org.apache.accumulo.core.lock.ServiceLock; import org.apache.accumulo.core.lock.ServiceLockData; @@ -77,7 +76,6 @@ public interface Listener { private final Listener cback; private final ServerContext context; - private ZooCache zooCache; public class TServerConnection { private final HostAndPort address; @@ -213,13 +211,6 @@ public LiveTServerSet(ServerContext context, Listener cback) { this.context = context; } - public synchronized ZooCache getZooCache() { - if (zooCache == null) { - zooCache = new ZooCache(context.getZooReader(), this); - } - return zooCache; - } - public synchronized void startListeningForTabletServerChanges() { scanServers(); @@ -267,7 +258,8 @@ private synchronized void checkServer(final Set updates, final TServerInfo info = current.get(tserverPath.getServer()); ZcStat stat = new ZcStat(); - Optional sld = ServiceLock.getLockData(getZooCache(), tserverPath, stat); + Optional sld = + ServiceLock.getLockData(this.context.getZooCache(), tserverPath, stat); if (sld.isEmpty()) { if (info != null) { @@ -482,7 +474,7 @@ public synchronized void remove(TServerInstance server) { log.error("FATAL: {}", msg, e); Halt.halt(msg, -1); } - getZooCache().clear(slp.toString()); + this.context.getZooCache().clear(slp.toString()); } } } diff --git a/server/base/src/main/java/org/apache/accumulo/server/security/SecurityOperation.java b/server/base/src/main/java/org/apache/accumulo/server/security/SecurityOperation.java index e1fd176bae0..b12929a7160 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/security/SecurityOperation.java +++ b/server/base/src/main/java/org/apache/accumulo/server/security/SecurityOperation.java @@ -41,7 +41,6 @@ import org.apache.accumulo.core.dataImpl.thrift.TColumn; import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent; import org.apache.accumulo.core.dataImpl.thrift.TRange; -import org.apache.accumulo.core.fate.zookeeper.ZooCache; import org.apache.accumulo.core.manager.thrift.FateOperation; import org.apache.accumulo.core.metadata.AccumuloTable; import org.apache.accumulo.core.security.Authorizations; @@ -74,7 +73,6 @@ public class SecurityOperation { private final PermissionHandler permHandle; private final boolean isKerberos; private final Supplier rootUserName; - private final ZooCache zooCache; private final String zkUserPath; protected final ServerContext context; @@ -105,8 +103,8 @@ protected SecurityOperation(ServerContext context, Authorizor author, Authentica PermissionHandler pm) { this.context = context; zkUserPath = context.zkUserPath(); - zooCache = new ZooCache(context.getZooReader(), null); - rootUserName = Suppliers.memoize(() -> new String(zooCache.get(zkUserPath), UTF_8)); + rootUserName = + Suppliers.memoize(() -> new String(context.getZooCache().get(zkUserPath), UTF_8)); authorizor = author; authenticator = authent; permHandle = pm; diff --git a/server/base/src/main/java/org/apache/accumulo/server/security/handler/KerberosAuthenticator.java b/server/base/src/main/java/org/apache/accumulo/server/security/handler/KerberosAuthenticator.java index 7aa23061c54..ece2ca9471b 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/security/handler/KerberosAuthenticator.java +++ b/server/base/src/main/java/org/apache/accumulo/server/security/handler/KerberosAuthenticator.java @@ -29,7 +29,6 @@ import org.apache.accumulo.core.client.security.tokens.KerberosToken; import org.apache.accumulo.core.clientImpl.DelegationTokenImpl; import org.apache.accumulo.core.clientImpl.thrift.SecurityErrorCode; -import org.apache.accumulo.core.fate.zookeeper.ZooCache; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy; import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeMissingPolicy; @@ -51,7 +50,6 @@ public class KerberosAuthenticator implements Authenticator { Set.of(KerberosToken.class.getName(), SystemToken.class.getName()); private final ZKAuthenticator zkAuthenticator = new ZKAuthenticator(); - private ZooCache zooCache; private ServerContext context; private String zkUserPath; private UserImpersonation impersonation; @@ -59,7 +57,6 @@ public class KerberosAuthenticator implements Authenticator { @Override public void initialize(ServerContext context) { this.context = context; - zooCache = new ZooCache(context.getZooReader(), null); impersonation = new UserImpersonation(context.getConfiguration()); zkAuthenticator.initialize(context); zkUserPath = context.zkUserPath(); @@ -71,12 +68,9 @@ public boolean validSecurityHandlers() { } private void createUserNodeInZk(String principal) throws KeeperException, InterruptedException { - synchronized (zooCache) { - zooCache.clear(); - ZooReaderWriter zoo = context.getZooReaderWriter(); - zoo.putPrivatePersistentData(zkUserPath + "/" + principal, new byte[0], - NodeExistsPolicy.FAIL); - } + this.context.getZooCache().clear(zkUserPath + "/" + principal); + ZooReaderWriter zoo = context.getZooReaderWriter(); + zoo.putPrivatePersistentData(zkUserPath + "/" + principal, new byte[0], NodeExistsPolicy.FAIL); } @Override @@ -84,22 +78,20 @@ public void initializeSecurity(String principal, byte[] token) { try { // remove old settings from zookeeper first, if any ZooReaderWriter zoo = context.getZooReaderWriter(); - synchronized (zooCache) { - zooCache.clear(); - if (zoo.exists(zkUserPath)) { - zoo.recursiveDelete(zkUserPath, NodeMissingPolicy.SKIP); - log.info("Removed {}/ from zookeeper", zkUserPath); - } - - // prep parent node of users with root username - // ACCUMULO-4140 The root user needs to be stored un-base64 encoded in the znode's value - byte[] principalData = principal.getBytes(UTF_8); - zoo.putPersistentData(zkUserPath, principalData, NodeExistsPolicy.FAIL); - - // Create the root user in ZK using base64 encoded name (since the name is included in the - // znode) - createUserNodeInZk(Base64.getEncoder().encodeToString(principalData)); + this.context.getZooCache().clear((path) -> path.startsWith(zkUserPath)); + if (zoo.exists(zkUserPath)) { + zoo.recursiveDelete(zkUserPath, NodeMissingPolicy.SKIP); + log.info("Removed {}/ from zookeeper", zkUserPath); } + + // prep parent node of users with root username + // ACCUMULO-4140 The root user needs to be stored un-base64 encoded in the znode's value + byte[] principalData = principal.getBytes(UTF_8); + zoo.putPersistentData(zkUserPath, principalData, NodeExistsPolicy.FAIL); + + // Create the root user in ZK using base64 encoded name (since the name is included in the + // znode) + createUserNodeInZk(Base64.getEncoder().encodeToString(principalData)); } catch (KeeperException | InterruptedException e) { log.error("Failed to initialize security", e); throw new IllegalStateException(e); diff --git a/server/base/src/main/java/org/apache/accumulo/server/security/handler/ZKAuthenticator.java b/server/base/src/main/java/org/apache/accumulo/server/security/handler/ZKAuthenticator.java index 10b7c4b880d..d67831c48fc 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/security/handler/ZKAuthenticator.java +++ b/server/base/src/main/java/org/apache/accumulo/server/security/handler/ZKAuthenticator.java @@ -29,7 +29,6 @@ import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; import org.apache.accumulo.core.client.security.tokens.PasswordToken; import org.apache.accumulo.core.clientImpl.thrift.SecurityErrorCode; -import org.apache.accumulo.core.fate.zookeeper.ZooCache; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy; import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeMissingPolicy; @@ -44,12 +43,10 @@ public final class ZKAuthenticator implements Authenticator { private ServerContext context; private String zkUserPath; - private ZooCache zooCache; @Override public void initialize(ServerContext context) { this.context = context; - zooCache = new ZooCache(context.getZooReader(), null); zkUserPath = context.zkUserPath(); } @@ -58,18 +55,16 @@ public void initializeSecurity(String principal, byte[] token) { try { // remove old settings from zookeeper first, if any ZooReaderWriter zoo = context.getZooReaderWriter(); - synchronized (zooCache) { - zooCache.clear(); - if (zoo.exists(zkUserPath)) { - zoo.recursiveDelete(zkUserPath, NodeMissingPolicy.SKIP); - log.info("Removed {}/ from zookeeper", zkUserPath); - } + this.context.getZooCache().clear((path) -> path.startsWith(zkUserPath)); + if (zoo.exists(zkUserPath)) { + zoo.recursiveDelete(zkUserPath, NodeMissingPolicy.SKIP); + log.info("Removed {}/ from zookeeper", zkUserPath); + } - // prep parent node of users with root username - zoo.putPersistentData(zkUserPath, principal.getBytes(UTF_8), NodeExistsPolicy.FAIL); + // prep parent node of users with root username + zoo.putPersistentData(zkUserPath, principal.getBytes(UTF_8), NodeExistsPolicy.FAIL); - constructUser(principal, ZKSecurityTool.createPass(token)); - } + constructUser(principal, ZKSecurityTool.createPass(token)); } catch (KeeperException | AccumuloException | InterruptedException e) { log.error("{}", e.getMessage(), e); throw new IllegalStateException(e); @@ -82,16 +77,14 @@ public void initializeSecurity(String principal, byte[] token) { */ private void constructUser(String user, byte[] pass) throws KeeperException, InterruptedException { - synchronized (zooCache) { - zooCache.clear(); - ZooReaderWriter zoo = context.getZooReaderWriter(); - zoo.putPrivatePersistentData(zkUserPath + "/" + user, pass, NodeExistsPolicy.FAIL); - } + this.context.getZooCache().clear((path) -> path.startsWith(zkUserPath + "/" + user)); + ZooReaderWriter zoo = context.getZooReaderWriter(); + zoo.putPrivatePersistentData(zkUserPath + "/" + user, pass, NodeExistsPolicy.FAIL); } @Override public Set listUsers() { - return new TreeSet<>(zooCache.getChildren(zkUserPath)); + return new TreeSet<>(this.context.getZooCache().getChildren(zkUserPath)); } @Override @@ -120,11 +113,8 @@ public void createUser(String principal, AuthenticationToken token) @Override public void dropUser(String user) throws AccumuloSecurityException { try { - synchronized (zooCache) { - zooCache.clear(); - context.getZooReaderWriter().recursiveDelete(zkUserPath + "/" + user, - NodeMissingPolicy.FAIL); - } + this.context.getZooCache().clear((path) -> path.startsWith(zkUserPath + "/" + user)); + context.getZooReaderWriter().recursiveDelete(zkUserPath + "/" + user, NodeMissingPolicy.FAIL); } catch (InterruptedException e) { log.error("{}", e.getMessage(), e); throw new IllegalStateException(e); @@ -146,11 +136,9 @@ public void changePassword(String principal, AuthenticationToken token) PasswordToken pt = (PasswordToken) token; if (userExists(principal)) { try { - synchronized (zooCache) { - zooCache.clear(zkUserPath + "/" + principal); - context.getZooReaderWriter().putPrivatePersistentData(zkUserPath + "/" + principal, - ZKSecurityTool.createPass(pt.getPassword()), NodeExistsPolicy.OVERWRITE); - } + this.context.getZooCache().clear((path) -> path.startsWith(zkUserPath + "/" + principal)); + context.getZooReaderWriter().putPrivatePersistentData(zkUserPath + "/" + principal, + ZKSecurityTool.createPass(pt.getPassword()), NodeExistsPolicy.OVERWRITE); } catch (KeeperException e) { log.error("{}", e.getMessage(), e); throw new AccumuloSecurityException(principal, SecurityErrorCode.CONNECTION_ERROR, e); @@ -169,7 +157,7 @@ public void changePassword(String principal, AuthenticationToken token) @Override public boolean userExists(String user) { - return zooCache.get(zkUserPath + "/" + user) != null; + return this.context.getZooCache().get(zkUserPath + "/" + user) != null; } @Override @@ -186,11 +174,11 @@ public boolean authenticateUser(String principal, AuthenticationToken token) PasswordToken pt = (PasswordToken) token; byte[] zkData; String zpath = zkUserPath + "/" + principal; - zkData = zooCache.get(zpath); + zkData = this.context.getZooCache().get(zpath); boolean result = authenticateUser(principal, pt, zkData); if (!result) { - zooCache.clear(zpath); - zkData = zooCache.get(zpath); + this.context.getZooCache().clear(zpath); + zkData = this.context.getZooCache().get(zpath); result = authenticateUser(principal, pt, zkData); } return result; diff --git a/server/base/src/main/java/org/apache/accumulo/server/security/handler/ZKAuthorizor.java b/server/base/src/main/java/org/apache/accumulo/server/security/handler/ZKAuthorizor.java index 8c0691d86c0..0814fec9a4c 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/security/handler/ZKAuthorizor.java +++ b/server/base/src/main/java/org/apache/accumulo/server/security/handler/ZKAuthorizor.java @@ -25,7 +25,6 @@ import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.clientImpl.thrift.SecurityErrorCode; -import org.apache.accumulo.core.fate.zookeeper.ZooCache; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy; import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeMissingPolicy; @@ -44,18 +43,16 @@ public class ZKAuthorizor implements Authorizor { private ServerContext context; private String zkUserPath; - private ZooCache zooCache; @Override public void initialize(ServerContext context) { this.context = context; - zooCache = new ZooCache(context.getZooReader(), null); zkUserPath = context.zkUserPath(); } @Override public Authorizations getCachedUserAuthorizations(String user) { - byte[] authsBytes = zooCache.get(zkUserPath + "/" + user + ZKUserAuths); + byte[] authsBytes = this.context.getZooCache().get(zkUserPath + "/" + user + ZKUserAuths); if (authsBytes != null) { return ZKSecurityTool.convertAuthorizations(authsBytes); } @@ -105,11 +102,9 @@ public void initUser(String user) throws AccumuloSecurityException { @Override public void dropUser(String user) throws AccumuloSecurityException { try { - synchronized (zooCache) { - ZooReaderWriter zoo = context.getZooReaderWriter(); - zoo.recursiveDelete(zkUserPath + "/" + user + ZKUserAuths, NodeMissingPolicy.SKIP); - zooCache.clear(zkUserPath + "/" + user); - } + ZooReaderWriter zoo = context.getZooReaderWriter(); + zoo.recursiveDelete(zkUserPath + "/" + user + ZKUserAuths, NodeMissingPolicy.SKIP); + this.context.getZooCache().clear((path) -> path.startsWith(zkUserPath + "/" + user)); } catch (InterruptedException e) { log.error("{}", e.getMessage(), e); throw new IllegalStateException(e); @@ -127,11 +122,9 @@ public void dropUser(String user) throws AccumuloSecurityException { public void changeAuthorizations(String user, Authorizations authorizations) throws AccumuloSecurityException { try { - synchronized (zooCache) { - zooCache.clear(); - context.getZooReaderWriter().putPersistentData(zkUserPath + "/" + user + ZKUserAuths, - ZKSecurityTool.convertAuthorizations(authorizations), NodeExistsPolicy.OVERWRITE); - } + this.context.getZooCache().clear(zkUserPath + "/" + user + ZKUserAuths); + context.getZooReaderWriter().putPersistentData(zkUserPath + "/" + user + ZKUserAuths, + ZKSecurityTool.convertAuthorizations(authorizations), NodeExistsPolicy.OVERWRITE); } catch (KeeperException e) { log.error("{}", e.getMessage(), e); throw new AccumuloSecurityException(user, SecurityErrorCode.CONNECTION_ERROR, e); diff --git a/server/base/src/main/java/org/apache/accumulo/server/security/handler/ZKPermHandler.java b/server/base/src/main/java/org/apache/accumulo/server/security/handler/ZKPermHandler.java index f5b3768a854..c45760e449d 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/security/handler/ZKPermHandler.java +++ b/server/base/src/main/java/org/apache/accumulo/server/security/handler/ZKPermHandler.java @@ -35,7 +35,6 @@ import org.apache.accumulo.core.clientImpl.thrift.SecurityErrorCode; import org.apache.accumulo.core.data.NamespaceId; import org.apache.accumulo.core.data.TableId; -import org.apache.accumulo.core.fate.zookeeper.ZooCache; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy; import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeMissingPolicy; @@ -53,18 +52,18 @@ public class ZKPermHandler implements PermissionHandler { private static final Logger log = LoggerFactory.getLogger(ZKPermHandler.class); + private ServerContext ctx; private ZooReaderWriter zoo; private String zkUserPath; private String ZKTablePath; private String ZKNamespacePath; - private ZooCache zooCache; private final String ZKUserSysPerms = "/System"; private final String ZKUserTablePerms = "/Tables"; private final String ZKUserNamespacePerms = "/Namespaces"; @Override public void initialize(ServerContext context) { - zooCache = new ZooCache(context.getZooReader(), null); + ctx = context; zoo = context.getZooReaderWriter(); zkUserPath = context.zkUserPath(); ZKTablePath = context.getZooKeeperRoot() + Constants.ZTABLES; @@ -113,7 +112,8 @@ public boolean hasTablePermission(String user, String table, TablePermission per @Override public boolean hasCachedTablePermission(String user, String table, TablePermission permission) { - byte[] serializedPerms = zooCache.get(zkUserPath + "/" + user + ZKUserTablePerms + "/" + table); + byte[] serializedPerms = + this.ctx.getZooCache().get(zkUserPath + "/" + user + ZKUserTablePerms + "/" + table); if (serializedPerms != null) { return ZKSecurityTool.convertTablePermissions(serializedPerms).contains(permission); } @@ -164,8 +164,8 @@ public boolean hasNamespacePermission(String user, String namespace, @Override public boolean hasCachedNamespacePermission(String user, String namespace, NamespacePermission permission) { - byte[] serializedPerms = - zooCache.get(zkUserPath + "/" + user + ZKUserNamespacePerms + "/" + namespace); + byte[] serializedPerms = this.ctx.getZooCache() + .get(zkUserPath + "/" + user + ZKUserNamespacePerms + "/" + namespace); if (serializedPerms != null) { return ZKSecurityTool.convertNamespacePermissions(serializedPerms).contains(permission); } @@ -176,7 +176,7 @@ public boolean hasCachedNamespacePermission(String user, String namespace, public void grantSystemPermission(String user, SystemPermission permission) throws AccumuloSecurityException { try { - byte[] permBytes = zooCache.get(zkUserPath + "/" + user + ZKUserSysPerms); + byte[] permBytes = this.ctx.getZooCache().get(zkUserPath + "/" + user + ZKUserSysPerms); Set perms; if (permBytes == null) { perms = new TreeSet<>(); @@ -185,11 +185,9 @@ public void grantSystemPermission(String user, SystemPermission permission) } if (perms.add(permission)) { - synchronized (zooCache) { - zooCache.clear(); - zoo.putPersistentData(zkUserPath + "/" + user + ZKUserSysPerms, - ZKSecurityTool.convertSystemPermissions(perms), NodeExistsPolicy.OVERWRITE); - } + this.ctx.getZooCache().clear(zkUserPath + "/" + user + ZKUserSysPerms); + zoo.putPersistentData(zkUserPath + "/" + user + ZKUserSysPerms, + ZKSecurityTool.convertSystemPermissions(perms), NodeExistsPolicy.OVERWRITE); } } catch (KeeperException e) { log.error("{}", e.getMessage(), e); @@ -204,7 +202,8 @@ public void grantSystemPermission(String user, SystemPermission permission) public void grantTablePermission(String user, String table, TablePermission permission) throws AccumuloSecurityException { Set tablePerms; - byte[] serializedPerms = zooCache.get(zkUserPath + "/" + user + ZKUserTablePerms + "/" + table); + byte[] serializedPerms = + this.ctx.getZooCache().get(zkUserPath + "/" + user + ZKUserTablePerms + "/" + table); if (serializedPerms != null) { tablePerms = ZKSecurityTool.convertTablePermissions(serializedPerms); } else { @@ -213,11 +212,9 @@ public void grantTablePermission(String user, String table, TablePermission perm try { if (tablePerms.add(permission)) { - synchronized (zooCache) { - zooCache.clear(zkUserPath + "/" + user + ZKUserTablePerms + "/" + table); - zoo.putPersistentData(zkUserPath + "/" + user + ZKUserTablePerms + "/" + table, - ZKSecurityTool.convertTablePermissions(tablePerms), NodeExistsPolicy.OVERWRITE); - } + this.ctx.getZooCache().clear(zkUserPath + "/" + user + ZKUserTablePerms + "/" + table); + zoo.putPersistentData(zkUserPath + "/" + user + ZKUserTablePerms + "/" + table, + ZKSecurityTool.convertTablePermissions(tablePerms), NodeExistsPolicy.OVERWRITE); } } catch (KeeperException e) { log.error("{}", e.getMessage(), e); @@ -232,8 +229,8 @@ public void grantTablePermission(String user, String table, TablePermission perm public void grantNamespacePermission(String user, String namespace, NamespacePermission permission) throws AccumuloSecurityException { Set namespacePerms; - byte[] serializedPerms = - zooCache.get(zkUserPath + "/" + user + ZKUserNamespacePerms + "/" + namespace); + byte[] serializedPerms = this.ctx.getZooCache() + .get(zkUserPath + "/" + user + ZKUserNamespacePerms + "/" + namespace); if (serializedPerms != null) { namespacePerms = ZKSecurityTool.convertNamespacePermissions(serializedPerms); } else { @@ -242,12 +239,10 @@ public void grantNamespacePermission(String user, String namespace, try { if (namespacePerms.add(permission)) { - synchronized (zooCache) { - zooCache.clear(zkUserPath + "/" + user + ZKUserNamespacePerms + "/" + namespace); - zoo.putPersistentData(zkUserPath + "/" + user + ZKUserNamespacePerms + "/" + namespace, - ZKSecurityTool.convertNamespacePermissions(namespacePerms), - NodeExistsPolicy.OVERWRITE); - } + this.ctx.getZooCache() + .clear(zkUserPath + "/" + user + ZKUserNamespacePerms + "/" + namespace); + zoo.putPersistentData(zkUserPath + "/" + user + ZKUserNamespacePerms + "/" + namespace, + ZKSecurityTool.convertNamespacePermissions(namespacePerms), NodeExistsPolicy.OVERWRITE); } } catch (KeeperException e) { log.error("{}", e.getMessage(), e); @@ -261,7 +256,7 @@ public void grantNamespacePermission(String user, String namespace, @Override public void revokeSystemPermission(String user, SystemPermission permission) throws AccumuloSecurityException { - byte[] sysPermBytes = zooCache.get(zkUserPath + "/" + user + ZKUserSysPerms); + byte[] sysPermBytes = this.ctx.getZooCache().get(zkUserPath + "/" + user + ZKUserSysPerms); // User had no system permission, nothing to revoke. if (sysPermBytes == null) { @@ -272,11 +267,10 @@ public void revokeSystemPermission(String user, SystemPermission permission) try { if (sysPerms.remove(permission)) { - synchronized (zooCache) { - zooCache.clear(); - zoo.putPersistentData(zkUserPath + "/" + user + ZKUserSysPerms, - ZKSecurityTool.convertSystemPermissions(sysPerms), NodeExistsPolicy.OVERWRITE); - } + this.ctx.getZooCache() + .clear((path) -> path.startsWith(zkUserPath + "/" + user + ZKUserSysPerms)); + zoo.putPersistentData(zkUserPath + "/" + user + ZKUserSysPerms, + ZKSecurityTool.convertSystemPermissions(sysPerms), NodeExistsPolicy.OVERWRITE); } } catch (KeeperException e) { log.error("{}", e.getMessage(), e); @@ -290,7 +284,8 @@ public void revokeSystemPermission(String user, SystemPermission permission) @Override public void revokeTablePermission(String user, String table, TablePermission permission) throws AccumuloSecurityException { - byte[] serializedPerms = zooCache.get(zkUserPath + "/" + user + ZKUserTablePerms + "/" + table); + byte[] serializedPerms = + this.ctx.getZooCache().get(zkUserPath + "/" + user + ZKUserTablePerms + "/" + table); // User had no table permission, nothing to revoke. if (serializedPerms == null) { @@ -300,7 +295,8 @@ public void revokeTablePermission(String user, String table, TablePermission per Set tablePerms = ZKSecurityTool.convertTablePermissions(serializedPerms); try { if (tablePerms.remove(permission)) { - zooCache.clear(); + this.ctx.getZooCache().clear( + (path) -> path.startsWith(zkUserPath + "/" + user + ZKUserTablePerms + "/" + table)); if (tablePerms.isEmpty()) { zoo.recursiveDelete(zkUserPath + "/" + user + ZKUserTablePerms + "/" + table, NodeMissingPolicy.SKIP); @@ -321,8 +317,8 @@ public void revokeTablePermission(String user, String table, TablePermission per @Override public void revokeNamespacePermission(String user, String namespace, NamespacePermission permission) throws AccumuloSecurityException { - byte[] serializedPerms = - zooCache.get(zkUserPath + "/" + user + ZKUserNamespacePerms + "/" + namespace); + byte[] serializedPerms = this.ctx.getZooCache() + .get(zkUserPath + "/" + user + ZKUserNamespacePerms + "/" + namespace); // User had no namespace permission, nothing to revoke. if (serializedPerms == null) { @@ -333,7 +329,8 @@ public void revokeNamespacePermission(String user, String namespace, ZKSecurityTool.convertNamespacePermissions(serializedPerms); try { if (namespacePerms.remove(permission)) { - zooCache.clear(); + this.ctx.getZooCache().clear((path) -> path + .startsWith(zkUserPath + "/" + user + ZKUserNamespacePerms + "/" + namespace)); if (namespacePerms.isEmpty()) { zoo.recursiveDelete(zkUserPath + "/" + user + ZKUserNamespacePerms + "/" + namespace, NodeMissingPolicy.SKIP); @@ -355,12 +352,11 @@ public void revokeNamespacePermission(String user, String namespace, @Override public void cleanTablePermissions(String table) throws AccumuloSecurityException { try { - synchronized (zooCache) { - zooCache.clear(); - for (String user : zooCache.getChildren(zkUserPath)) { - zoo.recursiveDelete(zkUserPath + "/" + user + ZKUserTablePerms + "/" + table, - NodeMissingPolicy.SKIP); - } + for (String user : this.ctx.getZooCache().getChildren(zkUserPath)) { + this.ctx.getZooCache().clear( + (path) -> path.startsWith(zkUserPath + "/" + user + ZKUserTablePerms + "/" + table)); + zoo.recursiveDelete(zkUserPath + "/" + user + ZKUserTablePerms + "/" + table, + NodeMissingPolicy.SKIP); } } catch (KeeperException e) { log.error("{}", e.getMessage(), e); @@ -374,12 +370,11 @@ public void cleanTablePermissions(String table) throws AccumuloSecurityException @Override public void cleanNamespacePermissions(String namespace) throws AccumuloSecurityException { try { - synchronized (zooCache) { - zooCache.clear(); - for (String user : zooCache.getChildren(zkUserPath)) { - zoo.recursiveDelete(zkUserPath + "/" + user + ZKUserNamespacePerms + "/" + namespace, - NodeMissingPolicy.SKIP); - } + for (String user : this.ctx.getZooCache().getChildren(zkUserPath)) { + this.ctx.getZooCache().clear((path) -> path + .startsWith(zkUserPath + "/" + user + ZKUserNamespacePerms + "/" + namespace)); + zoo.recursiveDelete(zkUserPath + "/" + user + ZKUserNamespacePerms + "/" + namespace, + NodeMissingPolicy.SKIP); } } catch (KeeperException e) { log.error("{}", e.getMessage(), e); @@ -455,11 +450,10 @@ public void initUser(String user) throws AccumuloSecurityException { */ private void createTablePerm(String user, TableId table, Set perms) throws KeeperException, InterruptedException { - synchronized (zooCache) { - zooCache.clear(); - zoo.putPersistentData(zkUserPath + "/" + user + ZKUserTablePerms + "/" + table, - ZKSecurityTool.convertTablePermissions(perms), NodeExistsPolicy.FAIL); - } + this.ctx.getZooCache() + .clear((path) -> path.startsWith(zkUserPath + "/" + user + ZKUserTablePerms + "/" + table)); + zoo.putPersistentData(zkUserPath + "/" + user + ZKUserTablePerms + "/" + table, + ZKSecurityTool.convertTablePermissions(perms), NodeExistsPolicy.FAIL); } /** @@ -468,22 +462,19 @@ private void createTablePerm(String user, TableId table, Set pe */ private void createNamespacePerm(String user, NamespaceId namespace, Set perms) throws KeeperException, InterruptedException { - synchronized (zooCache) { - zooCache.clear(); - zoo.putPersistentData(zkUserPath + "/" + user + ZKUserNamespacePerms + "/" + namespace, - ZKSecurityTool.convertNamespacePermissions(perms), NodeExistsPolicy.FAIL); - } + this.ctx.getZooCache().clear((path) -> path + .startsWith(zkUserPath + "/" + user + ZKUserNamespacePerms + "/" + namespace)); + zoo.putPersistentData(zkUserPath + "/" + user + ZKUserNamespacePerms + "/" + namespace, + ZKSecurityTool.convertNamespacePermissions(perms), NodeExistsPolicy.FAIL); } @Override public void cleanUser(String user) throws AccumuloSecurityException { try { - synchronized (zooCache) { - zoo.recursiveDelete(zkUserPath + "/" + user + ZKUserSysPerms, NodeMissingPolicy.SKIP); - zoo.recursiveDelete(zkUserPath + "/" + user + ZKUserTablePerms, NodeMissingPolicy.SKIP); - zoo.recursiveDelete(zkUserPath + "/" + user + ZKUserNamespacePerms, NodeMissingPolicy.SKIP); - zooCache.clear(zkUserPath + "/" + user); - } + zoo.recursiveDelete(zkUserPath + "/" + user + ZKUserSysPerms, NodeMissingPolicy.SKIP); + zoo.recursiveDelete(zkUserPath + "/" + user + ZKUserTablePerms, NodeMissingPolicy.SKIP); + zoo.recursiveDelete(zkUserPath + "/" + user + ZKUserNamespacePerms, NodeMissingPolicy.SKIP); + this.ctx.getZooCache().clear((path) -> path.startsWith(zkUserPath + "/" + user)); } catch (InterruptedException e) { log.error("{}", e.getMessage(), e); throw new IllegalStateException(e); @@ -523,7 +514,7 @@ public boolean hasSystemPermission(String user, SystemPermission permission) { @Override public boolean hasCachedSystemPermission(String user, SystemPermission permission) { - byte[] perms = zooCache.get(zkUserPath + "/" + user + ZKUserSysPerms); + byte[] perms = this.ctx.getZooCache().get(zkUserPath + "/" + user + ZKUserSysPerms); if (perms == null) { return false; } diff --git a/server/base/src/main/java/org/apache/accumulo/server/tables/TableManager.java b/server/base/src/main/java/org/apache/accumulo/server/tables/TableManager.java index 86e33cc8f2a..3f15997b33f 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/tables/TableManager.java +++ b/server/base/src/main/java/org/apache/accumulo/server/tables/TableManager.java @@ -122,7 +122,7 @@ public TableManager(ServerContext context) { zkRoot = context.getZooKeeperRoot(); instanceID = context.getInstanceID(); zoo = context.getZooReaderWriter(); - zooStateCache = new ZooCache(zoo, new TableStateWatcher()); + zooStateCache = new ZooCache(zkRoot, zoo, new TableStateWatcher()); updateTableStateCache(); } @@ -256,6 +256,7 @@ public void process(WatchedEvent event) { log.trace("{}", event); } final String zPath = event.getPath(); + final EventType zType = event.getType(); String tablesPrefix = zkRoot + Constants.ZTABLES; @@ -270,9 +271,14 @@ public void process(WatchedEvent event) { } } if (tableId == null) { - log.warn("Unknown path in {}", event); + // not a path we care about + log.trace("Unknown path in {}", event); return; } + } else { + // not a path we care about + log.trace("Event fired for path {}, not something we care about", event); + return; } switch (zType) { diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/ListInstances.java b/server/base/src/main/java/org/apache/accumulo/server/util/ListInstances.java index af98fd0d201..0450ade990c 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/util/ListInstances.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/ListInstances.java @@ -89,7 +89,7 @@ static synchronized void listInstances(String keepers, boolean printAll, boolean System.out.println("INFO : Using ZooKeepers " + keepers); ZooReader rdr = new ZooReader(keepers, ZOOKEEPER_TIMER_MILLIS); - ZooCache cache = new ZooCache(rdr, null); + ZooCache cache = new ZooCache(Constants.ZROOT, rdr, null); TreeMap instanceNames = getInstanceNames(rdr, printErrors); diff --git a/server/base/src/test/java/org/apache/accumulo/server/security/handler/ZKAuthenticatorTest.java b/server/base/src/test/java/org/apache/accumulo/server/security/handler/ZKAuthenticatorTest.java index 6621feb93ff..1446c3f64fc 100644 --- a/server/base/src/test/java/org/apache/accumulo/server/security/handler/ZKAuthenticatorTest.java +++ b/server/base/src/test/java/org/apache/accumulo/server/security/handler/ZKAuthenticatorTest.java @@ -39,6 +39,7 @@ import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.security.tokens.PasswordToken; import org.apache.accumulo.core.data.InstanceId; +import org.apache.accumulo.core.fate.zookeeper.ZooCache; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; import org.apache.accumulo.core.fate.zookeeper.ZooUtil; import org.apache.accumulo.core.security.Authorizations; @@ -145,15 +146,17 @@ public void testUserAuthentication() throws Exception { var instanceId = InstanceId.of("example"); ServerContext context = MockServerContext.getWithZK(instanceId, "", 30_000); ZooReaderWriter zr = createMock(ZooReaderWriter.class); + ZooCache zc = createMock(ZooCache.class); expect(context.getZooReader()).andReturn(zr).anyTimes(); + expect(context.getZooCache()).andReturn(zc).anyTimes(); ZooKeeper zk = createMock(ZooKeeper.class); expect(zk.getChildren(anyObject(), anyObject())).andReturn(Arrays.asList(principal)).anyTimes(); expect(zk.exists(matches(ZooUtil.getRoot(instanceId) + Constants.ZUSERS + "/" + principal), anyObject(Watcher.class))).andReturn(new Stat()).anyTimes(); expect(zr.getZooKeeper()).andReturn(zk).anyTimes(); - expect(zk.getData(matches(ZooUtil.getRoot(instanceId) + Constants.ZUSERS + "/" + principal), - anyObject(), anyObject())).andReturn(newHash).once(); - replay(context, zr, zk); + expect(zc.get(matches(ZooUtil.getRoot(instanceId) + Constants.ZUSERS + "/" + principal))) + .andReturn(newHash); + replay(context, zr, zk, zc); // creating authenticator ZKAuthenticator auth = new ZKAuthenticator(); @@ -162,6 +165,6 @@ public void testUserAuthentication() throws Exception { PasswordToken token = new PasswordToken(rawPass.clone()); // verifying that if the new type of hash is stored in zk authentication works as expected assertTrue(auth.authenticateUser(principal, token)); - verify(context, zr, zk); + verify(context, zr, zk, zc); } } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/recovery/RecoveryManager.java b/server/manager/src/main/java/org/apache/accumulo/manager/recovery/RecoveryManager.java index 7270d8018f4..d2132762313 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/recovery/RecoveryManager.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/recovery/RecoveryManager.java @@ -37,7 +37,6 @@ import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.dataImpl.KeyExtent; -import org.apache.accumulo.core.fate.zookeeper.ZooCache; import org.apache.accumulo.core.tabletserver.log.LogEntry; import org.apache.accumulo.core.util.cache.Caches.CacheName; import org.apache.accumulo.core.util.threads.ThreadPools; @@ -65,7 +64,6 @@ public class RecoveryManager { private final Cache existenceCache; private final ScheduledExecutorService executor; private final Manager manager; - private final ZooCache zooCache; public RecoveryManager(Manager manager, long timeToCacheExistsInMillis) { this.manager = manager; @@ -76,7 +74,6 @@ public RecoveryManager(Manager manager, long timeToCacheExistsInMillis) { executor = ThreadPools.getServerThreadPools().createScheduledExecutorService(4, "Walog sort starter"); - zooCache = new ZooCache(manager.getContext().getZooReader(), null); try { List workIDs = new DistributedWorkQueue(manager.getContext().getZooKeeperRoot() + Constants.ZRECOVERY, @@ -182,7 +179,7 @@ public boolean recoverLogs(KeyExtent extent, Collection walogs) throws sortQueued = sortsQueued.contains(sortId); } - if (sortQueued && zooCache.get( + if (sortQueued && this.manager.getContext().getZooCache().get( manager.getContext().getZooKeeperRoot() + Constants.ZRECOVERY + "/" + sortId) == null) { synchronized (this) { sortsQueued.remove(sortId); diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java index 021ee3dbde9..93e2835130f 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java @@ -64,7 +64,6 @@ import org.apache.accumulo.core.dataImpl.thrift.TColumn; import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent; import org.apache.accumulo.core.dataImpl.thrift.TRange; -import org.apache.accumulo.core.fate.zookeeper.ZooCache; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy; import org.apache.accumulo.core.file.blockfile.cache.impl.BlockCacheConfiguration; @@ -204,8 +203,6 @@ private TabletMetadataLoader(Ample ample) { private ScanServerMetrics scanServerMetrics; private BlockCacheMetrics blockCacheMetrics; - private final ZooCache managerLockCache; - public ScanServer(ConfigOpts opts, String[] args) { super("sserver", opts, ServerContext::new, args); @@ -216,8 +213,6 @@ public ScanServer(ConfigOpts opts, String[] args) { this.resourceManager = new TabletServerResourceManager(context, this); - this.managerLockCache = new ZooCache(context.getZooReader(), null); - var readWriteLock = new ReentrantReadWriteLock(); reservationsReadLock = readWriteLock.readLock(); reservationsWriteLock = readWriteLock.writeLock(); @@ -1138,11 +1133,6 @@ public ServiceLock getLock() { return scanServerLock; } - @Override - public ZooCache getManagerLockCache() { - return managerLockCache; - } - @Override public BlockCacheConfiguration getBlockCacheConfiguration(AccumuloConfiguration acuConf) { return BlockCacheConfiguration.forScanServer(acuConf); diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java index 0b5c2466473..aa8d9247f34 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java @@ -70,6 +70,7 @@ import org.apache.accumulo.core.fate.zookeeper.ZooUtil; import org.apache.accumulo.core.iteratorsImpl.system.IterationInterruptedException; import org.apache.accumulo.core.lock.ServiceLock; +import org.apache.accumulo.core.lock.ServiceLockPaths; import org.apache.accumulo.core.logging.TabletLogger; import org.apache.accumulo.core.manager.thrift.TabletServerStatus; import org.apache.accumulo.core.metadata.AccumuloTable; @@ -910,11 +911,12 @@ static void checkPermission(SecurityOperation security, ServerContext context, new ZooUtil.LockID(context.getServerPaths().createManagerPath().toString(), lock); try { - if (!ServiceLock.isLockHeld(server.getManagerLockCache(), lid)) { + if (!ServiceLock.isLockHeld(server.getContext().getZooCache(), lid)) { // maybe the cache is out of date and a new manager holds the // lock? - server.getManagerLockCache().clear(); - if (!ServiceLock.isLockHeld(server.getManagerLockCache(), lid)) { + server.getContext().getZooCache().clear( + (path) -> path.equals(ServiceLockPaths.parse(Optional.empty(), lid.path).toString())); + if (!ServiceLock.isLockHeld(server.getContext().getZooCache(), lid)) { log.warn("Got {} message from a manager that does not hold the current lock {}", request, lock); throw new RuntimeException("bad manager lock"); diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletHostingServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletHostingServer.java index 8e5047bb40d..9760341a995 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletHostingServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletHostingServer.java @@ -20,7 +20,6 @@ import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.dataImpl.KeyExtent; -import org.apache.accumulo.core.fate.zookeeper.ZooCache; import org.apache.accumulo.core.lock.ServiceLock; import org.apache.accumulo.core.spi.cache.BlockCacheManager; import org.apache.accumulo.core.spi.scan.ScanServerInfo; @@ -58,7 +57,5 @@ public interface TabletHostingServer { ServiceLock getLock(); - ZooCache getManagerLockCache(); - BlockCacheManager.Configuration getBlockCacheConfiguration(AccumuloConfiguration acuConf); } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java index a7282c0688a..54121a3646d 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java @@ -74,7 +74,6 @@ import org.apache.accumulo.core.data.InstanceId; import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.dataImpl.KeyExtent; -import org.apache.accumulo.core.fate.zookeeper.ZooCache; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy; import org.apache.accumulo.core.file.blockfile.cache.impl.BlockCacheConfiguration; @@ -160,8 +159,6 @@ public class TabletServer extends AbstractServer implements TabletHostingServer private static final Logger log = LoggerFactory.getLogger(TabletServer.class); private static final long TIME_BETWEEN_LOCATOR_CACHE_CLEARS = TimeUnit.HOURS.toMillis(1); - final ZooCache managerLockCache; - final TabletServerLogger logger; private TabletServerMetrics metrics; @@ -232,7 +229,6 @@ protected TabletServer(ConfigOpts opts, Function serverContextFactory, String[] args) { super("tserver", opts, serverContextFactory, args); context = super.getContext(); - this.managerLockCache = new ZooCache(context.getZooReader(), null); final AccumuloConfiguration aconf = getConfiguration(); log.info("Version " + Constants.VERSION); log.info("Instance " + getInstanceID()); @@ -478,11 +474,6 @@ public ServiceLock getLock() { return tabletServerLock; } - @Override - public ZooCache getManagerLockCache() { - return managerLockCache; - } - private void announceExistence() { ZooReaderWriter zoo = getContext().getZooReaderWriter(); try { diff --git a/test/src/main/java/org/apache/accumulo/test/BadDeleteMarkersCreatedIT.java b/test/src/main/java/org/apache/accumulo/test/BadDeleteMarkersCreatedIT.java index ce0e21c4689..0b5ef9d119b 100644 --- a/test/src/main/java/org/apache/accumulo/test/BadDeleteMarkersCreatedIT.java +++ b/test/src/main/java/org/apache/accumulo/test/BadDeleteMarkersCreatedIT.java @@ -96,7 +96,6 @@ public void alterConfig() throws Exception { try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build(); ClientContext context = (ClientContext) client) { ZooCache zcache = context.getZooCache(); - zcache.clear(); var path = context.getServerPaths().createGarbageCollectorPath(); Optional gcLockData; do { diff --git a/test/src/main/java/org/apache/accumulo/test/functional/CacheTestReader.java b/test/src/main/java/org/apache/accumulo/test/functional/CacheTestReader.java index be17ed550fb..ac98bbe1fe0 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/CacheTestReader.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/CacheTestReader.java @@ -46,7 +46,7 @@ public static void main(String[] args) throws Exception { File myfile = new File(reportDir + "/" + UUID.randomUUID()); myfile.deleteOnExit(); - ZooCache zc = new ZooCache(new ZooReader(keepers, 30000), null); + ZooCache zc = new ZooCache("/", new ZooReader(keepers, 30000), null); while (true) { if (myfile.exists() && !myfile.delete()) { diff --git a/test/src/main/java/org/apache/accumulo/test/zookeeper/ZooCacheIT.java b/test/src/main/java/org/apache/accumulo/test/zookeeper/ZooCacheIT.java index 645b7c70d2f..e374fb3fb9f 100644 --- a/test/src/main/java/org/apache/accumulo/test/zookeeper/ZooCacheIT.java +++ b/test/src/main/java/org/apache/accumulo/test/zookeeper/ZooCacheIT.java @@ -20,17 +20,17 @@ import static org.apache.accumulo.harness.AccumuloITBase.ZOOKEEPER_TESTING_SERVER; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; import java.io.File; -import java.time.Duration; import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.UUID; +import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.fate.zookeeper.ZooCache; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; import org.apache.accumulo.test.util.Wait; @@ -41,19 +41,46 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; +import com.github.benmanes.caffeine.cache.Ticker; + +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; + @Tag(ZOOKEEPER_TESTING_SERVER) public class ZooCacheIT { + public static class ZooCacheTicker implements Ticker { + + private int advanceCounter = 0; + + @Override + public long read() { + return System.nanoTime() + (advanceCounter * ZooCache.CACHE_DURATION.toNanos()); + } + + public void advance() { + advanceCounter++; + } + + public void reset() { + advanceCounter = 0; + } + + } + private ZooKeeperTestingServer szk = null; private ZooReaderWriter zk = null; + private ZooCacheTicker ticker = new ZooCacheTicker(); @TempDir private File tempDir; @BeforeEach + @SuppressFBWarnings(value = "ST_WRITE_TO_STATIC_FROM_INSTANCE_METHOD", + justification = "setting ticker in test for eviction test") public void setup() throws Exception { szk = new ZooKeeperTestingServer(tempDir); zk = szk.getZooReaderWriter(); + ZooCache.ticker = ticker; } @AfterEach @@ -71,125 +98,133 @@ public void testGetChildren() throws Exception { watchesRemoved.add(event.getPath()); } }; - ZooCache zooCache = new ZooCache(zk, watcher, Duration.ofSeconds(3)); - zk.mkdirs("/test2"); - zk.mkdirs("/test3/c1"); - zk.mkdirs("/test3/c2"); + final String root = Constants.ZROOT + UUID.randomUUID().toString(); + + ZooCache zooCache = new ZooCache(root, zk, watcher); + + final String base = root + Constants.ZTSERVERS; + + zk.mkdirs(base + "/test2"); + zk.mkdirs(base + "/test3/c1"); + zk.mkdirs(base + "/test3/c2"); // cache non-existence of /test1 and existence of /test2 and /test3 long uc1 = zooCache.getUpdateCount(); - assertNull(zooCache.getChildren("/test1")); + assertNull(zooCache.getChildren(base + "/test1")); long uc2 = zooCache.getUpdateCount(); assertTrue(uc1 < uc2); - assertEquals(List.of(), zooCache.getChildren("/test2")); + assertEquals(List.of(), zooCache.getChildren(base + "/test2")); long uc3 = zooCache.getUpdateCount(); assertTrue(uc2 < uc3); - assertEquals(Set.of("c1", "c2"), Set.copyOf(zooCache.getChildren("/test3"))); + assertEquals(Set.of("c1", "c2"), Set.copyOf(zooCache.getChildren(base + "/test3"))); long uc4 = zooCache.getUpdateCount(); assertTrue(uc3 < uc4); // The cache should be stable now and new accesses should not change the update count - assertNull(zooCache.getChildren("/test1")); + assertNull(zooCache.getChildren(base + "/test1")); // once getChildren discovers that a node does not exists, then get data will also know this - assertNull(zooCache.get("/test1")); - assertEquals(List.of(), zooCache.getChildren("/test2")); - assertEquals(Set.of("c1", "c2"), Set.copyOf(zooCache.getChildren("/test3"))); + assertNull(zooCache.get(base + "/test1")); + assertEquals(List.of(), zooCache.getChildren(base + "/test2")); + assertEquals(Set.of("c1", "c2"), Set.copyOf(zooCache.getChildren(base + "/test3"))); assertEquals(uc4, zooCache.getUpdateCount()); // Had cached non-existence of "/test1", should get a notification that it was created - zk.mkdirs("/test1"); + zk.mkdirs(base + "/test1"); Wait.waitFor(() -> { - var children = zooCache.getChildren("/test1"); + var children = zooCache.getChildren(base + "/test1"); return children != null && children.isEmpty(); }); long uc5 = zooCache.getUpdateCount(); assertTrue(uc4 < uc5); - assertEquals(List.of(), zooCache.getChildren("/test1")); - assertEquals(List.of(), zooCache.getChildren("/test2")); - assertEquals(Set.of("c1", "c2"), Set.copyOf(zooCache.getChildren("/test3"))); - assertEquals(uc5, zooCache.getUpdateCount()); + assertEquals(List.of(), zooCache.getChildren(base + "/test1")); + assertEquals(List.of(), zooCache.getChildren(base + "/test2")); + assertEquals(Set.of("c1", "c2"), Set.copyOf(zooCache.getChildren(base + "/test3"))); + long uc5b = zooCache.getUpdateCount(); + assertTrue(uc5 < uc5b); // add a child to /test3, should get a notification of the change - zk.mkdirs("/test3/c3"); + zk.mkdirs(base + "/test3/c3"); Wait.waitFor(() -> { - var children = zooCache.getChildren("/test3"); + var children = zooCache.getChildren(base + "/test3"); + System.out.println("children: " + children); return children != null && children.size() == 3; }); long uc6 = zooCache.getUpdateCount(); - assertTrue(uc5 < uc6); - assertEquals(List.of(), zooCache.getChildren("/test1")); - assertEquals(List.of(), zooCache.getChildren("/test2")); - assertEquals(Set.of("c1", "c2", "c3"), Set.copyOf(zooCache.getChildren("/test3"))); + assertTrue(uc5b < uc6); + assertEquals(List.of(), zooCache.getChildren(base + "/test1")); + assertEquals(List.of(), zooCache.getChildren(base + "/test2")); + assertEquals(Set.of("c1", "c2", "c3"), Set.copyOf(zooCache.getChildren(base + "/test3"))); assertEquals(uc6, zooCache.getUpdateCount()); // remove a child from /test3 - zk.delete("/test3/c2"); + zk.delete(base + "/test3/c2"); Wait.waitFor(() -> { - var children = zooCache.getChildren("/test3"); + var children = zooCache.getChildren(base + "/test3"); return children != null && children.size() == 2; }); long uc7 = zooCache.getUpdateCount(); assertTrue(uc6 < uc7); - assertEquals(List.of(), zooCache.getChildren("/test1")); - assertEquals(List.of(), zooCache.getChildren("/test2")); - assertEquals(Set.of("c1", "c3"), Set.copyOf(zooCache.getChildren("/test3"))); + assertEquals(List.of(), zooCache.getChildren(base + "/test1")); + assertEquals(List.of(), zooCache.getChildren(base + "/test2")); + assertEquals(Set.of("c1", "c3"), Set.copyOf(zooCache.getChildren(base + "/test3"))); assertEquals(uc7, zooCache.getUpdateCount()); // remove /test2, should start caching that it does not exist - zk.delete("/test2"); - Wait.waitFor(() -> zooCache.getChildren("/test2") == null); + zk.delete(base + "/test2"); + Wait.waitFor(() -> zooCache.getChildren(base + "/test2") == null); long uc8 = zooCache.getUpdateCount(); assertTrue(uc7 < uc8); - assertEquals(List.of(), zooCache.getChildren("/test1")); - assertNull(zooCache.getChildren("/test2")); - assertEquals(Set.of("c1", "c3"), Set.copyOf(zooCache.getChildren("/test3"))); - assertEquals(uc8, zooCache.getUpdateCount()); + assertEquals(List.of(), zooCache.getChildren(base + "/test1")); + assertNull(zooCache.getChildren(base + "/test2")); + assertEquals(Set.of("c1", "c3"), Set.copyOf(zooCache.getChildren(base + "/test3"))); + long uc8b = zooCache.getUpdateCount(); + assertTrue(uc8 < uc8b); // add /test2 back, should update - zk.mkdirs("/test2"); - Wait.waitFor(() -> zooCache.getChildren("/test2") != null); + zk.mkdirs(base + "/test2"); + Wait.waitFor(() -> zooCache.getChildren(base + "/test2") != null); long uc9 = zooCache.getUpdateCount(); - assertTrue(uc8 < uc9); - assertEquals(List.of(), zooCache.getChildren("/test1")); - assertEquals(List.of(), zooCache.getChildren("/test2")); - assertEquals(Set.of("c1", "c3"), Set.copyOf(zooCache.getChildren("/test3"))); - assertEquals(uc9, zooCache.getUpdateCount()); + assertTrue(uc8b < uc9); + assertEquals(List.of(), zooCache.getChildren(base + "/test1")); + assertEquals(List.of(), zooCache.getChildren(base + "/test2")); + assertEquals(Set.of("c1", "c3"), Set.copyOf(zooCache.getChildren(base + "/test3"))); + long uc9b = zooCache.getUpdateCount(); + assertTrue(uc9 < uc9b); // make multiple changes. the cache should see all of these - zk.delete("/test1"); - zk.mkdirs("/test2/ca"); - zk.delete("/test3/c1"); - zk.mkdirs("/test3/c4"); - zk.delete("/test3/c4"); - zk.mkdirs("/test3/c5"); + zk.delete(base + "/test1"); + zk.mkdirs(base + "/test2/ca"); + zk.delete(base + "/test3/c1"); + zk.mkdirs(base + "/test3/c4"); + zk.delete(base + "/test3/c4"); + zk.mkdirs(base + "/test3/c5"); Wait.waitFor(() -> { - var children1 = zooCache.getChildren("/test1"); - var children2 = zooCache.getChildren("/test2"); - var children3 = zooCache.getChildren("/test3"); + var children1 = zooCache.getChildren(base + "/test1"); + var children2 = zooCache.getChildren(base + "/test2"); + var children3 = zooCache.getChildren(base + "/test3"); return children1 == null && children2 != null && children2.size() == 1 && children3 != null && Set.copyOf(children3).equals(Set.of("c3", "c5")); }); long uc10 = zooCache.getUpdateCount(); - assertTrue(uc9 < uc10); - assertNull(zooCache.getChildren("/test1")); - assertEquals(List.of("ca"), zooCache.getChildren("/test2")); - assertEquals(Set.of("c3", "c5"), Set.copyOf(zooCache.getChildren("/test3"))); + assertTrue(uc9b < uc10); + assertNull(zooCache.getChildren(base + "/test1")); + assertEquals(List.of("ca"), zooCache.getChildren(base + "/test2")); + assertEquals(Set.of("c3", "c5"), Set.copyOf(zooCache.getChildren(base + "/test3"))); assertEquals(uc10, zooCache.getUpdateCount()); // wait for the cache to evict and clear watches + ticker.advance(); Wait.waitFor(() -> { // the cache will not run its eviction handler unless accessed, so access something that is // not expected to be evicted - zooCache.getChildren("/test4"); - return watchesRemoved.equals(Set.of("/test1", "/test2", "/test3")); + zooCache.getChildren(base + "/test4"); + return zooCache.childrenCached(base + "/test1") == false + && zooCache.childrenCached(base + "/test2") == false + && zooCache.childrenCached(base + "/test3") == false; }); - - assertFalse(zooCache.childrenCached("/test1")); - assertFalse(zooCache.childrenCached("/test2")); - assertFalse(zooCache.childrenCached("/test3")); } }