From d316946e6bef914b21e5d33c3b85ab52e1f84342 Mon Sep 17 00:00:00 2001 From: Timothy Potter Date: Mon, 3 May 2021 17:29:43 -0600 Subject: [PATCH] SOLR-11904: Mark ReplicationHandler's polling thread as a Solr server thread so the PKI Interceptor is activated to allow PULL replicas to replicate from security-enabled leaders (#110) --- solr/CHANGES.txt | 3 + .../solr/handler/ReplicationHandler.java | 1 + .../apache/solr/cloud/TestPullReplica.java | 292 +++++++++--------- .../solr/cloud/TestPullReplicaWithAuth.java | 162 ++++++++++ 4 files changed, 309 insertions(+), 149 deletions(-) create mode 100644 solr/core/src/test/org/apache/solr/cloud/TestPullReplicaWithAuth.java diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index 148856356bf1..fd40a92a7ec2 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -373,6 +373,9 @@ Bug Fixes * SOLR-15383: Solr Zookeeper status page shows green even when some Zookeepers are not serving requests (janhoy) +* SOLR-11904: Mark ReplicationHandler's polling thread as a Solr server thread so the PKI Interceptor is activated to + allow PULL replicas to replicate from security-enabled leaders (Timothy Potter) + Other Changes --------------------- * SOLR-15118: Deprecate CollectionAdminRequest.getV2Request(). (Jason Gerlowski) diff --git a/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java b/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java index 414d3c758975..264ed23c810e 100644 --- a/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java +++ b/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java @@ -1208,6 +1208,7 @@ private void setupPolling(String intervalStr) { log.info("Poll disabled"); return; } + ExecutorUtil.setServerThreadFlag(true); // so PKI auth works try { log.debug("Polling for index modifications"); markScheduledExecutionStart(); diff --git a/solr/core/src/test/org/apache/solr/cloud/TestPullReplica.java b/solr/core/src/test/org/apache/solr/cloud/TestPullReplica.java index 5e06c28b5b91..0d209c0e849e 100644 --- a/solr/core/src/test/org/apache/solr/cloud/TestPullReplica.java +++ b/solr/core/src/test/org/apache/solr/cloud/TestPullReplica.java @@ -28,6 +28,7 @@ import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +import com.carrotsearch.randomizedtesting.annotations.Repeat; import org.apache.http.HttpResponse; import org.apache.http.client.ClientProtocolException; import org.apache.http.client.HttpClient; @@ -39,8 +40,11 @@ import org.apache.solr.client.solrj.SolrQuery; import org.apache.solr.client.solrj.SolrServerException; import org.apache.solr.client.solrj.embedded.JettySolrRunner; +import org.apache.solr.client.solrj.impl.CloudSolrClient; import org.apache.solr.client.solrj.impl.HttpSolrClient; import org.apache.solr.client.solrj.request.CollectionAdminRequest; +import org.apache.solr.client.solrj.request.QueryRequest; +import org.apache.solr.client.solrj.request.UpdateRequest; import org.apache.solr.client.solrj.response.CollectionAdminResponse; import org.apache.solr.client.solrj.response.QueryResponse; import org.apache.solr.common.SolrDocument; @@ -64,28 +68,19 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.carrotsearch.randomizedtesting.annotations.Repeat; - @Slow -@AwaitsFix(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") +@AwaitsFix(bugUrl = "https://issues.apache.org/jira/browse/SOLR-12028") public class TestPullReplica extends SolrCloudTestCase { private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); - - private String collectionName = null; private final static int REPLICATION_TIMEOUT_SECS = 30; - - private String suggestedCollectionName() { - return (getTestClass().getSimpleName().replace("Test", "") + "_" + getSaferTestName().split(" ")[0]).replaceAll("(.)(\\p{Upper})", "$1_$2").toLowerCase(Locale.ROOT); - } + private String collectionName = null; @BeforeClass public static void setupCluster() throws Exception { - // cloudSolrClientMaxStaleRetries - System.setProperty("cloudSolrClientMaxStaleRetries", "1"); - System.setProperty("zkReaderGetLeaderRetryTimeoutMs", "1000"); - - configureCluster(2) // 2 + random().nextInt(3) + System.setProperty("cloudSolrClientMaxStaleRetries", "1"); + System.setProperty("zkReaderGetLeaderRetryTimeoutMs", "1000"); + configureCluster(2) // 2 + random().nextInt(3) .addConfig("conf", configset("cloud-minimal")) .configure(); } @@ -97,6 +92,90 @@ public static void tearDownCluster() { TestInjection.reset(); } + static void waitForDeletion(String collection) throws InterruptedException, KeeperException { + TimeOut t = new TimeOut(10, TimeUnit.SECONDS, TimeSource.NANO_TIME); + while (cluster.getSolrClient().getZkStateReader().getClusterState().hasCollection(collection)) { + log.info("Collection not yet deleted"); + try { + Thread.sleep(100); + if (t.hasTimedOut()) { + fail("Timed out waiting for collection " + collection + " to be deleted."); + } + cluster.getSolrClient().getZkStateReader().forceUpdateCollection(collection); + } catch (SolrException e) { + return; + } + + } + } + + /** + * Asserts that Update logs don't exist for replicas of type {@link org.apache.solr.common.cloud.Replica.Type#PULL} + */ + static void assertUlogPresence(DocCollection collection) { + for (Slice s : collection.getSlices()) { + for (Replica r : s.getReplicas()) { + if (r.getType() == Replica.Type.NRT) { + continue; + } + SolrCore core = null; + try { + core = cluster.getReplicaJetty(r).getCoreContainer().getCore(r.getCoreName()); + assertNotNull(core); + assertFalse("Update log should not exist for replicas of type Passive but file is present: " + core.getUlogDir(), + new java.io.File(core.getUlogDir()).exists()); + } finally { + core.close(); + } + } + } + } + + static DocCollection assertNumberOfReplicas(String coll, int numNrtReplicas, int numTlogReplicas, int numPullReplicas, boolean updateCollection, boolean activeOnly) throws KeeperException, InterruptedException { + if (updateCollection) { + cluster.getSolrClient().getZkStateReader().forceUpdateCollection(coll); + } + DocCollection docCollection = getCollectionState(coll); + assertNotNull(docCollection); + assertEquals("Unexpected number of writer replicas: " + docCollection, numNrtReplicas, + docCollection.getReplicas(EnumSet.of(Replica.Type.NRT)).stream().filter(r -> !activeOnly || r.getState() == Replica.State.ACTIVE).count()); + assertEquals("Unexpected number of pull replicas: " + docCollection, numPullReplicas, + docCollection.getReplicas(EnumSet.of(Replica.Type.PULL)).stream().filter(r -> !activeOnly || r.getState() == Replica.State.ACTIVE).count()); + assertEquals("Unexpected number of active replicas: " + docCollection, numTlogReplicas, + docCollection.getReplicas(EnumSet.of(Replica.Type.TLOG)).stream().filter(r -> !activeOnly || r.getState() == Replica.State.ACTIVE).count()); + return docCollection; + } + + static void waitForNumDocsInAllReplicas(int numDocs, Collection replicas, String query, String user, String pass) throws IOException, SolrServerException, InterruptedException { + TimeOut t = new TimeOut(REPLICATION_TIMEOUT_SECS, TimeUnit.SECONDS, TimeSource.NANO_TIME); + for (Replica r : replicas) { + try (HttpSolrClient replicaClient = getHttpSolrClient(r.getCoreUrl())) { + while (true) { + try { + QueryRequest req = new QueryRequest(new SolrQuery(query)); + if (user != null) { + req.setBasicAuthCredentials(user, pass); + } + long numFound = req.process(replicaClient).getResults().getNumFound(); + assertEquals("Replica " + r.getName() + " not up to date after " + REPLICATION_TIMEOUT_SECS + + " seconds; replica only has " + numFound + " docs, expected " + numDocs, numDocs, numFound); + break; + } catch (AssertionError e) { + if (t.hasTimedOut()) { + throw e; + } else { + Thread.sleep(100); + } + } + } + } + } + } + + private String suggestedCollectionName() { + return (getTestClass().getSimpleName().replace("Test", "") + "_" + getSaferTestName().split(" ")[0]).replaceAll("(.)(\\p{Upper})", "$1_$2").toLowerCase(Locale.ROOT); + } + @Override public void setUp() throws Exception { super.setUp(); @@ -107,7 +186,7 @@ public void setUp() throws Exception { @Override public void tearDown() throws Exception { - for (JettySolrRunner jetty:cluster.getJettySolrRunners()) { + for (JettySolrRunner jetty : cluster.getJettySolrRunners()) { if (!jetty.isRunning()) { log.warn("Jetty {} not running, probably some bad test. Starting it", jetty.getLocalPort()); jetty.start(); @@ -122,15 +201,15 @@ public void tearDown() throws Exception { super.tearDown(); } - @Repeat(iterations=2) // 2 times to make sure cleanup is complete and we can create the same collection - // commented out on: 17-Feb-2019 @BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // 21-May-2018 + @Repeat(iterations = 2) // 2 times to make sure cleanup is complete and we can create the same collection public void testCreateDelete() throws Exception { + final int rand = 0; // random().nextInt(3); try { - switch (random().nextInt(3)) { + switch (rand) { case 0: // Sometimes use SolrJ CollectionAdminRequest.createCollection(collectionName, "conf", 2, 1, 0, 3) - .process(cluster.getSolrClient()); + .process(cluster.getSolrClient()); break; case 1: // Sometimes use v1 API @@ -168,7 +247,7 @@ public void testCreateDelete() throws Exception { 6, docCollection.getReplicas(EnumSet.of(Replica.Type.PULL)).size()); assertEquals("Expecting 2 writer replicas, one per shard", 2, docCollection.getReplicas(EnumSet.of(Replica.Type.NRT)).size()); - for (Slice s:docCollection.getSlices()) { + for (Slice s : docCollection.getSlices()) { // read-only replicas can never become leaders assertFalse(s.getLeader().getType() == Replica.Type.PULL); List shardElectionNodes = cluster.getZkClient().getChildren(ZkStateReader.getShardLeadersElectPath(collectionName, s.getName()), null, true); @@ -181,7 +260,7 @@ public void testCreateDelete() throws Exception { } else { // reload CollectionAdminResponse response = CollectionAdminRequest.reloadCollection(collectionName) - .process(cluster.getSolrClient()); + .process(cluster.getSolrClient()); assertEquals(0, response.getStatus()); reloaded = true; } @@ -191,91 +270,56 @@ public void testCreateDelete() throws Exception { } } - /** - * Asserts that Update logs don't exist for replicas of type {@link org.apache.solr.common.cloud.Replica.Type#PULL} - */ - private void assertUlogPresence(DocCollection collection) { - for (Slice s:collection.getSlices()) { - for (Replica r:s.getReplicas()) { - if (r.getType() == Replica.Type.NRT) { - continue; - } - SolrCore core = null; - try { - core = cluster.getReplicaJetty(r).getCoreContainer().getCore(r.getCoreName()); - assertNotNull(core); - assertFalse("Update log should not exist for replicas of type Passive but file is present: " + core.getUlogDir(), - new java.io.File(core.getUlogDir()).exists()); - } finally { - core.close(); - } - } - } - } - @SuppressWarnings("unchecked") - // 12-Jun-2018 @BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") public void testAddDocs() throws Exception { - int numPullReplicas = 1 + random().nextInt(3); + int numPullReplicas = 1; // + random().nextInt(3); CollectionAdminRequest.createCollection(collectionName, "conf", 1, 1, 0, numPullReplicas) - .process(cluster.getSolrClient()); - waitForState("Expected collection to be created with 1 shard and " + (numPullReplicas + 1) + " replicas", collectionName, clusterShape(1, numPullReplicas + 1)); + .process(cluster.getSolrClient()); + waitForState("Expected collection to be created with 1 shard and " + (numPullReplicas + 1) + " replicas", + collectionName, clusterShape(1, numPullReplicas + 1)); DocCollection docCollection = assertNumberOfReplicas(1, 0, numPullReplicas, false, true); assertEquals(1, docCollection.getSlices().size()); - boolean reloaded = false; int numDocs = 0; - while (true) { + CloudSolrClient solrClient = cluster.getSolrClient(); + for (int i = 0; i < 5; i++) { numDocs++; - cluster.getSolrClient().add(collectionName, new SolrInputDocument("id", String.valueOf(numDocs), "foo", "bar")); - cluster.getSolrClient().commit(collectionName); + + UpdateRequest ureq = new UpdateRequest(); + ureq.add(new SolrInputDocument("id", String.valueOf(numDocs), "foo", "bar")); + ureq.process(solrClient, collectionName); + ureq.commit(solrClient, collectionName); Slice s = docCollection.getSlices().iterator().next(); try (HttpSolrClient leaderClient = getHttpSolrClient(s.getLeader().getCoreUrl())) { assertEquals(numDocs, leaderClient.query(new SolrQuery("*:*")).getResults().getNumFound()); } - TimeOut t = new TimeOut(REPLICATION_TIMEOUT_SECS, TimeUnit.SECONDS, TimeSource.NANO_TIME); - for (Replica r:s.getReplicas(EnumSet.of(Replica.Type.PULL))) { - //TODO: assert replication < REPLICATION_TIMEOUT_SECS + List pullReplicas = s.getReplicas(EnumSet.of(Replica.Type.PULL)); + waitForNumDocsInAllReplicas(numDocs, pullReplicas); + + for (Replica r : pullReplicas) { try (HttpSolrClient pullReplicaClient = getHttpSolrClient(r.getCoreUrl())) { - while (true) { - try { - assertEquals("Replica " + r.getName() + " not up to date after 10 seconds", - numDocs, pullReplicaClient.query(new SolrQuery("*:*")).getResults().getNumFound()); - break; - } catch (AssertionError e) { - if (t.hasTimedOut()) { - throw e; - } else { - Thread.sleep(100); - } - } - } - SolrQuery req = new SolrQuery( - "qt", "/admin/plugins", - "stats", "true"); + SolrQuery req = new SolrQuery("qt", "/admin/plugins", "stats", "true"); QueryResponse statsResponse = pullReplicaClient.query(req); + // adds is a gauge, which is null for PULL replicas + assertNull("Replicas shouldn't process the add document request: " + statsResponse, + ((Map) (statsResponse.getResponse()).findRecursive("plugins", "UPDATE", "updateHandler", "stats")).get("UPDATE.updateHandler.adds")); assertEquals("Replicas shouldn't process the add document request: " + statsResponse, - 0L, ((Map)(statsResponse.getResponse()).findRecursive("plugins", "UPDATE", "updateHandler", "stats")).get("UPDATE.updateHandler.adds")); + 0L, ((Map) (statsResponse.getResponse()).findRecursive("plugins", "UPDATE", "updateHandler", "stats")).get("UPDATE.updateHandler.cumulativeAdds.count")); } } - if (reloaded) { - break; - } else { - // reload - CollectionAdminResponse response = CollectionAdminRequest.reloadCollection(collectionName) - .process(cluster.getSolrClient()); - assertEquals(0, response.getStatus()); - reloaded = true; - } } + + // reload + CollectionAdminResponse response = CollectionAdminRequest.reloadCollection(collectionName).process(cluster.getSolrClient()); + assertEquals(0, response.getStatus()); assertUlogPresence(docCollection); } public void testAddRemovePullReplica() throws Exception { CollectionAdminRequest.createCollection(collectionName, "conf", 2, 1, 0, 0) - .process(cluster.getSolrClient()); + .process(cluster.getSolrClient()); waitForState("Expected collection to be created with 2 shards and 1 replica each", collectionName, clusterShape(2, 2)); DocCollection docCollection = assertNumberOfReplicas(2, 0, 0, false, true); assertEquals(2, docCollection.getSlices().size()); @@ -292,7 +336,7 @@ public void testAddRemovePullReplica() throws Exception { collectionName, "shard1", docCollection.getSlice("shard1").getReplicas(EnumSet.of(Replica.Type.PULL)).get(0).getName()) - .process(cluster.getSolrClient()); + .process(cluster.getSolrClient()); assertNumberOfReplicas(2, 0, 1, true, true); } @@ -301,7 +345,6 @@ public void testRemoveAllWriterReplicas() throws Exception { } @Test - //2018-06-18 (commented) @BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // 21-May-2018 public void testKillLeader() throws Exception { doTestNoLeader(false); } @@ -310,7 +353,7 @@ public void testKillLeader() throws Exception { public void testPullReplicaStates() throws Exception { // Validate that pull replicas go through the correct states when starting, stopping, reconnecting CollectionAdminRequest.createCollection(collectionName, "conf", 1, 1, 0, 0) - .process(cluster.getSolrClient()); + .process(cluster.getSolrClient()); // cluster.getSolrClient().getZkStateReader().registerCore(collectionName); //TODO: Is this needed? waitForState("Replica not added", collectionName, activeReplicaCount(1, 0, 0)); addDocs(500); @@ -341,22 +384,22 @@ public void testPullReplicaStates() throws Exception { public void testRealTimeGet() throws SolrServerException, IOException, KeeperException, InterruptedException { // should be redirected to Replica.Type.NRT - int numReplicas = random().nextBoolean()?1:2; + int numReplicas = random().nextBoolean() ? 1 : 2; CollectionAdminRequest.createCollection(collectionName, "conf", 1, numReplicas, 0, numReplicas) - .process(cluster.getSolrClient()); + .process(cluster.getSolrClient()); waitForState("Unexpected replica count", collectionName, activeReplicaCount(numReplicas, 0, numReplicas)); DocCollection docCollection = assertNumberOfReplicas(numReplicas, 0, numReplicas, false, true); HttpClient httpClient = cluster.getSolrClient().getHttpClient(); int id = 0; Slice slice = docCollection.getSlice("shard1"); List ids = new ArrayList<>(slice.getReplicas().size()); - for (Replica rAdd:slice.getReplicas()) { + for (Replica rAdd : slice.getReplicas()) { try (HttpSolrClient client = getHttpSolrClient(rAdd.getCoreUrl(), httpClient)) { client.add(new SolrInputDocument("id", String.valueOf(id), "foo_s", "bar")); } SolrDocument docCloudClient = cluster.getSolrClient().getById(collectionName, String.valueOf(id)); assertEquals("bar", docCloudClient.getFieldValue("foo_s")); - for (Replica rGet:slice.getReplicas()) { + for (Replica rGet : slice.getReplicas()) { try (HttpSolrClient client = getHttpSolrClient(rGet.getCoreUrl(), httpClient)) { SolrDocument doc = client.getById(String.valueOf(id)); assertEquals("bar", doc.getFieldValue("foo_s")); @@ -366,7 +409,7 @@ public void testRealTimeGet() throws SolrServerException, IOException, KeeperExc id++; } SolrDocumentList previousAllIdsResult = null; - for (Replica rAdd:slice.getReplicas()) { + for (Replica rAdd : slice.getReplicas()) { try (HttpSolrClient client = getHttpSolrClient(rAdd.getCoreUrl(), httpClient)) { SolrDocumentList allIdsResult = client.getById(ids); if (previousAllIdsResult != null) { @@ -387,7 +430,7 @@ public void testRealTimeGet() throws SolrServerException, IOException, KeeperExc @SuppressWarnings({"try"}) private void doTestNoLeader(boolean removeReplica) throws Exception { CollectionAdminRequest.createCollection(collectionName, "conf", 1, 1, 0, 1) - .process(cluster.getSolrClient()); + .process(cluster.getSolrClient()); waitForState("Expected collection to be created with 1 shard and 2 replicas", collectionName, clusterShape(1, 2)); DocCollection docCollection = assertNumberOfReplicas(1, 0, 1, false, true); @@ -409,7 +452,7 @@ private void doTestNoLeader(boolean removeReplica) throws Exception { collectionName, "shard1", s.getLeader().getName()) - .process(cluster.getSolrClient()); + .process(cluster.getSolrClient()); } else { leaderJetty = cluster.getReplicaJetty(s.getLeader()); leaderJetty.stop(); @@ -434,10 +477,10 @@ private void doTestNoLeader(boolean removeReplica) throws Exception { } // add document, this should fail since there is no leader. Pull replica should not accept the update expectThrows(SolrException.class, () -> - cluster.getSolrClient().add(collectionName, new SolrInputDocument("id", "2", "foo", "zoo")) + cluster.getSolrClient().add(collectionName, new SolrInputDocument("id", "2", "foo", "zoo")) ); if (removeReplica) { - try(ZkShardTerms zkShardTerms = new ZkShardTerms(collectionName, "shard1", zkClient())) { + try (ZkShardTerms zkShardTerms = new ZkShardTerms(collectionName, "shard1", zkClient())) { assertEquals(highestTerm, zkShardTerms.getHighestTerm()); } } @@ -445,11 +488,11 @@ private void doTestNoLeader(boolean removeReplica) throws Exception { // Also fails if I send the update to the pull replica explicitly try (HttpSolrClient pullReplicaClient = getHttpSolrClient(docCollection.getReplicas(EnumSet.of(Replica.Type.PULL)).get(0).getCoreUrl())) { expectThrows(SolrException.class, () -> - cluster.getSolrClient().add(collectionName, new SolrInputDocument("id", "2", "foo", "zoo")) + cluster.getSolrClient().add(collectionName, new SolrInputDocument("id", "2", "foo", "zoo")) ); } if (removeReplica) { - try(ZkShardTerms zkShardTerms = new ZkShardTerms(collectionName, "shard1", zkClient())) { + try (ZkShardTerms zkShardTerms = new ZkShardTerms(collectionName, "shard1", zkClient())) { assertEquals(highestTerm, zkShardTerms.getHighestTerm()); } } @@ -486,13 +529,13 @@ private void doTestNoLeader(boolean removeReplica) throws Exception { leaderClient.commit(); assertEquals(1, leaderClient.query(new SolrQuery("*:*")).getResults().getNumFound()); } - waitForNumDocsInAllReplicas(1, docCollection.getReplicas(EnumSet.of(Replica.Type.PULL)), "id:2"); + waitForNumDocsInAllReplicas(1, docCollection.getReplicas(EnumSet.of(Replica.Type.PULL)), "id:2", null, null); waitForNumDocsInAllReplicas(1, docCollection.getReplicas(EnumSet.of(Replica.Type.PULL))); } public void testKillPullReplica() throws Exception { CollectionAdminRequest.createCollection(collectionName, "conf", 1, 1, 0, 1) - .process(cluster.getSolrClient()); + .process(cluster.getSolrClient()); // cluster.getSolrClient().getZkStateReader().registerCore(collectionName); //TODO: Is this needed? waitForState("Expected collection to be created with 1 shard and 2 replicas", collectionName, clusterShape(1, 2)); DocCollection docCollection = assertNumberOfReplicas(1, 0, 1, false, true); @@ -528,60 +571,11 @@ private void waitForNumDocsInAllActiveReplicas(int numDocs) throws IOException, } private void waitForNumDocsInAllReplicas(int numDocs, Collection replicas) throws IOException, SolrServerException, InterruptedException { - waitForNumDocsInAllReplicas(numDocs, replicas, "*:*"); - } - - private void waitForNumDocsInAllReplicas(int numDocs, Collection replicas, String query) throws IOException, SolrServerException, InterruptedException { - TimeOut t = new TimeOut(REPLICATION_TIMEOUT_SECS, TimeUnit.SECONDS, TimeSource.NANO_TIME); - for (Replica r:replicas) { - try (HttpSolrClient replicaClient = getHttpSolrClient(r.getCoreUrl())) { - while (true) { - try { - assertEquals("Replica " + r.getName() + " not up to date after " + REPLICATION_TIMEOUT_SECS + " seconds", - numDocs, replicaClient.query(new SolrQuery(query)).getResults().getNumFound()); - break; - } catch (AssertionError e) { - if (t.hasTimedOut()) { - throw e; - } else { - Thread.sleep(100); - } - } - } - } - } - } - - private void waitForDeletion(String collection) throws InterruptedException, KeeperException { - TimeOut t = new TimeOut(10, TimeUnit.SECONDS, TimeSource.NANO_TIME); - while (cluster.getSolrClient().getZkStateReader().getClusterState().hasCollection(collection)) { - log.info("Collection not yet deleted"); - try { - Thread.sleep(100); - if (t.hasTimedOut()) { - fail("Timed out waiting for collection " + collection + " to be deleted."); - } - cluster.getSolrClient().getZkStateReader().forceUpdateCollection(collection); - } catch(SolrException e) { - return; - } - - } + waitForNumDocsInAllReplicas(numDocs, replicas, "*:*", null, null); } private DocCollection assertNumberOfReplicas(int numNrtReplicas, int numTlogReplicas, int numPullReplicas, boolean updateCollection, boolean activeOnly) throws KeeperException, InterruptedException { - if (updateCollection) { - cluster.getSolrClient().getZkStateReader().forceUpdateCollection(collectionName); - } - DocCollection docCollection = getCollectionState(collectionName); - assertNotNull(docCollection); - assertEquals("Unexpected number of writer replicas: " + docCollection, numNrtReplicas, - docCollection.getReplicas(EnumSet.of(Replica.Type.NRT)).stream().filter(r->!activeOnly || r.getState() == Replica.State.ACTIVE).count()); - assertEquals("Unexpected number of pull replicas: " + docCollection, numPullReplicas, - docCollection.getReplicas(EnumSet.of(Replica.Type.PULL)).stream().filter(r->!activeOnly || r.getState() == Replica.State.ACTIVE).count()); - assertEquals("Unexpected number of active replicas: " + docCollection, numTlogReplicas, - docCollection.getReplicas(EnumSet.of(Replica.Type.TLOG)).stream().filter(r->!activeOnly || r.getState() == Replica.State.ACTIVE).count()); - return docCollection; + return assertNumberOfReplicas(collectionName, numNrtReplicas, numTlogReplicas, numPullReplicas, updateCollection, activeOnly); } /* @@ -589,7 +583,7 @@ private DocCollection assertNumberOfReplicas(int numNrtReplicas, int numTlogRepl */ private CollectionStatePredicate clusterStateReflectsActiveAndDownReplicas() { return (liveNodes, collectionState) -> { - for (Replica r:collectionState.getReplicas()) { + for (Replica r : collectionState.getReplicas()) { if (r.getState() != Replica.State.DOWN && r.getState() != Replica.State.ACTIVE) { return false; } diff --git a/solr/core/src/test/org/apache/solr/cloud/TestPullReplicaWithAuth.java b/solr/core/src/test/org/apache/solr/cloud/TestPullReplicaWithAuth.java new file mode 100644 index 000000000000..7e4e9e4924f5 --- /dev/null +++ b/solr/core/src/test/org/apache/solr/cloud/TestPullReplicaWithAuth.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.solr.cloud; + +import java.io.IOException; +import java.util.EnumSet; +import java.util.List; +import java.util.Map; + +import org.apache.lucene.util.LuceneTestCase.Slow; +import org.apache.solr.client.solrj.SolrQuery; +import org.apache.solr.client.solrj.SolrRequest; +import org.apache.solr.client.solrj.SolrServerException; +import org.apache.solr.client.solrj.impl.CloudSolrClient; +import org.apache.solr.client.solrj.impl.HttpSolrClient; +import org.apache.solr.client.solrj.request.CollectionAdminRequest; +import org.apache.solr.client.solrj.request.QueryRequest; +import org.apache.solr.client.solrj.request.UpdateRequest; +import org.apache.solr.client.solrj.response.CollectionAdminResponse; +import org.apache.solr.client.solrj.response.QueryResponse; +import org.apache.solr.common.SolrInputDocument; +import org.apache.solr.common.cloud.DocCollection; +import org.apache.solr.common.cloud.Replica; +import org.apache.solr.common.cloud.Slice; +import org.apache.solr.common.util.Utils; +import org.apache.solr.security.BasicAuthPlugin; +import org.apache.solr.security.RuleBasedAuthorizationPlugin; +import org.apache.solr.util.LogLevel; +import org.apache.solr.util.TestInjection; +import org.junit.AfterClass; +import org.junit.BeforeClass; + +import static java.util.Collections.singletonList; +import static java.util.Collections.singletonMap; +import static org.apache.solr.cloud.TestPullReplica.assertNumberOfReplicas; +import static org.apache.solr.cloud.TestPullReplica.assertUlogPresence; +import static org.apache.solr.cloud.TestPullReplica.waitForDeletion; +import static org.apache.solr.cloud.TestPullReplica.waitForNumDocsInAllReplicas; +import static org.apache.solr.security.Sha256AuthenticationProvider.getSaltedHashedValue; + +@Slow +@LogLevel("org.apache.solr.handler.ReplicationHandler=DEBUG,org.apache.solr.handler.IndexFetcher=DEBUG") +public class TestPullReplicaWithAuth extends SolrCloudTestCase { + + private static final String USER = "solr"; + private static final String PASS = "SolrRocksAgain"; + private static final String collectionName = "testPullReplicaWithAuth"; + + @BeforeClass + public static void setupCluster() throws Exception { + final String SECURITY_JSON = Utils.toJSONString + (Utils.makeMap("authorization", + Utils.makeMap("class", RuleBasedAuthorizationPlugin.class.getName(), + "user-role", singletonMap(USER, "admin"), + "permissions", singletonList(Utils.makeMap("name", "all", "role", "admin"))), + "authentication", + Utils.makeMap("class", BasicAuthPlugin.class.getName(), + "blockUnknown", true, + "credentials", singletonMap(USER, getSaltedHashedValue(PASS))))); + + configureCluster(3) + .addConfig("conf", configset("cloud-minimal")) + .withSecurityJson(SECURITY_JSON) + .configure(); + } + + @AfterClass + public static void tearDownCluster() { + TestInjection.reset(); + } + + @SuppressWarnings({"rawtypes"}) + private static T withBasicAuth(T req) { + req.setBasicAuthCredentials(USER, PASS); + return req; + } + + private QueryResponse queryWithBasicAuth(HttpSolrClient client, SolrQuery q) throws IOException, SolrServerException { + return withBasicAuth(new QueryRequest(q)).process(client); + } + + @SuppressWarnings("unchecked") + public void testPKIAuthWorksForPullReplication() throws Exception { + int numPullReplicas = 2; + withBasicAuth(CollectionAdminRequest.createCollection(collectionName, "conf", 1, 1, 0, numPullReplicas)) + .process(cluster.getSolrClient()); + waitForState("Expected collection to be created with 1 shard and " + (numPullReplicas + 1) + " replicas", + collectionName, clusterShape(1, numPullReplicas + 1)); + DocCollection docCollection = + assertNumberOfReplicas(collectionName, 1, 0, numPullReplicas, false, true); + + int numDocs = 0; + CloudSolrClient solrClient = cluster.getSolrClient(); + for (int i = 0; i < 5; i++) { + numDocs++; + + UpdateRequest ureq = withBasicAuth(new UpdateRequest()); + ureq.add(new SolrInputDocument("id", String.valueOf(numDocs), "foo", "bar")); + ureq.process(solrClient, collectionName); + ureq.commit(solrClient, collectionName); + + Slice s = docCollection.getSlices().iterator().next(); + try (HttpSolrClient leaderClient = getHttpSolrClient(s.getLeader().getCoreUrl())) { + assertEquals(numDocs, queryWithBasicAuth(leaderClient, new SolrQuery("*:*")).getResults().getNumFound()); + } + + List pullReplicas = s.getReplicas(EnumSet.of(Replica.Type.PULL)); + waitForNumDocsInAllReplicas(numDocs, pullReplicas, "*:*", USER, PASS); + + for (Replica r : pullReplicas) { + try (HttpSolrClient pullReplicaClient = getHttpSolrClient(r.getCoreUrl())) { + QueryResponse statsResponse = queryWithBasicAuth(pullReplicaClient, new SolrQuery("qt", "/admin/plugins", "stats", "true")); + // adds is a gauge, which is null for PULL replicas + assertNull("Replicas shouldn't process the add document request: " + statsResponse, + ((Map) (statsResponse.getResponse()).findRecursive("plugins", "UPDATE", "updateHandler", "stats")).get("UPDATE.updateHandler.adds")); + assertEquals("Replicas shouldn't process the add document request: " + statsResponse, + 0L, ((Map) (statsResponse.getResponse()).findRecursive("plugins", "UPDATE", "updateHandler", "stats")).get("UPDATE.updateHandler.cumulativeAdds.count")); + } + } + } + + CollectionAdminResponse response = + withBasicAuth(CollectionAdminRequest.reloadCollection(collectionName)).process(cluster.getSolrClient()); + assertEquals(0, response.getStatus()); + assertUlogPresence(docCollection); + + // add another pull replica to ensure it can pull the indexes + Slice s = docCollection.getSlices().iterator().next(); + List pullReplicas = s.getReplicas(EnumSet.of(Replica.Type.PULL)); + assertEquals(2, pullReplicas.size()); + response = withBasicAuth(CollectionAdminRequest.addReplicaToShard(collectionName, s.getName(), Replica.Type.PULL)).process(cluster.getSolrClient()); + assertEquals(0, response.getStatus()); + + numPullReplicas = numPullReplicas + 1; // added a PULL + waitForState("Expected collection to be created with 1 shard and " + (numPullReplicas + 1) + " replicas", + collectionName, clusterShape(1, numPullReplicas + 1)); + + docCollection = + assertNumberOfReplicas(collectionName, 1, 0, numPullReplicas, false, true); + s = docCollection.getSlices().iterator().next(); + pullReplicas = s.getReplicas(EnumSet.of(Replica.Type.PULL)); + assertEquals(numPullReplicas, pullReplicas.size()); + waitForNumDocsInAllReplicas(numDocs, pullReplicas, "*:*", USER, PASS); + + withBasicAuth(CollectionAdminRequest.deleteCollection(collectionName)).process(cluster.getSolrClient()); + waitForDeletion(collectionName); + } +}