From ec378fdbc0e06bdf506332ef197c3964b5079123 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Franti=C5=A1ek=20Hartman?= Date: Fri, 19 Jun 2020 15:58:23 +0200 Subject: [PATCH] Remove Elasticsearch modules after move to the main repository (#81) --- README.md | 5 - elasticsearch/README.md | 13 +- elasticsearch/elasticsearch-5/README.md | 90 ---------- elasticsearch/elasticsearch-5/build.gradle | 5 - .../elasticsearch-5/gradle.properties | 2 - .../elasticsearch/ElasticsearchSinks.java | 146 --------------- .../elasticsearch/ElasticsearchSources.java | 158 ----------------- .../contrib/elasticsearch/package-info.java | 20 --- .../elasticsearch/ElasticsearchBaseTest.java | 128 -------------- .../elasticsearch/ElasticsearchSinkTest.java | 38 ---- .../ElasticsearchSourceTest.java | 72 -------- .../src/test/resources/log4j.properties | 10 -- elasticsearch/elasticsearch-6/README.md | 90 ---------- elasticsearch/elasticsearch-6/build.gradle | 5 - .../elasticsearch-6/gradle.properties | 2 - .../elasticsearch/ElasticsearchSinks.java | 154 ---------------- .../elasticsearch/ElasticsearchSources.java | 162 ----------------- .../contrib/elasticsearch/package-info.java | 20 --- .../elasticsearch/ElasticsearchBaseTest.java | 107 ----------- .../elasticsearch/ElasticsearchSinkTest.java | 39 ---- .../ElasticsearchSourceTest.java | 68 ------- .../src/test/resources/log4j.properties | 10 -- elasticsearch/elasticsearch-7/README.md | 92 ---------- elasticsearch/elasticsearch-7/build.gradle | 6 - .../elasticsearch-7/gradle.properties | 2 - .../elasticsearch/ElasticsearchSinks.java | 162 ----------------- .../elasticsearch/ElasticsearchSources.java | 166 ------------------ .../contrib/elasticsearch/package-info.java | 20 --- .../elasticsearch/ElasticsearchBaseTest.java | 121 ------------- .../elasticsearch/ElasticsearchSinkTest.java | 41 ----- .../ElasticsearchSourceTest.java | 70 -------- .../src/test/resources/log4j.properties | 10 -- settings.gradle | 7 - 33 files changed, 3 insertions(+), 2038 deletions(-) delete mode 100644 elasticsearch/elasticsearch-5/README.md delete mode 100644 elasticsearch/elasticsearch-5/build.gradle delete mode 100644 elasticsearch/elasticsearch-5/gradle.properties delete mode 100644 elasticsearch/elasticsearch-5/src/main/java/com/hazelcast/jet/contrib/elasticsearch/ElasticsearchSinks.java delete mode 100644 elasticsearch/elasticsearch-5/src/main/java/com/hazelcast/jet/contrib/elasticsearch/ElasticsearchSources.java delete mode 100644 elasticsearch/elasticsearch-5/src/main/java/com/hazelcast/jet/contrib/elasticsearch/package-info.java delete mode 100644 elasticsearch/elasticsearch-5/src/test/java/com/hazelcast/jet/contrib/elasticsearch/ElasticsearchBaseTest.java delete mode 100644 elasticsearch/elasticsearch-5/src/test/java/com/hazelcast/jet/contrib/elasticsearch/ElasticsearchSinkTest.java delete mode 100644 elasticsearch/elasticsearch-5/src/test/java/com/hazelcast/jet/contrib/elasticsearch/ElasticsearchSourceTest.java delete mode 100644 elasticsearch/elasticsearch-5/src/test/resources/log4j.properties delete mode 100644 elasticsearch/elasticsearch-6/README.md delete mode 100644 elasticsearch/elasticsearch-6/build.gradle delete mode 100644 elasticsearch/elasticsearch-6/gradle.properties delete mode 100644 elasticsearch/elasticsearch-6/src/main/java/com/hazelcast/jet/contrib/elasticsearch/ElasticsearchSinks.java delete mode 100644 elasticsearch/elasticsearch-6/src/main/java/com/hazelcast/jet/contrib/elasticsearch/ElasticsearchSources.java delete mode 100644 elasticsearch/elasticsearch-6/src/main/java/com/hazelcast/jet/contrib/elasticsearch/package-info.java delete mode 100644 elasticsearch/elasticsearch-6/src/test/java/com/hazelcast/jet/contrib/elasticsearch/ElasticsearchBaseTest.java delete mode 100644 elasticsearch/elasticsearch-6/src/test/java/com/hazelcast/jet/contrib/elasticsearch/ElasticsearchSinkTest.java delete mode 100644 elasticsearch/elasticsearch-6/src/test/java/com/hazelcast/jet/contrib/elasticsearch/ElasticsearchSourceTest.java delete mode 100644 elasticsearch/elasticsearch-6/src/test/resources/log4j.properties delete mode 100644 elasticsearch/elasticsearch-7/README.md delete mode 100644 elasticsearch/elasticsearch-7/build.gradle delete mode 100644 elasticsearch/elasticsearch-7/gradle.properties delete mode 100644 elasticsearch/elasticsearch-7/src/main/java/com/hazelcast/jet/contrib/elasticsearch/ElasticsearchSinks.java delete mode 100644 elasticsearch/elasticsearch-7/src/main/java/com/hazelcast/jet/contrib/elasticsearch/ElasticsearchSources.java delete mode 100644 elasticsearch/elasticsearch-7/src/main/java/com/hazelcast/jet/contrib/elasticsearch/package-info.java delete mode 100644 elasticsearch/elasticsearch-7/src/test/java/com/hazelcast/jet/contrib/elasticsearch/ElasticsearchBaseTest.java delete mode 100644 elasticsearch/elasticsearch-7/src/test/java/com/hazelcast/jet/contrib/elasticsearch/ElasticsearchSinkTest.java delete mode 100644 elasticsearch/elasticsearch-7/src/test/java/com/hazelcast/jet/contrib/elasticsearch/ElasticsearchSourceTest.java delete mode 100644 elasticsearch/elasticsearch-7/src/test/resources/log4j.properties diff --git a/README.md b/README.md index 34a4a2f..405f85a 100644 --- a/README.md +++ b/README.md @@ -32,11 +32,6 @@ pipelines to read/write data points from/to InfluxDb. A collection of probabilistic aggregations such as HyperLogLog. -### [Elasticsearch Connector](elasticsearch) - -A Hazelcast Jet connector for Elasticsearch for querying/indexing objects -from/to Elasticsearch. - ### [Redis Connectors](redis) Hazelcast Jet connectors for various Redis data structures. diff --git a/elasticsearch/README.md b/elasticsearch/README.md index 2b995ae..64cf6d9 100644 --- a/elasticsearch/README.md +++ b/elasticsearch/README.md @@ -1,12 +1,5 @@ # Elasticsearch Connectors -A Hazelcast Jet connector for Elasticsearch (v5..v7) for querying/indexing -objects from/to Elasticsearch. - -There are 3 major versions of Elasticsearch we support: - -- [Elasticsearch 5.6.x](elasticsearch-5) - -- [Elasticsearch 6.x.x](elasticsearch-6) - -- [Elasticsearch 7.x.x](elasticsearch-7) +Starting with Hazelcast Jet version 4.2 the Elasticsearch connectors +were moved to the [main repository](https://github.com/hazelcast/hazelcast-jet/tree/master/extensions/elasticsearch) +and are part of the standard distribution. \ No newline at end of file diff --git a/elasticsearch/elasticsearch-5/README.md b/elasticsearch/elasticsearch-5/README.md deleted file mode 100644 index 206c8cc..0000000 --- a/elasticsearch/elasticsearch-5/README.md +++ /dev/null @@ -1,90 +0,0 @@ -# Elasticsearch Connector - -A Hazelcast Jet connector for Elasticsearch (v5.6.x) for querying/indexing objects -from/to Elasticsearch. - -## Getting Started - -### Installing - -The Elasticsearch Connector artifacts are published in the Maven repositories. - -Add the following lines to your pom.xml to include it as a dependency to your project: - -``` - - com.hazelcast.jet.contrib - elasticsearch-5 - ${version} - -``` - -or if you are using Gradle: -``` -compile group: 'com.hazelcast.jet.contrib', name: 'elasticsearch-5', version: ${version} -``` - -### Usage - -#### As a Source - -Elasticsearch batch source (`ElasticsearchSources.elasticsearch()`) executes -the query and retrieves the results using `scrolling`. - -Following is an example pipeline which queries Elasticsearch and logs the -results: - -```java -Pipeline p = Pipeline.create(); - -p.readFrom(ElasticsearchSources.elasticsearch("sourceName", - () -> RestClient.builder(HttpHost.create("hostAddress")).build(), - () -> { - SearchRequest searchRequest = new SearchRequest("users"); - SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); - searchSourceBuilder.query(termQuery("age", 8)); - searchRequest.source(searchSourceBuilder); - return searchRequest; - }, - "10s", - SearchHit::getSourceAsString, - RestClient::close)) - .writeTo(Sinks.logger()); -``` - -#### As a Sink - -Elasticsearch sink (`ElasticsearchSinks.elasticsearch()`) is used to index objects from -Hazelcast Jet Pipeline to Elasticsearch. - -Here is a very simple pipeline which reads out some users from Hazelcast -List and indexes them to Elasticsearch: - -```java -Pipeline p = Pipeline.create(); -p.readFrom(Sources.list(users)) - .writeTo(ElasticsearchSinks.elasticsearch("sinkName", - () -> RestClient.builder(HttpHost.create("hostAddress")).build(), - BulkRequest::new, - user -> { - IndexRequest request = new IndexRequest(indexName, "doc", user.id); - Map jsonMap = new HashMap<>(); - jsonMap.put("name", user.name); - jsonMap.put("age", user.age); - request.source(jsonMap); - return request; - }, - RestClient::close)); -``` - -### Running the tests - -To run the tests run the command below: - -``` -./gradlew test -``` - -## Authors - -* **[Ali Gurbuz](https://github.com/gurbuzali)** diff --git a/elasticsearch/elasticsearch-5/build.gradle b/elasticsearch/elasticsearch-5/build.gradle deleted file mode 100644 index 2859198..0000000 --- a/elasticsearch/elasticsearch-5/build.gradle +++ /dev/null @@ -1,5 +0,0 @@ -dependencies { - compile 'org.elasticsearch.client:elasticsearch-rest-high-level-client:5.6.0' - testCompile "org.testcontainers:elasticsearch:1.11.2" - testCompile "org.slf4j:slf4j-log4j12:1.7.26" -} \ No newline at end of file diff --git a/elasticsearch/elasticsearch-5/gradle.properties b/elasticsearch/elasticsearch-5/gradle.properties deleted file mode 100644 index 4a62185..0000000 --- a/elasticsearch/elasticsearch-5/gradle.properties +++ /dev/null @@ -1,2 +0,0 @@ -version = 0.3-SNAPSHOT -description = A Hazelcast Jet connector for Elasticsearch (v5.6.x) for querying/indexing objects from/to Elasticsearch. diff --git a/elasticsearch/elasticsearch-5/src/main/java/com/hazelcast/jet/contrib/elasticsearch/ElasticsearchSinks.java b/elasticsearch/elasticsearch-5/src/main/java/com/hazelcast/jet/contrib/elasticsearch/ElasticsearchSinks.java deleted file mode 100644 index b1508bf..0000000 --- a/elasticsearch/elasticsearch-5/src/main/java/com/hazelcast/jet/contrib/elasticsearch/ElasticsearchSinks.java +++ /dev/null @@ -1,146 +0,0 @@ -/* - * Copyright (c) 2008-2020, Hazelcast, Inc. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.hazelcast.jet.contrib.elasticsearch; - -import com.hazelcast.function.ConsumerEx; -import com.hazelcast.function.FunctionEx; -import com.hazelcast.function.SupplierEx; -import com.hazelcast.jet.pipeline.Sink; -import com.hazelcast.jet.pipeline.SinkBuilder; -import org.apache.http.HttpHost; -import org.apache.http.auth.UsernamePasswordCredentials; -import org.apache.http.client.CredentialsProvider; -import org.apache.http.impl.client.BasicCredentialsProvider; -import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.action.DocWriteRequest; -import org.elasticsearch.action.bulk.BulkRequest; -import org.elasticsearch.action.bulk.BulkResponse; -import org.elasticsearch.action.delete.DeleteRequest; -import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.action.update.UpdateRequest; -import org.elasticsearch.client.RestClient; -import org.elasticsearch.client.RestHighLevelClient; - -import javax.annotation.Nonnull; -import javax.annotation.Nullable; -import java.io.IOException; - -import static org.apache.http.auth.AuthScope.ANY; - -/** - * Contains factory methods for Elasticsearch sinks. - */ -public final class ElasticsearchSinks { - - private ElasticsearchSinks() { - } - - /** - * Creates a sink which indexes objects using the specified Elasticsearch - * client and the specified bulk request. - * - * @param name name of the created sink - * @param clientSupplier Elasticsearch REST client supplier - * @param bulkRequestSupplier bulk request supplier, will be called to obtain a - * new {@link BulkRequest} instance after each call. - * @param requestFn creates an {@link IndexRequest}, {@link UpdateRequest} - * or {@link DeleteRequest} for each object - * @param destroyFn called upon completion to release any resource - */ - public static Sink elasticsearch( - @Nonnull String name, - @Nonnull SupplierEx clientSupplier, - @Nonnull SupplierEx bulkRequestSupplier, - @Nonnull FunctionEx requestFn, - @Nonnull ConsumerEx destroyFn - ) { - return SinkBuilder - .sinkBuilder(name, ctx -> new BulkContext(clientSupplier.get(), bulkRequestSupplier)) - .receiveFn((bulkContext, item) -> bulkContext.add(requestFn.apply(item))) - .flushFn(BulkContext::flush) - .destroyFn(b -> destroyFn.accept(b.client)) - .preferredLocalParallelism(2) - .build(); - } - - /** - * Convenience for {@link #elasticsearch(String, SupplierEx, SupplierEx, - * FunctionEx, ConsumerEx)}. Creates a new {@link BulkRequest} with default - * options for each batch and closes the {@link RestClient} upon - * completion. Assumes that the {@code clientSupplier} creates a new client - * for each call. - */ - public static Sink elasticsearch( - @Nonnull String name, - @Nonnull SupplierEx clientSupplier, - @Nonnull FunctionEx requestFn - ) { - return elasticsearch(name, clientSupplier, BulkRequest::new, requestFn, RestClient::close); - } - - /** - * Convenience for {@link #elasticsearch(String, SupplierEx, FunctionEx)} - * Rest client is configured with basic authentication. - */ - public static Sink elasticsearch( - @Nonnull String name, - @Nonnull String username, - @Nullable String password, - @Nonnull String hostname, - int port, - @Nonnull FunctionEx requestFn - ) { - return elasticsearch(name, () -> buildClient(username, password, hostname, port), requestFn); - } - - static RestClient buildClient(String username, String password, String hostname, int port) { - CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); - credentialsProvider.setCredentials(ANY, new UsernamePasswordCredentials(username, password)); - - return RestClient.builder(new HttpHost(hostname, port)) - .setHttpClientConfigCallback(httpClientBuilder -> - httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider)).build(); - } - - private static final class BulkContext { - - private final RestClient client; - private final RestHighLevelClient highLevelClient; - private final SupplierEx bulkRequestSupplier; - - private BulkRequest bulkRequest; - - private BulkContext(RestClient client, SupplierEx bulkRequestSupplier) { - this.client = client; - this.highLevelClient = new RestHighLevelClient(client); - this.bulkRequestSupplier = bulkRequestSupplier; - this.bulkRequest = bulkRequestSupplier.get(); - } - - private void add(DocWriteRequest request) { - bulkRequest.add(request); - } - - private void flush() throws IOException { - BulkResponse response = highLevelClient.bulk(bulkRequest); - if (response.hasFailures()) { - throw new ElasticsearchException(response.buildFailureMessage()); - } - bulkRequest = bulkRequestSupplier.get(); - } - } -} diff --git a/elasticsearch/elasticsearch-5/src/main/java/com/hazelcast/jet/contrib/elasticsearch/ElasticsearchSources.java b/elasticsearch/elasticsearch-5/src/main/java/com/hazelcast/jet/contrib/elasticsearch/ElasticsearchSources.java deleted file mode 100644 index 8b92f5b..0000000 --- a/elasticsearch/elasticsearch-5/src/main/java/com/hazelcast/jet/contrib/elasticsearch/ElasticsearchSources.java +++ /dev/null @@ -1,158 +0,0 @@ -/* - * Copyright (c) 2008-2020, Hazelcast, Inc. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.hazelcast.jet.contrib.elasticsearch; - -import com.hazelcast.function.ConsumerEx; -import com.hazelcast.function.FunctionEx; -import com.hazelcast.function.SupplierEx; -import com.hazelcast.jet.pipeline.BatchSource; -import com.hazelcast.jet.pipeline.SourceBuilder; -import com.hazelcast.jet.pipeline.SourceBuilder.SourceBuffer; -import org.elasticsearch.action.search.ClearScrollRequest; -import org.elasticsearch.action.search.SearchRequest; -import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.action.search.SearchScrollRequest; -import org.elasticsearch.client.RestClient; -import org.elasticsearch.client.RestHighLevelClient; -import org.elasticsearch.search.SearchHit; - -import javax.annotation.Nonnull; -import java.io.IOException; - -import static com.hazelcast.jet.contrib.elasticsearch.ElasticsearchSinks.buildClient; - -/** - * Contains factory methods for Elasticsearch sources. - */ -public final class ElasticsearchSources { - - private static final String DEFAULT_SCROLL_TIMEOUT = "60s"; - - private ElasticsearchSources() { - } - - /** - * Creates a source which queries objects using the specified Elasticsearch - * client and the specified request supplier using scrolling method. - * - * @param name Name of the source - * @param clientSupplier Elasticsearch rest client supplier - * @param searchRequestSupplier Search request supplier - * @param scrollTimeout scroll keep alive time - * @param mapHitFn maps search hits to output items - * @param destroyFn called upon completion to release any resource - * @param type of items emitted downstream - */ - public static BatchSource elasticsearch( - @Nonnull String name, - @Nonnull SupplierEx clientSupplier, - @Nonnull SupplierEx searchRequestSupplier, - @Nonnull String scrollTimeout, - @Nonnull FunctionEx mapHitFn, - @Nonnull ConsumerEx destroyFn - ) { - return SourceBuilder - .batch(name, ctx -> { - RestClient client = clientSupplier.get(); - SearchRequest searchRequest = searchRequestSupplier.get(); - return new SearchContext<>(client, scrollTimeout, mapHitFn, searchRequest, destroyFn); - }) - .fillBufferFn(SearchContext::fillBuffer) - .destroyFn(SearchContext::close) - .build(); - } - - /** - * Convenience for {@link #elasticsearch(String, SupplierEx, SupplierEx, String, FunctionEx, ConsumerEx)}. - * Uses {@link #DEFAULT_SCROLL_TIMEOUT} for scroll timeout, emits string - * representation of items using {@link SearchHit#getSourceAsString()} and - * closes the {@link RestClient} upon completion. - */ - public static BatchSource elasticsearch( - @Nonnull String name, - @Nonnull SupplierEx clientSupplier, - @Nonnull SupplierEx searchRequestSupplier - ) { - return elasticsearch(name, clientSupplier, searchRequestSupplier, - DEFAULT_SCROLL_TIMEOUT, SearchHit::getSourceAsString, RestClient::close); - } - - /** - * Convenience for {@link #elasticsearch(String, SupplierEx, SupplierEx)}. - * Rest client is configured with basic authentication. - */ - public static BatchSource elasticsearch( - @Nonnull String name, - @Nonnull String username, String password, - @Nonnull String hostname, int port, - @Nonnull SupplierEx searchRequestSupplier - ) { - return elasticsearch(name, () -> buildClient(username, password, hostname, port), searchRequestSupplier); - } - - private static final class SearchContext { - - private final RestClient client; - private final RestHighLevelClient highLevelClient; - private final String scrollInterval; - private final FunctionEx mapHitFn; - private final ConsumerEx destroyFn; - - private SearchResponse searchResponse; - - private SearchContext(RestClient client, String scrollInterval, FunctionEx mapHitFn, - SearchRequest searchRequest, ConsumerEx destroyFn) throws IOException { - this.client = client; - this.highLevelClient = new RestHighLevelClient(client); - this.scrollInterval = scrollInterval; - this.mapHitFn = mapHitFn; - this.destroyFn = destroyFn; - - searchRequest.scroll(scrollInterval); - searchResponse = highLevelClient.search(searchRequest); - } - - private void fillBuffer(SourceBuffer buffer) throws IOException { - SearchHit[] hits = searchResponse.getHits().getHits(); - if (hits == null || hits.length == 0) { - buffer.close(); - return; - } - for (SearchHit hit : hits) { - T item = mapHitFn.apply(hit); - if (item != null) { - buffer.add(item); - } - } - - SearchScrollRequest scrollRequest = new SearchScrollRequest(searchResponse.getScrollId()); - scrollRequest.scroll(scrollInterval); - searchResponse = highLevelClient.searchScroll(scrollRequest); - } - - private void clearScroll() throws IOException { - ClearScrollRequest clearScrollRequest = new ClearScrollRequest(); - clearScrollRequest.addScrollId(searchResponse.getScrollId()); - highLevelClient.clearScroll(clearScrollRequest); - } - - private void close() throws IOException { - clearScroll(); - destroyFn.accept(client); - } - } -} diff --git a/elasticsearch/elasticsearch-5/src/main/java/com/hazelcast/jet/contrib/elasticsearch/package-info.java b/elasticsearch/elasticsearch-5/src/main/java/com/hazelcast/jet/contrib/elasticsearch/package-info.java deleted file mode 100644 index bd022a6..0000000 --- a/elasticsearch/elasticsearch-5/src/main/java/com/hazelcast/jet/contrib/elasticsearch/package-info.java +++ /dev/null @@ -1,20 +0,0 @@ -/* - * Copyright (c) 2008-2020, Hazelcast, Inc. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * Contains sources and sinks for Elasticsearch. - */ -package com.hazelcast.jet.contrib.elasticsearch; diff --git a/elasticsearch/elasticsearch-5/src/test/java/com/hazelcast/jet/contrib/elasticsearch/ElasticsearchBaseTest.java b/elasticsearch/elasticsearch-5/src/test/java/com/hazelcast/jet/contrib/elasticsearch/ElasticsearchBaseTest.java deleted file mode 100644 index 96aa706..0000000 --- a/elasticsearch/elasticsearch-5/src/test/java/com/hazelcast/jet/contrib/elasticsearch/ElasticsearchBaseTest.java +++ /dev/null @@ -1,128 +0,0 @@ -/* - * Copyright (c) 2008-2020, Hazelcast, Inc. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.hazelcast.jet.contrib.elasticsearch; - -import com.hazelcast.collection.IList; -import com.hazelcast.function.FunctionEx; -import com.hazelcast.jet.JetInstance; -import com.hazelcast.jet.core.JetTestSupport; -import org.apache.http.HttpHost; -import org.apache.http.auth.UsernamePasswordCredentials; -import org.apache.http.client.CredentialsProvider; -import org.apache.http.impl.client.BasicCredentialsProvider; -import org.elasticsearch.action.get.GetRequest; -import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.client.RestClient; -import org.elasticsearch.client.RestHighLevelClient; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; -import org.testcontainers.containers.Network; -import org.testcontainers.elasticsearch.ElasticsearchContainer; - -import java.io.IOException; -import java.io.Serializable; -import java.util.HashMap; -import java.util.Map; - -import static org.apache.http.auth.AuthScope.ANY; -import static org.junit.Assert.assertTrue; - -public abstract class ElasticsearchBaseTest extends JetTestSupport { - - static final String DEFAULT_USER = "elastic"; - static final String DEFAULT_PASS = "changeme"; - - private static final int OBJECT_COUNT = 20; - - @Rule - public ElasticsearchContainer container = - new ElasticsearchContainer("docker.elastic.co/elasticsearch/elasticsearch:5.6.16") - .withNetwork(Network.newNetwork()); - - JetInstance jet; - IList userList; - String indexName = "users"; - - private RestClient client; - private RestHighLevelClient highLevelClient; - - @Before - public void setupBase() { - container.start(); - - client = createClient(container.getContainerIpAddress(), mappedPort()); - highLevelClient = new RestHighLevelClient(client); - - jet = createJetMember(); - - userList = jet.getList("userList"); - for (int i = 0; i < OBJECT_COUNT; i++) { - userList.add(new User("user-" + i, i)); - } - } - - @After - public void cleanupBase() throws IOException { - container.stop(); - client.close(); - } - - int mappedPort() { - String hostAddress = container.getHttpHostAddress(); - return Integer.parseInt(hostAddress.split(":")[1]); - } - - void assertIndexes() throws IOException { - for (int i = 0; i < OBJECT_COUNT; i++) { - GetRequest request = new GetRequest(indexName).id(String.valueOf(i)); - assertTrue(highLevelClient.exists(request)); - } - } - - static RestClient createClient(String containerAddress, int port) { - CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); - credentialsProvider.setCredentials(ANY, new UsernamePasswordCredentials(DEFAULT_USER, DEFAULT_PASS)); - - return RestClient.builder(new HttpHost(containerAddress, port)) - .setHttpClientConfigCallback(httpClientBuilder -> - httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider)).build(); - } - - static FunctionEx indexFn(String indexName) { - return user -> { - IndexRequest request = new IndexRequest(indexName, "doc", String.valueOf(user.age)); - Map jsonMap = new HashMap<>(); - jsonMap.put("name", user.name); - jsonMap.put("age", user.age); - request.source(jsonMap); - return request; - }; - } - - static final class User implements Serializable { - - final String name; - final int age; - - User(String name, int age) { - this.name = name; - this.age = age; - } - - } -} diff --git a/elasticsearch/elasticsearch-5/src/test/java/com/hazelcast/jet/contrib/elasticsearch/ElasticsearchSinkTest.java b/elasticsearch/elasticsearch-5/src/test/java/com/hazelcast/jet/contrib/elasticsearch/ElasticsearchSinkTest.java deleted file mode 100644 index d291658..0000000 --- a/elasticsearch/elasticsearch-5/src/test/java/com/hazelcast/jet/contrib/elasticsearch/ElasticsearchSinkTest.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Copyright (c) 2008-2020, Hazelcast, Inc. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.hazelcast.jet.contrib.elasticsearch; - -import com.hazelcast.jet.pipeline.Pipeline; -import com.hazelcast.jet.pipeline.Sources; -import org.junit.Test; - -import java.io.IOException; - -public class ElasticsearchSinkTest extends ElasticsearchBaseTest { - - @Test - public void test_elasticsearchSink() throws IOException { - Pipeline p = Pipeline.create(); - p.readFrom(Sources.list(userList)) - .writeTo(ElasticsearchSinks.elasticsearch(indexName, DEFAULT_USER, DEFAULT_PASS, - container.getContainerIpAddress(), mappedPort(), indexFn(indexName))); - - jet.newJob(p).join(); - - assertIndexes(); - } -} diff --git a/elasticsearch/elasticsearch-5/src/test/java/com/hazelcast/jet/contrib/elasticsearch/ElasticsearchSourceTest.java b/elasticsearch/elasticsearch-5/src/test/java/com/hazelcast/jet/contrib/elasticsearch/ElasticsearchSourceTest.java deleted file mode 100644 index 0020d0b..0000000 --- a/elasticsearch/elasticsearch-5/src/test/java/com/hazelcast/jet/contrib/elasticsearch/ElasticsearchSourceTest.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Copyright (c) 2008-2020, Hazelcast, Inc. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.hazelcast.jet.contrib.elasticsearch; - -import com.hazelcast.collection.IList; -import com.hazelcast.function.SupplierEx; -import com.hazelcast.jet.pipeline.Pipeline; -import com.hazelcast.jet.pipeline.Sinks; -import com.hazelcast.jet.pipeline.Sources; -import org.elasticsearch.action.bulk.BulkRequest; -import org.elasticsearch.action.search.SearchRequest; -import org.elasticsearch.client.RestClient; -import org.elasticsearch.search.builder.SearchSourceBuilder; -import org.junit.Test; - -import java.io.IOException; - -import static org.elasticsearch.action.support.WriteRequest.RefreshPolicy.IMMEDIATE; -import static org.elasticsearch.index.query.QueryBuilders.termQuery; -import static org.junit.Assert.assertEquals; - -public class ElasticsearchSourceTest extends ElasticsearchBaseTest { - - @Test - public void test() throws IOException { - String containerIpAddress = container.getContainerIpAddress(); - int port = mappedPort(); - - SupplierEx clientSupplier = () -> createClient(containerIpAddress, port); - - Pipeline p = Pipeline.create(); - p.readFrom(Sources.list(userList)) - .writeTo(ElasticsearchSinks.elasticsearch(indexName, clientSupplier, - () -> new BulkRequest().setRefreshPolicy(IMMEDIATE), indexFn(indexName), RestClient::close)); - - jet.newJob(p).join(); - - assertIndexes(); - - p = Pipeline.create(); - - p.readFrom(ElasticsearchSources.elasticsearch("users", clientSupplier, - () -> { - SearchRequest searchRequest = new SearchRequest("users"); - SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); - searchSourceBuilder.query(termQuery("age", 8)); - searchRequest.source(searchSourceBuilder); - return searchRequest; - })) - .writeTo(Sinks.list("sink")); - - jet.newJob(p).join(); - - IList sink = jet.getList("sink"); - assertEquals(1, sink.size()); - } - -} diff --git a/elasticsearch/elasticsearch-5/src/test/resources/log4j.properties b/elasticsearch/elasticsearch-5/src/test/resources/log4j.properties deleted file mode 100644 index b76bfeb..0000000 --- a/elasticsearch/elasticsearch-5/src/test/resources/log4j.properties +++ /dev/null @@ -1,10 +0,0 @@ -log4j.appender.stdout=org.apache.log4j.ConsoleAppender -log4j.appender.stdout.Target=System.out -log4j.appender.stdout.layout=org.apache.log4j.PatternLayout -log4j.appender.stdout.layout.ConversionPattern=%d{ABSOLUTE} %5p |%X{test-name}| - [%c{1}] %t - %m%n - -log4j.logger.com.hazelcast.jet=debug -log4j.logger.com.hazelcast.internal.cluster=info -log4j.logger.com.hazelcast=info - -log4j.rootLogger=info, stdout \ No newline at end of file diff --git a/elasticsearch/elasticsearch-6/README.md b/elasticsearch/elasticsearch-6/README.md deleted file mode 100644 index 29a09fe..0000000 --- a/elasticsearch/elasticsearch-6/README.md +++ /dev/null @@ -1,90 +0,0 @@ -# Elasticsearch Connector - -A Hazelcast Jet connector for Elasticsearch (v6.x.x) for querying/indexing objects -from/to Elasticsearch. - -## Getting Started - -### Installing - -The Elasticsearch Connector artifacts are published in the Maven repositories. - -Add the following lines to your pom.xml to include it as a dependency to your project: - -``` - - com.hazelcast.jet.contrib - elasticsearch-6 - ${version} - -``` - -or if you are using Gradle: -``` -compile group: 'com.hazelcast.jet.contrib', name: 'elasticsearch-6', version: ${version} -``` - -### Usage - -#### As a Source - -Elasticsearch batch source (`ElasticsearchSources.elasticsearch()`) executes -the query and retrieves the results using `scrolling`. - -Following is an example pipeline which queries Elasticsearch and logs the -results: - -```java -Pipeline p = Pipeline.create(); - -p.readFrom(ElasticsearchSources.elasticsearch("sourceName", - () -> new RestHighLevelClient(RestClient.builder(HttpHost.create(hostAddress))), - () -> { - SearchRequest searchRequest = new SearchRequest("users"); - SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); - searchSourceBuilder.query(termQuery("age", 8)); - searchRequest.source(searchSourceBuilder); - return searchRequest; - }, - "10s", - SearchHit::getSourceAsString, - RestHighLevelClient::close)) - .writeTo(Sinks.logger()); -``` - -#### As a Sink - -Elasticsearch sink (`ElasticsearchSinks.elasticsearch()`) is used to index objects from -Hazelcast Jet Pipeline to Elasticsearch. - -Here is a very simple pipeline which reads out some users from Hazelcast -List and indexes them to Elasticsearch. - -```java -Pipeline p = Pipeline.create(); -p.readFrom(Sources.list(users)) - .writeTo(ElasticsearchSinks.elasticsearch("sinkName", - () -> new RestHighLevelClient(RestClient.builder(HttpHost.create(hostAddress))), - BulkRequest::new, - user -> { - IndexRequest request = new IndexRequest(indexName, "doc", user.id); - Map jsonMap = new HashMap<>(); - jsonMap.put("name", user.name); - jsonMap.put("age", user.age); - request.source(jsonMap); - return request; - }, - RestHighLevelClient::close)); -``` - -### Running the tests - -To run the tests run the command below: - -``` -./gradlew test -``` - -## Authors - -* **[Ali Gurbuz](https://github.com/gurbuzali)** diff --git a/elasticsearch/elasticsearch-6/build.gradle b/elasticsearch/elasticsearch-6/build.gradle deleted file mode 100644 index 1e12d64..0000000 --- a/elasticsearch/elasticsearch-6/build.gradle +++ /dev/null @@ -1,5 +0,0 @@ -dependencies { - compile 'org.elasticsearch.client:elasticsearch-rest-high-level-client:6.0.0' - testCompile "org.testcontainers:elasticsearch:1.11.2" - testCompile "org.slf4j:slf4j-log4j12:1.7.26" -} \ No newline at end of file diff --git a/elasticsearch/elasticsearch-6/gradle.properties b/elasticsearch/elasticsearch-6/gradle.properties deleted file mode 100644 index 46b83e0..0000000 --- a/elasticsearch/elasticsearch-6/gradle.properties +++ /dev/null @@ -1,2 +0,0 @@ -version = 0.3-SNAPSHOT -description = A Hazelcast Jet connector for Elasticsearch (v6.x.x) for querying/indexing objects from/to Elasticsearch. diff --git a/elasticsearch/elasticsearch-6/src/main/java/com/hazelcast/jet/contrib/elasticsearch/ElasticsearchSinks.java b/elasticsearch/elasticsearch-6/src/main/java/com/hazelcast/jet/contrib/elasticsearch/ElasticsearchSinks.java deleted file mode 100644 index 394b48c..0000000 --- a/elasticsearch/elasticsearch-6/src/main/java/com/hazelcast/jet/contrib/elasticsearch/ElasticsearchSinks.java +++ /dev/null @@ -1,154 +0,0 @@ -/* - * Copyright (c) 2008-2020, Hazelcast, Inc. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.hazelcast.jet.contrib.elasticsearch; - -import com.hazelcast.function.ConsumerEx; -import com.hazelcast.function.FunctionEx; -import com.hazelcast.function.SupplierEx; -import com.hazelcast.jet.pipeline.Sink; -import com.hazelcast.jet.pipeline.SinkBuilder; -import org.apache.http.HttpHost; -import org.apache.http.auth.UsernamePasswordCredentials; -import org.apache.http.client.CredentialsProvider; -import org.apache.http.impl.client.BasicCredentialsProvider; -import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.action.DocWriteRequest; -import org.elasticsearch.action.bulk.BulkRequest; -import org.elasticsearch.action.bulk.BulkResponse; -import org.elasticsearch.action.delete.DeleteRequest; -import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.action.update.UpdateRequest; -import org.elasticsearch.client.RestClient; -import org.elasticsearch.client.RestHighLevelClient; - -import javax.annotation.Nonnull; -import javax.annotation.Nullable; -import java.io.IOException; - -import static org.apache.http.auth.AuthScope.ANY; - -/** - * Contains factory methods for Elasticsearch sinks. - */ -public final class ElasticsearchSinks { - - private ElasticsearchSinks() { - } - - /** - * Creates a sink which indexes objects using the specified Elasticsearch - * client and the specified bulk request. - * - * @param name name of the created sink - * @param clientSupplier Elasticsearch REST client supplier - * @param bulkRequestSupplier bulk request supplier, will be called to obtain a - * new {@link BulkRequest} instance after each call. - * @param requestFn creates an {@link IndexRequest}, {@link UpdateRequest} - * or {@link DeleteRequest} for each object - * @param destroyFn called upon completion to release any resource - */ - public static Sink elasticsearch( - @Nonnull String name, - @Nonnull SupplierEx clientSupplier, - @Nonnull SupplierEx bulkRequestSupplier, - @Nonnull FunctionEx requestFn, - @Nonnull ConsumerEx destroyFn - ) { - return SinkBuilder - .sinkBuilder(name, ctx -> new BulkContext(clientSupplier.get(), bulkRequestSupplier, destroyFn)) - .receiveFn((bulkContext, item) -> bulkContext.add(requestFn.apply(item))) - .flushFn(BulkContext::flush) - .destroyFn(BulkContext::close) - .preferredLocalParallelism(2) - .build(); - } - - /** - * Convenience for {@link #elasticsearch(String, SupplierEx, SupplierEx, - * FunctionEx, ConsumerEx)}. Creates a new {@link BulkRequest} with default - * options for each batch and closes the {@link RestHighLevelClient} upon - * completion. Assumes that the {@code clientSupplier} creates a new client - * for each call. - */ - public static Sink elasticsearch( - @Nonnull String name, - @Nonnull SupplierEx clientSupplier, - @Nonnull FunctionEx requestFn - ) { - return elasticsearch(name, clientSupplier, BulkRequest::new, requestFn, RestHighLevelClient::close); - } - - /** - * Convenience for {@link #elasticsearch(String, SupplierEx, FunctionEx)} - * Rest client is configured with basic authentication. - */ - public static Sink elasticsearch( - @Nonnull String name, - @Nonnull String username, - @Nullable String password, - @Nonnull String hostname, - int port, - @Nonnull FunctionEx requestFn - ) { - return elasticsearch(name, () -> buildClient(username, password, hostname, port), requestFn); - } - - static RestHighLevelClient buildClient(String username, String password, String hostname, int port) { - CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); - credentialsProvider.setCredentials(ANY, new UsernamePasswordCredentials(username, password)); - return new RestHighLevelClient( - RestClient.builder(new HttpHost(hostname, port)) - .setHttpClientConfigCallback(httpClientBuilder -> - httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider)) - ); - } - - private static final class BulkContext { - - private final RestHighLevelClient client; - private final SupplierEx bulkRequestSupplier; - private final ConsumerEx destroyFn; - - private BulkRequest bulkRequest; - - private BulkContext(RestHighLevelClient client, SupplierEx bulkRequestSupplier, - ConsumerEx destroyFn) { - this.client = client; - this.bulkRequestSupplier = bulkRequestSupplier; - this.destroyFn = destroyFn; - - this.bulkRequest = bulkRequestSupplier.get(); - } - - private void add(DocWriteRequest request) { - bulkRequest.add(request); - } - - private void flush() throws IOException { - BulkResponse response = client.bulk(bulkRequest); - if (response.hasFailures()) { - throw new ElasticsearchException(response.buildFailureMessage()); - } - bulkRequest = bulkRequestSupplier.get(); - } - - private void close() { - destroyFn.accept(client); - } - } - -} diff --git a/elasticsearch/elasticsearch-6/src/main/java/com/hazelcast/jet/contrib/elasticsearch/ElasticsearchSources.java b/elasticsearch/elasticsearch-6/src/main/java/com/hazelcast/jet/contrib/elasticsearch/ElasticsearchSources.java deleted file mode 100644 index 2a420c1..0000000 --- a/elasticsearch/elasticsearch-6/src/main/java/com/hazelcast/jet/contrib/elasticsearch/ElasticsearchSources.java +++ /dev/null @@ -1,162 +0,0 @@ -/* - * Copyright (c) 2008-2020, Hazelcast, Inc. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.hazelcast.jet.contrib.elasticsearch; - -import com.hazelcast.function.ConsumerEx; -import com.hazelcast.function.FunctionEx; -import com.hazelcast.function.SupplierEx; -import com.hazelcast.jet.pipeline.BatchSource; -import com.hazelcast.jet.pipeline.SourceBuilder; -import com.hazelcast.jet.pipeline.SourceBuilder.SourceBuffer; -import org.elasticsearch.action.search.ClearScrollRequest; -import org.elasticsearch.action.search.SearchRequest; -import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.action.search.SearchScrollRequest; -import org.elasticsearch.client.RestHighLevelClient; -import org.elasticsearch.search.SearchHit; - -import javax.annotation.Nonnull; -import javax.annotation.Nullable; -import java.io.IOException; - -import static com.hazelcast.jet.contrib.elasticsearch.ElasticsearchSinks.buildClient; - -/** - * Contains factory methods for Elasticsearch sources. - */ -public final class ElasticsearchSources { - - private static final String DEFAULT_SCROLL_TIMEOUT = "60s"; - - private ElasticsearchSources() { - } - - /** - * Creates a source which queries objects using the specified Elasticsearch - * client and the specified request supplier using scrolling method. - * - * @param name name of the created source - * @param clientSupplier Elasticsearch REST client supplier - * @param searchRequestSupplier search request supplier - * @param scrollTimeout scroll keep alive time - * @param mapHitFn maps search hits to output items - * @param destroyFn called upon completion to release any resource - * @param type of items emitted downstream - */ - public static BatchSource elasticsearch( - @Nonnull String name, - @Nonnull SupplierEx clientSupplier, - @Nonnull SupplierEx searchRequestSupplier, - @Nullable String scrollTimeout, - @Nonnull FunctionEx mapHitFn, - @Nonnull ConsumerEx destroyFn - ) { - return SourceBuilder - .batch(name, ctx -> new SearchContext<>(clientSupplier.get(), scrollTimeout, - mapHitFn, searchRequestSupplier.get(), destroyFn)) - .fillBufferFn(SearchContext::fillBuffer) - .destroyFn(SearchContext::close) - .build(); - } - - /** - * Convenience for {@link #elasticsearch(String, SupplierEx, SupplierEx, - * String, FunctionEx, ConsumerEx)}. - * Uses {@link #DEFAULT_SCROLL_TIMEOUT} for scroll timeout, emits string - * representation of items using {@link SearchHit#getSourceAsString()} and - * closes the {@link RestHighLevelClient} upon completion. - */ - public static BatchSource elasticsearch( - @Nonnull String name, - @Nonnull SupplierEx clientSupplier, - @Nonnull SupplierEx searchRequestSupplier - ) { - return elasticsearch(name, clientSupplier, searchRequestSupplier, - DEFAULT_SCROLL_TIMEOUT, SearchHit::getSourceAsString, RestHighLevelClient::close); - } - - - /** - * Convenience for {@link #elasticsearch(String, SupplierEx, SupplierEx)}. - * Rest client is configured with basic authentication. - */ - public static BatchSource elasticsearch( - @Nonnull String name, - @Nonnull String username, - @Nullable String password, - @Nonnull String hostname, - int port, - @Nonnull SupplierEx searchRequestSupplier - ) { - return elasticsearch(name, () -> buildClient(username, password, hostname, port), searchRequestSupplier); - } - - private static final class SearchContext { - - private final RestHighLevelClient client; - private final String scrollInterval; - private final FunctionEx mapHitFn; - private final ConsumerEx destroyFn; - - private SearchResponse searchResponse; - - private SearchContext( - RestHighLevelClient client, - String scrollInterval, - FunctionEx mapHitFn, - SearchRequest searchRequest, - ConsumerEx destroyFn - ) throws IOException { - this.client = client; - this.scrollInterval = scrollInterval; - this.mapHitFn = mapHitFn; - this.destroyFn = destroyFn; - - searchRequest.scroll(scrollInterval); - searchResponse = client.search(searchRequest); - } - - private void fillBuffer(SourceBuffer buffer) throws IOException { - SearchHit[] hits = searchResponse.getHits().getHits(); - if (hits == null || hits.length == 0) { - buffer.close(); - return; - } - for (SearchHit hit : hits) { - T item = mapHitFn.apply(hit); - if (item != null) { - buffer.add(item); - } - } - - SearchScrollRequest scrollRequest = new SearchScrollRequest(searchResponse.getScrollId()); - scrollRequest.scroll(scrollInterval); - searchResponse = client.searchScroll(scrollRequest); - } - - private void clearScroll() throws IOException { - ClearScrollRequest clearScrollRequest = new ClearScrollRequest(); - clearScrollRequest.addScrollId(searchResponse.getScrollId()); - client.clearScroll(clearScrollRequest); - } - - private void close() throws IOException { - clearScroll(); - destroyFn.accept(client); - } - } -} diff --git a/elasticsearch/elasticsearch-6/src/main/java/com/hazelcast/jet/contrib/elasticsearch/package-info.java b/elasticsearch/elasticsearch-6/src/main/java/com/hazelcast/jet/contrib/elasticsearch/package-info.java deleted file mode 100644 index bd022a6..0000000 --- a/elasticsearch/elasticsearch-6/src/main/java/com/hazelcast/jet/contrib/elasticsearch/package-info.java +++ /dev/null @@ -1,20 +0,0 @@ -/* - * Copyright (c) 2008-2020, Hazelcast, Inc. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * Contains sources and sinks for Elasticsearch. - */ -package com.hazelcast.jet.contrib.elasticsearch; diff --git a/elasticsearch/elasticsearch-6/src/test/java/com/hazelcast/jet/contrib/elasticsearch/ElasticsearchBaseTest.java b/elasticsearch/elasticsearch-6/src/test/java/com/hazelcast/jet/contrib/elasticsearch/ElasticsearchBaseTest.java deleted file mode 100644 index d38109f..0000000 --- a/elasticsearch/elasticsearch-6/src/test/java/com/hazelcast/jet/contrib/elasticsearch/ElasticsearchBaseTest.java +++ /dev/null @@ -1,107 +0,0 @@ -/* - * Copyright (c) 2008-2020, Hazelcast, Inc. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.hazelcast.jet.contrib.elasticsearch; - -import com.hazelcast.collection.IList; -import com.hazelcast.function.FunctionEx; -import com.hazelcast.jet.JetInstance; -import com.hazelcast.jet.core.JetTestSupport; -import org.apache.http.HttpHost; -import org.elasticsearch.action.get.GetRequest; -import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.client.RestClient; -import org.elasticsearch.client.RestHighLevelClient; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; -import org.testcontainers.containers.Network; -import org.testcontainers.elasticsearch.ElasticsearchContainer; - -import java.io.IOException; -import java.io.Serializable; -import java.util.HashMap; -import java.util.Map; - -import static org.junit.Assert.assertTrue; - -public abstract class ElasticsearchBaseTest extends JetTestSupport { - - private static final int OBJECT_COUNT = 20; - - @Rule - public ElasticsearchContainer container = - new ElasticsearchContainer("docker.elastic.co/elasticsearch/elasticsearch:6.8.0") - .withNetwork(Network.newNetwork()); - - JetInstance jet; - IList userList; - String indexName = "users"; - private RestHighLevelClient client; - - @Before - public void setup() { - container.start(); - client = createClient(container.getHttpHostAddress()); - - jet = createJetMember(); - - userList = jet.getList("userList"); - for (int i = 0; i < OBJECT_COUNT; i++) { - userList.add(new User("user-" + i, i)); - } - } - - @After - public void cleanup() throws IOException { - container.stop(); - client.close(); - } - - void assertIndexes() throws IOException { - for (int i = 0; i < OBJECT_COUNT; i++) { - GetRequest request = new GetRequest(indexName).id(String.valueOf(i)); - assertTrue(client.exists(request)); - } - } - - static RestHighLevelClient createClient(String containerAddress) { - return new RestHighLevelClient(RestClient.builder(HttpHost.create(containerAddress))); - } - - static FunctionEx indexFn(String indexName) { - return user -> { - IndexRequest request = new IndexRequest(indexName, "doc", String.valueOf(user.age)); - Map jsonMap = new HashMap<>(); - jsonMap.put("name", user.name); - jsonMap.put("age", user.age); - request.source(jsonMap); - return request; - }; - } - - static final class User implements Serializable { - - final String name; - final int age; - - User(String name, int age) { - this.name = name; - this.age = age; - } - - } -} diff --git a/elasticsearch/elasticsearch-6/src/test/java/com/hazelcast/jet/contrib/elasticsearch/ElasticsearchSinkTest.java b/elasticsearch/elasticsearch-6/src/test/java/com/hazelcast/jet/contrib/elasticsearch/ElasticsearchSinkTest.java deleted file mode 100644 index a899110..0000000 --- a/elasticsearch/elasticsearch-6/src/test/java/com/hazelcast/jet/contrib/elasticsearch/ElasticsearchSinkTest.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Copyright (c) 2008-2020, Hazelcast, Inc. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.hazelcast.jet.contrib.elasticsearch; - -import com.hazelcast.jet.pipeline.Pipeline; -import com.hazelcast.jet.pipeline.Sources; -import org.junit.Test; - -import java.io.IOException; - -public class ElasticsearchSinkTest extends ElasticsearchBaseTest { - - @Test - public void test_elasticsearchSink() throws IOException { - String containerAddress = container.getHttpHostAddress(); - - Pipeline p = Pipeline.create(); - p.readFrom(Sources.list(userList)) - .writeTo(ElasticsearchSinks.elasticsearch(indexName, () -> createClient(containerAddress), indexFn(indexName))); - - jet.newJob(p).join(); - - assertIndexes(); - } -} diff --git a/elasticsearch/elasticsearch-6/src/test/java/com/hazelcast/jet/contrib/elasticsearch/ElasticsearchSourceTest.java b/elasticsearch/elasticsearch-6/src/test/java/com/hazelcast/jet/contrib/elasticsearch/ElasticsearchSourceTest.java deleted file mode 100644 index cc537c0..0000000 --- a/elasticsearch/elasticsearch-6/src/test/java/com/hazelcast/jet/contrib/elasticsearch/ElasticsearchSourceTest.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Copyright (c) 2008-2020, Hazelcast, Inc. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.hazelcast.jet.contrib.elasticsearch; - -import com.hazelcast.collection.IList; -import com.hazelcast.jet.pipeline.Pipeline; -import com.hazelcast.jet.pipeline.Sinks; -import com.hazelcast.jet.pipeline.Sources; -import org.elasticsearch.action.bulk.BulkRequest; -import org.elasticsearch.action.search.SearchRequest; -import org.elasticsearch.client.RestHighLevelClient; -import org.elasticsearch.search.builder.SearchSourceBuilder; -import org.junit.Test; - -import java.io.IOException; - -import static org.elasticsearch.action.support.WriteRequest.RefreshPolicy.IMMEDIATE; -import static org.elasticsearch.index.query.QueryBuilders.termQuery; -import static org.junit.Assert.assertEquals; - -public class ElasticsearchSourceTest extends ElasticsearchBaseTest { - - @Test - public void test() throws IOException { - String containerAddress = container.getHttpHostAddress(); - - Pipeline p = Pipeline.create(); - p.readFrom(Sources.list(userList)) - .writeTo(ElasticsearchSinks.elasticsearch(indexName, () -> createClient(containerAddress), - () -> new BulkRequest().setRefreshPolicy(IMMEDIATE), indexFn(indexName), RestHighLevelClient::close)); - - jet.newJob(p).join(); - - assertIndexes(); - - p = Pipeline.create(); - - p.readFrom(ElasticsearchSources.elasticsearch("users", () -> createClient(containerAddress), - () -> { - SearchRequest searchRequest = new SearchRequest("users"); - SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); - searchSourceBuilder.query(termQuery("age", 8)); - searchRequest.source(searchSourceBuilder); - return searchRequest; - })) - .writeTo(Sinks.list("sink")); - - jet.newJob(p).join(); - - IList sink = jet.getList("sink"); - assertEquals(1, sink.size()); - } - -} diff --git a/elasticsearch/elasticsearch-6/src/test/resources/log4j.properties b/elasticsearch/elasticsearch-6/src/test/resources/log4j.properties deleted file mode 100644 index b76bfeb..0000000 --- a/elasticsearch/elasticsearch-6/src/test/resources/log4j.properties +++ /dev/null @@ -1,10 +0,0 @@ -log4j.appender.stdout=org.apache.log4j.ConsoleAppender -log4j.appender.stdout.Target=System.out -log4j.appender.stdout.layout=org.apache.log4j.PatternLayout -log4j.appender.stdout.layout.ConversionPattern=%d{ABSOLUTE} %5p |%X{test-name}| - [%c{1}] %t - %m%n - -log4j.logger.com.hazelcast.jet=debug -log4j.logger.com.hazelcast.internal.cluster=info -log4j.logger.com.hazelcast=info - -log4j.rootLogger=info, stdout \ No newline at end of file diff --git a/elasticsearch/elasticsearch-7/README.md b/elasticsearch/elasticsearch-7/README.md deleted file mode 100644 index b099de9..0000000 --- a/elasticsearch/elasticsearch-7/README.md +++ /dev/null @@ -1,92 +0,0 @@ -# Elasticsearch Connector - -A Hazelcast Jet connector for Elasticsearch (v7.x.x) for querying/indexing objects -from/to Elasticsearch. - -## Getting Started - -### Installing - -The Elasticsearch Connector artifacts are published in the Maven repositories. - -Add the following lines to your pom.xml to include it as a dependency to your project: - -``` - - com.hazelcast.jet.contrib - elasticsearch-7 - ${version} - -``` - -or if you are using Gradle: -``` -compile group: 'com.hazelcast.jet.contrib', name: 'elasticsearch-7', version: ${version} -``` - -### Usage - -#### As a Source - -Elasticsearch batch source (`ElasticsearchSources.elasticsearch()`) executes -the query and retrieves the results using `scrolling`. - -Following is an example pipeline which queries Elasticsearch and logs the -results: - -```java -Pipeline p = Pipeline.create(); - -p.readFrom(ElasticsearchSources.elasticsearch("sourceName", - () -> new RestHighLevelClient(RestClient.builder(HttpHost.create(hostAddress))), - () -> { - SearchRequest searchRequest = new SearchRequest("users"); - SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); - searchSourceBuilder.query(termQuery("age", 8)); - searchRequest.source(searchSourceBuilder); - return searchRequest; - }, - "10s", - SearchHit::getSourceAsString, - request -> RequestOptions.DEFAULT, - RestHighLevelClient::close)) - .writeTo(Sinks.logger()); -``` - -#### As a Sink - -Elasticsearch sink (`Elasticsearch.elasticsearch()`) is used to index objects from -Hazelcast Jet Pipeline to Elasticsearch. - -Here is a very simple pipeline which reads out some users from Hazelcast -List and indexes them to Elasticsearch. - -```java -Pipeline p = Pipeline.create(); -p.readFrom(Sources.list(users)) - .writeTo(ElasticsearchSinks.elasticsearch("sinkName", - () -> new RestHighLevelClient(RestClient.builder(HttpHost.create(hostAddress))), - BulkRequest::new, - user -> { - IndexRequest request = new IndexRequest(indexName, "doc", user.id); - Map jsonMap = new HashMap<>(); - jsonMap.put("name", user.name); - jsonMap.put("age", user.age); - request.source(jsonMap); - return request; - }, - request -> RequestOptions.DEFAULT, - RestHighLevelClient::close)); -``` - -### Running the tests - -To run the tests run the command below: - -``` -./gradlew test -``` - -## Authors - -* **[Ali Gurbuz](https://github.com/gurbuzali)** diff --git a/elasticsearch/elasticsearch-7/build.gradle b/elasticsearch/elasticsearch-7/build.gradle deleted file mode 100644 index 1fbde41..0000000 --- a/elasticsearch/elasticsearch-7/build.gradle +++ /dev/null @@ -1,6 +0,0 @@ -dependencies { - compile 'org.elasticsearch.client:elasticsearch-rest-high-level-client:7.0.0' - testCompile "org.testcontainers:elasticsearch:1.11.2" - testCompile "org.slf4j:slf4j-log4j12:1.7.26" -} - diff --git a/elasticsearch/elasticsearch-7/gradle.properties b/elasticsearch/elasticsearch-7/gradle.properties deleted file mode 100644 index 5b740d9..0000000 --- a/elasticsearch/elasticsearch-7/gradle.properties +++ /dev/null @@ -1,2 +0,0 @@ -version = 0.3-SNAPSHOT -description = A Hazelcast Jet connector for Elasticsearch (v7.x.x) for querying/indexing objects from/to Elasticsearch. diff --git a/elasticsearch/elasticsearch-7/src/main/java/com/hazelcast/jet/contrib/elasticsearch/ElasticsearchSinks.java b/elasticsearch/elasticsearch-7/src/main/java/com/hazelcast/jet/contrib/elasticsearch/ElasticsearchSinks.java deleted file mode 100644 index 7e889b7..0000000 --- a/elasticsearch/elasticsearch-7/src/main/java/com/hazelcast/jet/contrib/elasticsearch/ElasticsearchSinks.java +++ /dev/null @@ -1,162 +0,0 @@ -/* - * Copyright (c) 2008-2020, Hazelcast, Inc. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.hazelcast.jet.contrib.elasticsearch; - -import com.hazelcast.function.ConsumerEx; -import com.hazelcast.function.FunctionEx; -import com.hazelcast.function.SupplierEx; -import com.hazelcast.jet.pipeline.Sink; -import com.hazelcast.jet.pipeline.SinkBuilder; -import org.apache.http.HttpHost; -import org.apache.http.auth.UsernamePasswordCredentials; -import org.apache.http.client.CredentialsProvider; -import org.apache.http.impl.client.BasicCredentialsProvider; -import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.action.ActionRequest; -import org.elasticsearch.action.DocWriteRequest; -import org.elasticsearch.action.bulk.BulkRequest; -import org.elasticsearch.action.bulk.BulkResponse; -import org.elasticsearch.action.delete.DeleteRequest; -import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.action.update.UpdateRequest; -import org.elasticsearch.client.RequestOptions; -import org.elasticsearch.client.RestClient; -import org.elasticsearch.client.RestHighLevelClient; - -import javax.annotation.Nonnull; -import javax.annotation.Nullable; -import java.io.IOException; - -import static org.apache.http.auth.AuthScope.ANY; - -/** - * Contains factory methods for Elasticsearch sinks. - */ -public final class ElasticsearchSinks { - - private ElasticsearchSinks() { - } - - /** - * Creates a sink which indexes objects using the specified Elasticsearch - * client and the specified bulk request. - * - * @param name name of the created sink - * @param clientSupplier Elasticsearch REST client supplier - * @param bulkRequestSupplier bulk request supplier, will be called to obtain a - * new {@link BulkRequest} instance after each call. - * @param requestFn creates an {@link IndexRequest}, {@link UpdateRequest} - * or {@link DeleteRequest} for each object - * @param optionsFn obtains {@link RequestOptions} for each request - * @param destroyFn called upon completion to release any resource - */ - public static Sink elasticsearch( - @Nonnull String name, - @Nonnull SupplierEx clientSupplier, - @Nonnull SupplierEx bulkRequestSupplier, - @Nonnull FunctionEx requestFn, - @Nonnull FunctionEx optionsFn, - @Nonnull ConsumerEx destroyFn - ) { - return SinkBuilder - .sinkBuilder(name, ctx -> new BulkContext(clientSupplier.get(), bulkRequestSupplier, optionsFn, destroyFn)) - .receiveFn((bulkContext, item) -> bulkContext.add(requestFn.apply(item))) - .flushFn(BulkContext::flush) - .destroyFn(BulkContext::close) - .preferredLocalParallelism(2) - .build(); - } - - /** - * Convenience for {@link #elasticsearch(String, SupplierEx, SupplierEx, - * FunctionEx, FunctionEx, ConsumerEx)}. Creates a new {@link BulkRequest} - * with default options for each batch and closes the {@link - * RestHighLevelClient} upon completion. Assumes that the {@code - * clientSupplier} creates a new client for each call. - */ - public static Sink elasticsearch( - @Nonnull String name, - @Nonnull SupplierEx clientSupplier, - @Nonnull FunctionEx requestFn - ) { - return elasticsearch(name, clientSupplier, BulkRequest::new, requestFn, - request -> RequestOptions.DEFAULT, RestHighLevelClient::close); - } - - /** - * Convenience for {@link #elasticsearch(String, SupplierEx, FunctionEx)} - * Rest client is configured with basic authentication. - */ - public static Sink elasticsearch( - @Nonnull String name, - @Nonnull String username, - @Nullable String password, - @Nonnull String hostname, - int port, - @Nonnull FunctionEx requestFn - ) { - return elasticsearch(name, () -> buildClient(username, password, hostname, port), requestFn); - } - - static RestHighLevelClient buildClient(String username, String password, String hostname, int port) { - CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); - credentialsProvider.setCredentials(ANY, new UsernamePasswordCredentials(username, password)); - return new RestHighLevelClient( - RestClient.builder(new HttpHost(hostname, port)) - .setHttpClientConfigCallback(httpClientBuilder -> - httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider)) - ); - } - - private static final class BulkContext { - - private final RestHighLevelClient client; - private final SupplierEx bulkRequestSupplier; - private final FunctionEx optionsFn; - private final ConsumerEx destroyFn; - - private BulkRequest bulkRequest; - - private BulkContext(RestHighLevelClient client, SupplierEx bulkRequestSupplier, - FunctionEx optionsFn, - ConsumerEx destroyFn) { - this.client = client; - this.bulkRequestSupplier = bulkRequestSupplier; - this.optionsFn = optionsFn; - this.destroyFn = destroyFn; - - this.bulkRequest = bulkRequestSupplier.get(); - } - - private void add(DocWriteRequest request) { - bulkRequest.add(request); - } - - private void flush() throws IOException { - BulkResponse response = client.bulk(bulkRequest, optionsFn.apply(bulkRequest)); - if (response.hasFailures()) { - throw new ElasticsearchException(response.buildFailureMessage()); - } - bulkRequest = bulkRequestSupplier.get(); - } - - private void close() { - destroyFn.accept(client); - } - } - -} diff --git a/elasticsearch/elasticsearch-7/src/main/java/com/hazelcast/jet/contrib/elasticsearch/ElasticsearchSources.java b/elasticsearch/elasticsearch-7/src/main/java/com/hazelcast/jet/contrib/elasticsearch/ElasticsearchSources.java deleted file mode 100644 index d4d4fba..0000000 --- a/elasticsearch/elasticsearch-7/src/main/java/com/hazelcast/jet/contrib/elasticsearch/ElasticsearchSources.java +++ /dev/null @@ -1,166 +0,0 @@ -/* - * Copyright (c) 2008-2020, Hazelcast, Inc. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.hazelcast.jet.contrib.elasticsearch; - -import com.hazelcast.function.ConsumerEx; -import com.hazelcast.function.FunctionEx; -import com.hazelcast.function.SupplierEx; -import com.hazelcast.jet.pipeline.BatchSource; -import com.hazelcast.jet.pipeline.SourceBuilder; -import com.hazelcast.jet.pipeline.SourceBuilder.SourceBuffer; -import org.elasticsearch.action.ActionRequest; -import org.elasticsearch.action.search.ClearScrollRequest; -import org.elasticsearch.action.search.SearchRequest; -import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.action.search.SearchScrollRequest; -import org.elasticsearch.client.RequestOptions; -import org.elasticsearch.client.RestHighLevelClient; -import org.elasticsearch.search.SearchHit; - -import javax.annotation.Nonnull; -import javax.annotation.Nullable; -import java.io.IOException; - -/** - * Contains factory methods for Elasticsearch sources. - */ -public final class ElasticsearchSources { - - private static final String DEFAULT_SCROLL_TIMEOUT = "60s"; - - private ElasticsearchSources() { - } - - /** - * Creates a source which queries objects using the specified Elasticsearch - * client and the specified request supplier using scrolling method. - * - * @param name name of the source - * @param clientSupplier Elasticsearch REST client supplier - * @param searchRequestSupplier search request supplier - * @param scrollTimeout scroll keep alive time - * @param mapHitFn maps search hits to output items - * @param optionsFn obtains {@link RequestOptions} for each request - * @param destroyFn called upon completion to release any resource - * @param type of items emitted downstream - */ - public static BatchSource elasticsearch( - @Nonnull String name, - @Nonnull SupplierEx clientSupplier, - @Nonnull SupplierEx searchRequestSupplier, - @Nonnull String scrollTimeout, - @Nonnull FunctionEx mapHitFn, - @Nonnull FunctionEx optionsFn, - @Nonnull ConsumerEx destroyFn - ) { - return SourceBuilder - .batch(name, ctx -> new SearchContext<>(clientSupplier.get(), scrollTimeout, - mapHitFn, searchRequestSupplier.get(), optionsFn, destroyFn)) - .fillBufferFn(SearchContext::fillBuffer) - .destroyFn(SearchContext::close) - .build(); - } - - /** - * Convenience for {@link #elasticsearch(String, SupplierEx, SupplierEx, - * String, FunctionEx, FunctionEx, ConsumerEx)}. - * Uses {@link #DEFAULT_SCROLL_TIMEOUT} for scroll timeout and {@link - * RequestOptions#DEFAULT}, emits string representation of items using - * {@link SearchHit#getSourceAsString()} and closes the {@link - * RestHighLevelClient} upon completion. - */ - public static BatchSource elasticsearch( - @Nonnull String name, - @Nonnull SupplierEx clientSupplier, - @Nonnull SupplierEx searchRequestSupplier - ) { - return elasticsearch(name, clientSupplier, searchRequestSupplier, DEFAULT_SCROLL_TIMEOUT, - SearchHit::getSourceAsString, request -> RequestOptions.DEFAULT, RestHighLevelClient::close); - } - - - /** - * Convenience for {@link #elasticsearch(String, SupplierEx, SupplierEx)}. - * Rest client is configured with basic authentication. - */ - public static BatchSource elasticsearch( - @Nonnull String name, - @Nonnull String username, - @Nullable String password, - @Nonnull String hostname, - int port, - @Nonnull SupplierEx searchRequestSupplier - ) { - return elasticsearch(name, () -> ElasticsearchSinks.buildClient(username, password, hostname, port), - searchRequestSupplier); - } - - private static final class SearchContext { - - private final RestHighLevelClient client; - private final String scrollInterval; - private final FunctionEx mapHitFn; - private final FunctionEx optionsFn; - private final ConsumerEx destroyFn; - - private SearchResponse searchResponse; - - private SearchContext(RestHighLevelClient client, String scrollInterval, - FunctionEx mapHitFn, SearchRequest searchRequest, - FunctionEx optionsFn, - ConsumerEx destroyFn - ) throws IOException { - this.client = client; - this.scrollInterval = scrollInterval; - this.mapHitFn = mapHitFn; - this.optionsFn = optionsFn; - this.destroyFn = destroyFn; - - searchRequest.scroll(scrollInterval); - searchResponse = client.search(searchRequest, optionsFn.apply(searchRequest)); - } - - private void fillBuffer(SourceBuffer buffer) throws IOException { - SearchHit[] hits = searchResponse.getHits().getHits(); - if (hits == null || hits.length == 0) { - buffer.close(); - return; - } - for (SearchHit hit : hits) { - T item = mapHitFn.apply(hit); - if (item != null) { - buffer.add(item); - } - } - - SearchScrollRequest scrollRequest = new SearchScrollRequest(searchResponse.getScrollId()); - scrollRequest.scroll(scrollInterval); - searchResponse = client.scroll(scrollRequest, optionsFn.apply(scrollRequest)); - } - - private void clearScroll() throws IOException { - ClearScrollRequest clearScrollRequest = new ClearScrollRequest(); - clearScrollRequest.addScrollId(searchResponse.getScrollId()); - client.clearScroll(clearScrollRequest, RequestOptions.DEFAULT); - } - - private void close() throws IOException { - clearScroll(); - destroyFn.accept(client); - } - } -} diff --git a/elasticsearch/elasticsearch-7/src/main/java/com/hazelcast/jet/contrib/elasticsearch/package-info.java b/elasticsearch/elasticsearch-7/src/main/java/com/hazelcast/jet/contrib/elasticsearch/package-info.java deleted file mode 100644 index 72b4b18..0000000 --- a/elasticsearch/elasticsearch-7/src/main/java/com/hazelcast/jet/contrib/elasticsearch/package-info.java +++ /dev/null @@ -1,20 +0,0 @@ -/* - * Copyright (c) 2008-2020, Hazelcast, Inc. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * Contains sources and sinks for Elasticsearch - */ -package com.hazelcast.jet.contrib.elasticsearch; diff --git a/elasticsearch/elasticsearch-7/src/test/java/com/hazelcast/jet/contrib/elasticsearch/ElasticsearchBaseTest.java b/elasticsearch/elasticsearch-7/src/test/java/com/hazelcast/jet/contrib/elasticsearch/ElasticsearchBaseTest.java deleted file mode 100644 index c17d730..0000000 --- a/elasticsearch/elasticsearch-7/src/test/java/com/hazelcast/jet/contrib/elasticsearch/ElasticsearchBaseTest.java +++ /dev/null @@ -1,121 +0,0 @@ -/* - * Copyright (c) 2008-2020, Hazelcast, Inc. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.hazelcast.jet.contrib.elasticsearch; - -import com.hazelcast.collection.IList; -import com.hazelcast.function.FunctionEx; -import com.hazelcast.jet.JetInstance; -import com.hazelcast.jet.core.JetTestSupport; -import org.apache.http.HttpHost; -import org.elasticsearch.action.get.MultiGetItemResponse; -import org.elasticsearch.action.get.MultiGetRequest; -import org.elasticsearch.action.get.MultiGetResponse; -import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.client.RequestOptions; -import org.elasticsearch.client.RestClient; -import org.elasticsearch.client.RestHighLevelClient; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; -import org.testcontainers.containers.Network; -import org.testcontainers.elasticsearch.ElasticsearchContainer; - -import java.io.IOException; -import java.io.Serializable; -import java.util.HashMap; -import java.util.Map; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; - -public abstract class ElasticsearchBaseTest extends JetTestSupport { - - private static final int OBJECT_COUNT = 20; - - @Rule - public ElasticsearchContainer container = - new ElasticsearchContainer("docker.elastic.co/elasticsearch/elasticsearch:7.1.0") - .withNetwork(Network.newNetwork()); - JetInstance jet; - IList userList; - String indexName = "users"; - private RestHighLevelClient client; - - @Before - public void setupBase() { - container.start(); - client = createClient(container.getHttpHostAddress()); - - jet = createJetMember(); - - userList = jet.getList("userList"); - for (int i = 0; i < OBJECT_COUNT; i++) { - userList.add(new User("user-" + i, i)); - } - } - - @After - public void cleanupBase() throws IOException { - container.stop(); - client.close(); - jet.shutdown(); - } - - void assertIndexes() throws IOException { - MultiGetRequest multiGetRequest = new MultiGetRequest(); - for (int i = 0; i < OBJECT_COUNT; i++) { - multiGetRequest.add(new MultiGetRequest.Item(indexName, String.valueOf(i))); - } - MultiGetResponse multiGetResponse = client.mget(multiGetRequest, RequestOptions.DEFAULT); - MultiGetItemResponse[] responses = multiGetResponse.getResponses(); - assertEquals(OBJECT_COUNT, responses.length); - for (int i = 0; i < OBJECT_COUNT; i++) { - MultiGetItemResponse itemResponse = responses[i]; - assertNull(itemResponse.getFailure()); - assertTrue(itemResponse.getResponse().isExists()); - } - } - - static RestHighLevelClient createClient(String containerAddress) { - return new RestHighLevelClient(RestClient.builder(HttpHost.create(containerAddress))); - } - - static FunctionEx indexFn(String indexName) { - return user -> { - IndexRequest request = new IndexRequest(indexName); - request.id(String.valueOf(user.age)); - Map jsonMap = new HashMap<>(); - jsonMap.put("name", user.name); - jsonMap.put("age", user.age); - request.source(jsonMap); - return request; - }; - } - - static final class User implements Serializable { - - final String name; - final int age; - - User(String name, int age) { - this.name = name; - this.age = age; - } - - } -} diff --git a/elasticsearch/elasticsearch-7/src/test/java/com/hazelcast/jet/contrib/elasticsearch/ElasticsearchSinkTest.java b/elasticsearch/elasticsearch-7/src/test/java/com/hazelcast/jet/contrib/elasticsearch/ElasticsearchSinkTest.java deleted file mode 100644 index a57e427..0000000 --- a/elasticsearch/elasticsearch-7/src/test/java/com/hazelcast/jet/contrib/elasticsearch/ElasticsearchSinkTest.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Copyright (c) 2008-2020, Hazelcast, Inc. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.hazelcast.jet.contrib.elasticsearch; - -import com.hazelcast.jet.pipeline.Pipeline; -import com.hazelcast.jet.pipeline.Sources; -import org.junit.Test; - -import java.io.IOException; - -import static com.hazelcast.jet.contrib.elasticsearch.ElasticsearchSinks.elasticsearch; - -public class ElasticsearchSinkTest extends ElasticsearchBaseTest { - - @Test - public void test_elasticsearchSink() throws IOException { - String containerAddress = container.getHttpHostAddress(); - - Pipeline p = Pipeline.create(); - p.readFrom(Sources.list(userList)) - .writeTo(elasticsearch(indexName, () -> createClient(containerAddress), indexFn(indexName))); - - jet.newJob(p).join(); - - assertIndexes(); - } -} diff --git a/elasticsearch/elasticsearch-7/src/test/java/com/hazelcast/jet/contrib/elasticsearch/ElasticsearchSourceTest.java b/elasticsearch/elasticsearch-7/src/test/java/com/hazelcast/jet/contrib/elasticsearch/ElasticsearchSourceTest.java deleted file mode 100644 index 702f1c4..0000000 --- a/elasticsearch/elasticsearch-7/src/test/java/com/hazelcast/jet/contrib/elasticsearch/ElasticsearchSourceTest.java +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Copyright (c) 2008-2020, Hazelcast, Inc. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.hazelcast.jet.contrib.elasticsearch; - -import com.hazelcast.collection.IList; -import com.hazelcast.jet.pipeline.Pipeline; -import com.hazelcast.jet.pipeline.Sinks; -import com.hazelcast.jet.pipeline.Sources; -import org.elasticsearch.action.bulk.BulkRequest; -import org.elasticsearch.action.search.SearchRequest; -import org.elasticsearch.client.RequestOptions; -import org.elasticsearch.client.RestHighLevelClient; -import org.elasticsearch.search.builder.SearchSourceBuilder; -import org.junit.Test; - -import java.io.IOException; - -import static org.elasticsearch.action.support.WriteRequest.RefreshPolicy.IMMEDIATE; -import static org.elasticsearch.index.query.QueryBuilders.termQuery; -import static org.junit.Assert.assertEquals; - -public class ElasticsearchSourceTest extends ElasticsearchBaseTest { - - @Test - public void test() throws IOException { - String containerAddress = container.getHttpHostAddress(); - - Pipeline p = Pipeline.create(); - p.readFrom(Sources.list(userList)) - .writeTo(ElasticsearchSinks.elasticsearch(indexName, () -> createClient(containerAddress), - () -> new BulkRequest().setRefreshPolicy(IMMEDIATE), indexFn(indexName), - request -> RequestOptions.DEFAULT, RestHighLevelClient::close)); - - jet.newJob(p).join(); - - assertIndexes(); - - p = Pipeline.create(); - - p.readFrom(ElasticsearchSources.elasticsearch("users", () -> createClient(containerAddress), - () -> { - SearchRequest searchRequest = new SearchRequest("users"); - SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); - searchSourceBuilder.query(termQuery("age", 8)); - searchRequest.source(searchSourceBuilder); - return searchRequest; - })) - .writeTo(Sinks.list("sink")); - - jet.newJob(p).join(); - - IList sink = jet.getList("sink"); - assertEquals(1, sink.size()); - } - -} diff --git a/elasticsearch/elasticsearch-7/src/test/resources/log4j.properties b/elasticsearch/elasticsearch-7/src/test/resources/log4j.properties deleted file mode 100644 index b76bfeb..0000000 --- a/elasticsearch/elasticsearch-7/src/test/resources/log4j.properties +++ /dev/null @@ -1,10 +0,0 @@ -log4j.appender.stdout=org.apache.log4j.ConsoleAppender -log4j.appender.stdout.Target=System.out -log4j.appender.stdout.layout=org.apache.log4j.PatternLayout -log4j.appender.stdout.layout.ConversionPattern=%d{ABSOLUTE} %5p |%X{test-name}| - [%c{1}] %t - %m%n - -log4j.logger.com.hazelcast.jet=debug -log4j.logger.com.hazelcast.internal.cluster=info -log4j.logger.com.hazelcast=info - -log4j.rootLogger=info, stdout \ No newline at end of file diff --git a/settings.gradle b/settings.gradle index 8ac67a5..10f61e1 100644 --- a/settings.gradle +++ b/settings.gradle @@ -2,9 +2,6 @@ rootProject.name = 'hazelcast-jet-contrib' include 'influxdb' include 'redis' include 'probabilistic' -include 'elasticsearch-5' -include 'elasticsearch-6' -include 'elasticsearch-7' include 'kafka-connect' include 'mongodb' include 'debezium' @@ -12,7 +9,3 @@ include 'twitter' include 'xa-test' include 'hazelcast-jet-spring-boot-starter' include 'pulsar' - -project(':elasticsearch-5').projectDir = file('elasticsearch/elasticsearch-5') -project(':elasticsearch-6').projectDir = file('elasticsearch/elasticsearch-6') -project(':elasticsearch-7').projectDir = file('elasticsearch/elasticsearch-7')