diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ChildWatch.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ChildWatch.java new file mode 100644 index 000000000000..9a0dd03b2533 --- /dev/null +++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ChildWatch.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.solr.common.cloud; + +import java.util.List; + +/** + * Callback interface for notifications of child-node changes in zookeeper state + */ +public interface ChildWatch { + + /** + * Called when the children of a watched node change + * @param children the new children + * @return true if the watch should be maintained + */ + boolean onChanged(List children); + +} diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java index 16b7fcd8b4b7..fab1678c512e 100644 --- a/solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java +++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java @@ -163,6 +163,8 @@ public void update(SolrZooKeeper keeper) { closeKeeper(keeper); throw new RuntimeException(t); } + + client.onConnect(); if (onReconnect != null) { onReconnect.command(); diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/DataWatch.java b/solr/solrj/src/java/org/apache/solr/common/cloud/DataWatch.java new file mode 100644 index 000000000000..a2a442ee2696 --- /dev/null +++ b/solr/solrj/src/java/org/apache/solr/common/cloud/DataWatch.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.solr.common.cloud; + +/** + * Callback interface for notifications of changes in zookeeper state + */ +public interface DataWatch { + + /** + * Called when watched data node is updated + * @param version the data node version + * @param data the new data + * @return true if the node watch should be maintained + */ + boolean onChanged(int version, byte[] data); + +} diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java b/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java index 422d9e5d7a99..67c7e973a4ae 100644 --- a/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java +++ b/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java @@ -30,10 +30,16 @@ import java.lang.invoke.MethodHandles; import java.nio.charset.StandardCharsets; import java.nio.file.Path; +import java.util.Collections; import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.regex.Pattern; +import java.util.stream.Collectors; import org.apache.commons.io.FileUtils; import org.apache.solr.client.solrj.SolrServerException; @@ -85,6 +91,10 @@ public class SolrZkClient implements Closeable { private ZkACLProvider zkACLProvider; private String zkServerAddress; + private final ConcurrentHashMap> dataWatches = new ConcurrentHashMap<>(); + private final ConcurrentHashMap> childWatches = new ConcurrentHashMap<>(); + private final ConcurrentHashMap versions = new ConcurrentHashMap<>(); + public int getZkClientTimeout() { return zkClientTimeout; } @@ -229,6 +239,241 @@ protected ZkACLProvider createZkACLProvider() { return new DefaultZkACLProvider(); } + private class NodeWatcher implements Watcher { + + final String node; + + private NodeWatcher(String node) { + this.node = node; + } + + @Override + public void process(WatchedEvent event) { + // session events are not change events, and do not remove the watcher + if (Event.EventType.None.equals(event.getType())) { + return; + } + readNode(node, this); + } + } + + /** + * Watch the data at a particular ZK node. + * + * Whenever data on the node changes, the new data is reported back to a + * {@link DataWatch} object. The watch is also notified when a client + * reconnects after a session expiry. + * + * Multiple watchers added to a single node will share ZK watchers. + * + * @param node the node to watch + * @param watch a {@link DataWatch} object to notify + */ + public void addDataWatch(String node, DataWatch watch) { + AtomicBoolean addWatch = new AtomicBoolean(false); + dataWatches.compute(node, (n, s) -> { + if (s == null) { + s = ConcurrentHashMap.newKeySet(); + addWatch.set(true); + } + s.add(watch); + return s; + }); + + readNode(node, addWatch.get() ? new NodeWatcher(node) : null); + + } + + /** + * Read the data at the supplied ZNode, and notify all registered watches + * @param node the node to read + */ + public void forceUpdateWatch(String node) { + readNode(node, null); + } + + public Map getWatchedVersions() { + return Collections.unmodifiableMap(versions); + } + + private void readNode(String node, Watcher watcher) { + Stat stat = new Stat(); + try { + while (true) { + try { + byte[] data = getData(node, watcher, stat, true); + notifyDataWatches(node, stat.getVersion(), data); + return; + } catch (KeeperException.NoNodeException e) { + stat = exists(node, watcher, true); + if (stat == null) { + notifyDataWatches(node, -1, null); + return; + } + } + } + } + catch (InterruptedException e) { + if (isClosed == false) + log.warn("Interrupted while setting watch on node {}", node); + Thread.currentThread().interrupt(); + } + catch (KeeperException e) { + if (isClosed == false) + log.error("Error setting watch on node {}: {}", node, e.getMessage()); + } + } + + /** + * Remove a {@link DataWatch} from a particular ZK node + * @param node the node to stop watching + * @param watch the watcher to remove + */ + public void removeDataWatch(String node, DataWatch watch) { + dataWatches.compute(node, (n, s) -> { + s.remove(watch); + if (s.size() == 0) + return null; + return s; + }); + } + + // lock to force linearization of notifications + private final Object notificationsLock = new Object(); + + private void notifyDataWatches(String node, int version, byte[] data) { + synchronized (notificationsLock) { + versions.put(node, version); + Set watchesToRemove = dataWatches.getOrDefault(node, Collections.emptySet()) + .stream() + .filter(watch -> watch.onChanged(version, data) == false) + .collect(Collectors.toSet()); + for (DataWatch watch : watchesToRemove) { + removeDataWatch(node, watch); + } + } + } + + private class ChildWatcher implements Watcher { + + final String node; + + private ChildWatcher(String node) { + this.node = node; + } + + @Override + public void process(WatchedEvent event) { + // session events are not change events, and do not remove the watcher + if (Event.EventType.None.equals(event.getType())) { + return; + } + readChildren(node, this); + } + } + + /** + * Watch the children of a particular ZK node. + * + * Whenever the children of the node change, the new child set is reported back to a + * {@link ChildWatch} object. The watch is also notified when a client + * reconnects after a session expiry. + * + * Multiple watchers added to a single node will share ZK watchers. + * + * @param node the node to watch + * @param watch a {@link ChildWatch} object to notify + */ + public void addChildWatch(String node, ChildWatch watch) { + AtomicBoolean addWatch = new AtomicBoolean(false); + childWatches.compute(node, (n, s) -> { + if (s == null) { + s = ConcurrentHashMap.newKeySet(); + addWatch.set(true); + } + s.add(watch); + return s; + }); + + readChildren(node, addWatch.get() ? new ChildWatcher(node) : null); + + } + + /** + * Read the children of a node from ZK, notifying all registered watchers + * @param node the node to check + */ + public void forceUpdateChildren(String node) { + readChildren(node, null); + } + + private void readChildren(String node, Watcher watcher) { + try { + while (true) { + try { + List children = getChildren(node, watcher, true); + notifyChildWatches(node, children); + return; + } catch (KeeperException.NoNodeException e) { + Stat stat = exists(node, watcher, true); + if (stat == null) { + notifyChildWatches(node, Collections.emptyList()); + return; + } + } + } + } + catch (InterruptedException e) { + if (isClosed == false) + log.warn("Interrupted while setting watch on node {}", node); + Thread.currentThread().interrupt(); + } + catch (KeeperException e) { + if (isClosed == false) + log.error("Error setting watch on node {}: {}", node, e.getMessage()); + } + } + + /** + * Remove a {@link ChildWatch} from a particular ZK node + * @param node the node to stop watching + * @param watch the watcher to remove + */ + public void removeChildWatch(String node, ChildWatch watch) { + dataWatches.compute(node, (n, s) -> { + s.remove(watch); + if (s.size() == 0) + return null; + return s; + }); + } + + private synchronized void notifyChildWatches(String node, List children) { + + Set watchesToRemove = childWatches.getOrDefault(node, Collections.emptySet()) + .stream() + .filter(watch -> watch.onChanged(children) == false) + .collect(Collectors.toSet()); + for (ChildWatch watch : watchesToRemove) { + removeChildWatch(node, watch); + } + } + + /** + * Called by the {@link ConnectionManager} when a session has been established + * + * For internal use only + */ + synchronized void onConnect() { + // re-establish all data watches + for (String node : dataWatches.keySet()) { + readNode(node, new NodeWatcher(node)); + } + for (String node : childWatches.keySet()) { + readChildren(node, new ChildWatcher(node)); + } + } + /** * Returns true if client is connected */ diff --git a/solr/solrj/src/test/org/apache/solr/common/cloud/TestDataWatches.java b/solr/solrj/src/test/org/apache/solr/common/cloud/TestDataWatches.java new file mode 100644 index 000000000000..6d9a4d35d938 --- /dev/null +++ b/solr/solrj/src/test/org/apache/solr/common/cloud/TestDataWatches.java @@ -0,0 +1,265 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.solr.common.cloud; + +import java.io.IOException; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.solr.SolrTestCaseJ4; +import org.apache.solr.cloud.ZkTestServer; +import org.apache.zookeeper.CreateMode; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +public class TestDataWatches extends SolrTestCaseJ4 { + + private static final int TIMEOUT = 30; + + private static ZkTestServer zkServer; + private static SolrZkClient client; + + @BeforeClass + public static void setupZkServer() throws Exception { + zkServer = new ZkTestServer(createTempDir().toString()); + zkServer.run(); + + // We have multiple sessions, so limit violations will be inaccurate + zkServer.setViolationReportAction(ZkTestServer.LimitViolationAction.IGNORE); + + client = new SolrZkClient(zkServer.getZkAddress(), TIMEOUT); + client.create("/", null, CreateMode.PERSISTENT, true); + + } + + @AfterClass + public static void tearDownZkServer() throws IOException, InterruptedException { + client.close(); + zkServer.shutdown(); + } + + private class SyncDataWatch implements DataWatch { + + int version; + byte[] data; + CountDownLatch latch = new CountDownLatch(1); + + @Override + public boolean onChanged(int version, byte[] data) { + this.version = version; + this.data = data; + this.latch.countDown(); + return true; // always retain watch + } + } + + private static byte[] bytes(String data) { + return data.getBytes(Charset.defaultCharset()); + } + + @Test + public void testSimpleDataWatch() throws Exception { + + SyncDataWatch watch = new SyncDataWatch(); + + // watch for non-existent node + client.addDataWatch("/test", watch); + watch.latch.await(TIMEOUT, TimeUnit.SECONDS); + assertEquals(-1, watch.version); + assertEquals(null, watch.data); + + // watch should be called when data is added + watch.latch = new CountDownLatch(1); + client.create("/test", bytes("some data"), CreateMode.PERSISTENT, true); + watch.latch.await(TIMEOUT, TimeUnit.SECONDS); + assertEquals(0, watch.version); + assertArrayEquals(bytes("some data"), watch.data); + assertEquals(0, client.getWatchedVersions().get("/test").intValue()); + + // and when data is updated + watch.latch = new CountDownLatch(1); + client.setData("/test", bytes("some more data"), true); + watch.latch.await(TIMEOUT, TimeUnit.SECONDS); + assertEquals(1, watch.version); + assertArrayEquals(bytes("some more data"), watch.data); + assertEquals(1, client.getWatchedVersions().get("/test").intValue()); + + // and when data is deleted + watch.latch = new CountDownLatch(1); + client.delete("/test", 1, true); + watch.latch.await(TIMEOUT, TimeUnit.SECONDS); + assertEquals(-1, watch.version); + assertEquals(null, watch.data); + assertEquals(-1, client.getWatchedVersions().get("/test").intValue()); + + // and when a forceUpdate is called + watch.latch = new CountDownLatch(1); + client.forceUpdateWatch("/test"); + watch.latch.await(TIMEOUT, TimeUnit.SECONDS); + + } + + private class SyncChildWatch implements ChildWatch { + + List children = new ArrayList<>(); + CountDownLatch latch = new CountDownLatch(1); + + @Override + public boolean onChanged(List children) { + this.children.clear(); + this.children.addAll(children); + latch.countDown(); + return true; + } + } + + @Test + public void testSimpleChildWatch() throws Exception { + + SyncChildWatch watcher = new SyncChildWatch(); + + // watch for non-existent node + client.addChildWatch("/parent", watcher); + watcher.latch.await(TIMEOUT, TimeUnit.SECONDS); + assertTrue(watcher.children.isEmpty()); + + watcher.latch = new CountDownLatch(1); + client.create("/parent", bytes("test"), CreateMode.PERSISTENT, true); + watcher.latch.await(TIMEOUT, TimeUnit.SECONDS); + + // now add a child + watcher.latch = new CountDownLatch(1); + client.makePath("/parent/child1", bytes(""), true); + watcher.latch.await(TIMEOUT, TimeUnit.SECONDS); + assertEquals(watcher.children.size(), 1); + + // add another + watcher.latch = new CountDownLatch(1); + client.makePath("/parent/child2", bytes(""), true); + watcher.latch.await(TIMEOUT, TimeUnit.SECONDS); + assertEquals(watcher.children.size(), 2); + + // delete one + watcher.latch = new CountDownLatch(1); + client.delete("/parent/child1", 0, true); + watcher.latch.await(TIMEOUT, TimeUnit.SECONDS); + assertEquals(watcher.children.size(), 1); + + // force an update + watcher.latch = new CountDownLatch(1); + client.forceUpdateChildren("/parent"); + watcher.latch.await(TIMEOUT, TimeUnit.SECONDS); + + } + + // watches are re-set after session expiry + public void testSessionExpiry() throws Exception { + + client.makePath("/expirytest", bytes("test"), true); + client.makePath("/expirytest/child1", bytes("test"), true); + client.makePath("/expirytest/child2", bytes("test"), true); + + SyncDataWatch watch = new SyncDataWatch(); + client.addDataWatch("/expirytest", watch); + watch.latch.await(TIMEOUT, TimeUnit.SECONDS); + watch.latch = new CountDownLatch(1); + + CountDownLatch childWatchLatch = new CountDownLatch(2); + + client.addChildWatch("/expirytest", children -> { + childWatchLatch.countDown(); + return true; + }); + + // after expiry, both data watch and child watch should be called again + + zkServer.expire(client.getSolrZooKeeper().getSessionId()); + + watch.latch.await(TIMEOUT, TimeUnit.SECONDS); + childWatchLatch.await(TIMEOUT, TimeUnit.SECONDS); + + } + + // multiple watches share a Watcher + public void testSingleWatcherCreated() throws Exception { + + zkServer.getLimiter().setAction(ZkTestServer.LimitViolationAction.FAIL); + try { + SyncDataWatch watch1 = new SyncDataWatch(); + CountDownLatch latch = new CountDownLatch(1); + DataWatch watch2 = (version, data) -> { + latch.countDown(); + return false; + }; + + client.addDataWatch("/multiplewatchers", watch1); + client.addDataWatch("/multiplewatchers", watch2); + + client.create("/multiplewatchers", bytes("test"), CreateMode.PERSISTENT, true); + + // Both watchers should be notified, but the limiter should ensure + // that only a single ZK watcher was created + watch1.latch.await(TIMEOUT, TimeUnit.SECONDS); + latch.await(TIMEOUT, TimeUnit.SECONDS); + + } + finally { + zkServer.getLimiter().setAction(ZkTestServer.LimitViolationAction.IGNORE); + } + + } + + // watches get removed + public void testWatchesAreRemoved() throws Exception { + CountDownLatch latch = new CountDownLatch(6); + AtomicInteger count1 = new AtomicInteger(0); + AtomicInteger count2 = new AtomicInteger(0); + DataWatch preservedWatch = (version, data) -> { + count1.incrementAndGet(); + latch.countDown(); + return true; + }; + DataWatch singleFireWatch = (version, data) -> { + count2.incrementAndGet(); + latch.countDown(); + return version == -1; + }; + + // watches are fired immediately with the current values + client.addDataWatch("/removals", preservedWatch); + client.addDataWatch("/removals", singleFireWatch); + + // first data watch will fire both watches, but remove the second + client.create("/removals", bytes("data"), CreateMode.PERSISTENT, true); + // second should only fire one watch + client.setData("/removals", bytes("update"), true); + // third will again only fire one watch, and ensures that all notifications + // from the second change have completed + client.setData("/removals", bytes("and another update"), true); + + latch.await(TIMEOUT, TimeUnit.SECONDS); + assertEquals(4, count1.intValue()); + assertEquals(2, count2.intValue()); + } + +}