diff --git a/README.md b/README.md
index 34a4a2f..405f85a 100644
--- a/README.md
+++ b/README.md
@@ -32,11 +32,6 @@ pipelines to read/write data points from/to InfluxDb.
A collection of probabilistic aggregations such as HyperLogLog.
-### [Elasticsearch Connector](elasticsearch)
-
-A Hazelcast Jet connector for Elasticsearch for querying/indexing objects
-from/to Elasticsearch.
-
### [Redis Connectors](redis)
Hazelcast Jet connectors for various Redis data structures.
diff --git a/elasticsearch/README.md b/elasticsearch/README.md
index 2b995ae..64cf6d9 100644
--- a/elasticsearch/README.md
+++ b/elasticsearch/README.md
@@ -1,12 +1,5 @@
# Elasticsearch Connectors
-A Hazelcast Jet connector for Elasticsearch (v5..v7) for querying/indexing
-objects from/to Elasticsearch.
-
-There are 3 major versions of Elasticsearch we support:
-
-- [Elasticsearch 5.6.x](elasticsearch-5)
-
-- [Elasticsearch 6.x.x](elasticsearch-6)
-
-- [Elasticsearch 7.x.x](elasticsearch-7)
+Starting with Hazelcast Jet version 4.2 the Elasticsearch connectors
+were moved to the [main repository](https://github.com/hazelcast/hazelcast-jet/tree/master/extensions/elasticsearch)
+and are part of the standard distribution.
\ No newline at end of file
diff --git a/elasticsearch/elasticsearch-5/README.md b/elasticsearch/elasticsearch-5/README.md
deleted file mode 100644
index 206c8cc..0000000
--- a/elasticsearch/elasticsearch-5/README.md
+++ /dev/null
@@ -1,90 +0,0 @@
-# Elasticsearch Connector
-
-A Hazelcast Jet connector for Elasticsearch (v5.6.x) for querying/indexing objects
-from/to Elasticsearch.
-
-## Getting Started
-
-### Installing
-
-The Elasticsearch Connector artifacts are published in the Maven repositories.
-
-Add the following lines to your pom.xml to include it as a dependency to your project:
-
-```
-
- com.hazelcast.jet.contrib
- elasticsearch-5
- ${version}
-
-```
-
-or if you are using Gradle:
-```
-compile group: 'com.hazelcast.jet.contrib', name: 'elasticsearch-5', version: ${version}
-```
-
-### Usage
-
-#### As a Source
-
-Elasticsearch batch source (`ElasticsearchSources.elasticsearch()`) executes
-the query and retrieves the results using `scrolling`.
-
-Following is an example pipeline which queries Elasticsearch and logs the
-results:
-
-```java
-Pipeline p = Pipeline.create();
-
-p.readFrom(ElasticsearchSources.elasticsearch("sourceName",
- () -> RestClient.builder(HttpHost.create("hostAddress")).build(),
- () -> {
- SearchRequest searchRequest = new SearchRequest("users");
- SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
- searchSourceBuilder.query(termQuery("age", 8));
- searchRequest.source(searchSourceBuilder);
- return searchRequest;
- },
- "10s",
- SearchHit::getSourceAsString,
- RestClient::close))
- .writeTo(Sinks.logger());
-```
-
-#### As a Sink
-
-Elasticsearch sink (`ElasticsearchSinks.elasticsearch()`) is used to index objects from
-Hazelcast Jet Pipeline to Elasticsearch.
-
-Here is a very simple pipeline which reads out some users from Hazelcast
-List and indexes them to Elasticsearch:
-
-```java
-Pipeline p = Pipeline.create();
-p.readFrom(Sources.list(users))
- .writeTo(ElasticsearchSinks.elasticsearch("sinkName",
- () -> RestClient.builder(HttpHost.create("hostAddress")).build(),
- BulkRequest::new,
- user -> {
- IndexRequest request = new IndexRequest(indexName, "doc", user.id);
- Map jsonMap = new HashMap<>();
- jsonMap.put("name", user.name);
- jsonMap.put("age", user.age);
- request.source(jsonMap);
- return request;
- },
- RestClient::close));
-```
-
-### Running the tests
-
-To run the tests run the command below:
-
-```
-./gradlew test
-```
-
-## Authors
-
-* **[Ali Gurbuz](https://github.com/gurbuzali)**
diff --git a/elasticsearch/elasticsearch-5/build.gradle b/elasticsearch/elasticsearch-5/build.gradle
deleted file mode 100644
index 2859198..0000000
--- a/elasticsearch/elasticsearch-5/build.gradle
+++ /dev/null
@@ -1,5 +0,0 @@
-dependencies {
- compile 'org.elasticsearch.client:elasticsearch-rest-high-level-client:5.6.0'
- testCompile "org.testcontainers:elasticsearch:1.11.2"
- testCompile "org.slf4j:slf4j-log4j12:1.7.26"
-}
\ No newline at end of file
diff --git a/elasticsearch/elasticsearch-5/gradle.properties b/elasticsearch/elasticsearch-5/gradle.properties
deleted file mode 100644
index 4a62185..0000000
--- a/elasticsearch/elasticsearch-5/gradle.properties
+++ /dev/null
@@ -1,2 +0,0 @@
-version = 0.3-SNAPSHOT
-description = A Hazelcast Jet connector for Elasticsearch (v5.6.x) for querying/indexing objects from/to Elasticsearch.
diff --git a/elasticsearch/elasticsearch-5/src/main/java/com/hazelcast/jet/contrib/elasticsearch/ElasticsearchSinks.java b/elasticsearch/elasticsearch-5/src/main/java/com/hazelcast/jet/contrib/elasticsearch/ElasticsearchSinks.java
deleted file mode 100644
index b1508bf..0000000
--- a/elasticsearch/elasticsearch-5/src/main/java/com/hazelcast/jet/contrib/elasticsearch/ElasticsearchSinks.java
+++ /dev/null
@@ -1,146 +0,0 @@
-/*
- * Copyright (c) 2008-2020, Hazelcast, Inc. All Rights Reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.hazelcast.jet.contrib.elasticsearch;
-
-import com.hazelcast.function.ConsumerEx;
-import com.hazelcast.function.FunctionEx;
-import com.hazelcast.function.SupplierEx;
-import com.hazelcast.jet.pipeline.Sink;
-import com.hazelcast.jet.pipeline.SinkBuilder;
-import org.apache.http.HttpHost;
-import org.apache.http.auth.UsernamePasswordCredentials;
-import org.apache.http.client.CredentialsProvider;
-import org.apache.http.impl.client.BasicCredentialsProvider;
-import org.elasticsearch.ElasticsearchException;
-import org.elasticsearch.action.DocWriteRequest;
-import org.elasticsearch.action.bulk.BulkRequest;
-import org.elasticsearch.action.bulk.BulkResponse;
-import org.elasticsearch.action.delete.DeleteRequest;
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.action.update.UpdateRequest;
-import org.elasticsearch.client.RestClient;
-import org.elasticsearch.client.RestHighLevelClient;
-
-import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
-import java.io.IOException;
-
-import static org.apache.http.auth.AuthScope.ANY;
-
-/**
- * Contains factory methods for Elasticsearch sinks.
- */
-public final class ElasticsearchSinks {
-
- private ElasticsearchSinks() {
- }
-
- /**
- * Creates a sink which indexes objects using the specified Elasticsearch
- * client and the specified bulk request.
- *
- * @param name name of the created sink
- * @param clientSupplier Elasticsearch REST client supplier
- * @param bulkRequestSupplier bulk request supplier, will be called to obtain a
- * new {@link BulkRequest} instance after each call.
- * @param requestFn creates an {@link IndexRequest}, {@link UpdateRequest}
- * or {@link DeleteRequest} for each object
- * @param destroyFn called upon completion to release any resource
- */
- public static Sink elasticsearch(
- @Nonnull String name,
- @Nonnull SupplierEx extends RestClient> clientSupplier,
- @Nonnull SupplierEx bulkRequestSupplier,
- @Nonnull FunctionEx super T, ? extends DocWriteRequest> requestFn,
- @Nonnull ConsumerEx super RestClient> destroyFn
- ) {
- return SinkBuilder
- .sinkBuilder(name, ctx -> new BulkContext(clientSupplier.get(), bulkRequestSupplier))
- .receiveFn((bulkContext, item) -> bulkContext.add(requestFn.apply(item)))
- .flushFn(BulkContext::flush)
- .destroyFn(b -> destroyFn.accept(b.client))
- .preferredLocalParallelism(2)
- .build();
- }
-
- /**
- * Convenience for {@link #elasticsearch(String, SupplierEx, SupplierEx,
- * FunctionEx, ConsumerEx)}. Creates a new {@link BulkRequest} with default
- * options for each batch and closes the {@link RestClient} upon
- * completion. Assumes that the {@code clientSupplier} creates a new client
- * for each call.
- */
- public static Sink elasticsearch(
- @Nonnull String name,
- @Nonnull SupplierEx extends RestClient> clientSupplier,
- @Nonnull FunctionEx super T, ? extends DocWriteRequest> requestFn
- ) {
- return elasticsearch(name, clientSupplier, BulkRequest::new, requestFn, RestClient::close);
- }
-
- /**
- * Convenience for {@link #elasticsearch(String, SupplierEx, FunctionEx)}
- * Rest client is configured with basic authentication.
- */
- public static Sink elasticsearch(
- @Nonnull String name,
- @Nonnull String username,
- @Nullable String password,
- @Nonnull String hostname,
- int port,
- @Nonnull FunctionEx super T, ? extends DocWriteRequest> requestFn
- ) {
- return elasticsearch(name, () -> buildClient(username, password, hostname, port), requestFn);
- }
-
- static RestClient buildClient(String username, String password, String hostname, int port) {
- CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
- credentialsProvider.setCredentials(ANY, new UsernamePasswordCredentials(username, password));
-
- return RestClient.builder(new HttpHost(hostname, port))
- .setHttpClientConfigCallback(httpClientBuilder ->
- httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider)).build();
- }
-
- private static final class BulkContext {
-
- private final RestClient client;
- private final RestHighLevelClient highLevelClient;
- private final SupplierEx bulkRequestSupplier;
-
- private BulkRequest bulkRequest;
-
- private BulkContext(RestClient client, SupplierEx bulkRequestSupplier) {
- this.client = client;
- this.highLevelClient = new RestHighLevelClient(client);
- this.bulkRequestSupplier = bulkRequestSupplier;
- this.bulkRequest = bulkRequestSupplier.get();
- }
-
- private void add(DocWriteRequest request) {
- bulkRequest.add(request);
- }
-
- private void flush() throws IOException {
- BulkResponse response = highLevelClient.bulk(bulkRequest);
- if (response.hasFailures()) {
- throw new ElasticsearchException(response.buildFailureMessage());
- }
- bulkRequest = bulkRequestSupplier.get();
- }
- }
-}
diff --git a/elasticsearch/elasticsearch-5/src/main/java/com/hazelcast/jet/contrib/elasticsearch/ElasticsearchSources.java b/elasticsearch/elasticsearch-5/src/main/java/com/hazelcast/jet/contrib/elasticsearch/ElasticsearchSources.java
deleted file mode 100644
index 8b92f5b..0000000
--- a/elasticsearch/elasticsearch-5/src/main/java/com/hazelcast/jet/contrib/elasticsearch/ElasticsearchSources.java
+++ /dev/null
@@ -1,158 +0,0 @@
-/*
- * Copyright (c) 2008-2020, Hazelcast, Inc. All Rights Reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.hazelcast.jet.contrib.elasticsearch;
-
-import com.hazelcast.function.ConsumerEx;
-import com.hazelcast.function.FunctionEx;
-import com.hazelcast.function.SupplierEx;
-import com.hazelcast.jet.pipeline.BatchSource;
-import com.hazelcast.jet.pipeline.SourceBuilder;
-import com.hazelcast.jet.pipeline.SourceBuilder.SourceBuffer;
-import org.elasticsearch.action.search.ClearScrollRequest;
-import org.elasticsearch.action.search.SearchRequest;
-import org.elasticsearch.action.search.SearchResponse;
-import org.elasticsearch.action.search.SearchScrollRequest;
-import org.elasticsearch.client.RestClient;
-import org.elasticsearch.client.RestHighLevelClient;
-import org.elasticsearch.search.SearchHit;
-
-import javax.annotation.Nonnull;
-import java.io.IOException;
-
-import static com.hazelcast.jet.contrib.elasticsearch.ElasticsearchSinks.buildClient;
-
-/**
- * Contains factory methods for Elasticsearch sources.
- */
-public final class ElasticsearchSources {
-
- private static final String DEFAULT_SCROLL_TIMEOUT = "60s";
-
- private ElasticsearchSources() {
- }
-
- /**
- * Creates a source which queries objects using the specified Elasticsearch
- * client and the specified request supplier using scrolling method.
- *
- * @param name Name of the source
- * @param clientSupplier Elasticsearch rest client supplier
- * @param searchRequestSupplier Search request supplier
- * @param scrollTimeout scroll keep alive time
- * @param mapHitFn maps search hits to output items
- * @param destroyFn called upon completion to release any resource
- * @param type of items emitted downstream
- */
- public static BatchSource elasticsearch(
- @Nonnull String name,
- @Nonnull SupplierEx extends RestClient> clientSupplier,
- @Nonnull SupplierEx searchRequestSupplier,
- @Nonnull String scrollTimeout,
- @Nonnull FunctionEx mapHitFn,
- @Nonnull ConsumerEx super RestClient> destroyFn
- ) {
- return SourceBuilder
- .batch(name, ctx -> {
- RestClient client = clientSupplier.get();
- SearchRequest searchRequest = searchRequestSupplier.get();
- return new SearchContext<>(client, scrollTimeout, mapHitFn, searchRequest, destroyFn);
- })
- .fillBufferFn(SearchContext::fillBuffer)
- .destroyFn(SearchContext::close)
- .build();
- }
-
- /**
- * Convenience for {@link #elasticsearch(String, SupplierEx, SupplierEx, String, FunctionEx, ConsumerEx)}.
- * Uses {@link #DEFAULT_SCROLL_TIMEOUT} for scroll timeout, emits string
- * representation of items using {@link SearchHit#getSourceAsString()} and
- * closes the {@link RestClient} upon completion.
- */
- public static BatchSource elasticsearch(
- @Nonnull String name,
- @Nonnull SupplierEx extends RestClient> clientSupplier,
- @Nonnull SupplierEx searchRequestSupplier
- ) {
- return elasticsearch(name, clientSupplier, searchRequestSupplier,
- DEFAULT_SCROLL_TIMEOUT, SearchHit::getSourceAsString, RestClient::close);
- }
-
- /**
- * Convenience for {@link #elasticsearch(String, SupplierEx, SupplierEx)}.
- * Rest client is configured with basic authentication.
- */
- public static BatchSource elasticsearch(
- @Nonnull String name,
- @Nonnull String username, String password,
- @Nonnull String hostname, int port,
- @Nonnull SupplierEx searchRequestSupplier
- ) {
- return elasticsearch(name, () -> buildClient(username, password, hostname, port), searchRequestSupplier);
- }
-
- private static final class SearchContext {
-
- private final RestClient client;
- private final RestHighLevelClient highLevelClient;
- private final String scrollInterval;
- private final FunctionEx mapHitFn;
- private final ConsumerEx super RestClient> destroyFn;
-
- private SearchResponse searchResponse;
-
- private SearchContext(RestClient client, String scrollInterval, FunctionEx mapHitFn,
- SearchRequest searchRequest, ConsumerEx super RestClient> destroyFn) throws IOException {
- this.client = client;
- this.highLevelClient = new RestHighLevelClient(client);
- this.scrollInterval = scrollInterval;
- this.mapHitFn = mapHitFn;
- this.destroyFn = destroyFn;
-
- searchRequest.scroll(scrollInterval);
- searchResponse = highLevelClient.search(searchRequest);
- }
-
- private void fillBuffer(SourceBuffer buffer) throws IOException {
- SearchHit[] hits = searchResponse.getHits().getHits();
- if (hits == null || hits.length == 0) {
- buffer.close();
- return;
- }
- for (SearchHit hit : hits) {
- T item = mapHitFn.apply(hit);
- if (item != null) {
- buffer.add(item);
- }
- }
-
- SearchScrollRequest scrollRequest = new SearchScrollRequest(searchResponse.getScrollId());
- scrollRequest.scroll(scrollInterval);
- searchResponse = highLevelClient.searchScroll(scrollRequest);
- }
-
- private void clearScroll() throws IOException {
- ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
- clearScrollRequest.addScrollId(searchResponse.getScrollId());
- highLevelClient.clearScroll(clearScrollRequest);
- }
-
- private void close() throws IOException {
- clearScroll();
- destroyFn.accept(client);
- }
- }
-}
diff --git a/elasticsearch/elasticsearch-5/src/main/java/com/hazelcast/jet/contrib/elasticsearch/package-info.java b/elasticsearch/elasticsearch-5/src/main/java/com/hazelcast/jet/contrib/elasticsearch/package-info.java
deleted file mode 100644
index bd022a6..0000000
--- a/elasticsearch/elasticsearch-5/src/main/java/com/hazelcast/jet/contrib/elasticsearch/package-info.java
+++ /dev/null
@@ -1,20 +0,0 @@
-/*
- * Copyright (c) 2008-2020, Hazelcast, Inc. All Rights Reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Contains sources and sinks for Elasticsearch.
- */
-package com.hazelcast.jet.contrib.elasticsearch;
diff --git a/elasticsearch/elasticsearch-5/src/test/java/com/hazelcast/jet/contrib/elasticsearch/ElasticsearchBaseTest.java b/elasticsearch/elasticsearch-5/src/test/java/com/hazelcast/jet/contrib/elasticsearch/ElasticsearchBaseTest.java
deleted file mode 100644
index 96aa706..0000000
--- a/elasticsearch/elasticsearch-5/src/test/java/com/hazelcast/jet/contrib/elasticsearch/ElasticsearchBaseTest.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Copyright (c) 2008-2020, Hazelcast, Inc. All Rights Reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.hazelcast.jet.contrib.elasticsearch;
-
-import com.hazelcast.collection.IList;
-import com.hazelcast.function.FunctionEx;
-import com.hazelcast.jet.JetInstance;
-import com.hazelcast.jet.core.JetTestSupport;
-import org.apache.http.HttpHost;
-import org.apache.http.auth.UsernamePasswordCredentials;
-import org.apache.http.client.CredentialsProvider;
-import org.apache.http.impl.client.BasicCredentialsProvider;
-import org.elasticsearch.action.get.GetRequest;
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.client.RestClient;
-import org.elasticsearch.client.RestHighLevelClient;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.testcontainers.containers.Network;
-import org.testcontainers.elasticsearch.ElasticsearchContainer;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.apache.http.auth.AuthScope.ANY;
-import static org.junit.Assert.assertTrue;
-
-public abstract class ElasticsearchBaseTest extends JetTestSupport {
-
- static final String DEFAULT_USER = "elastic";
- static final String DEFAULT_PASS = "changeme";
-
- private static final int OBJECT_COUNT = 20;
-
- @Rule
- public ElasticsearchContainer container =
- new ElasticsearchContainer("docker.elastic.co/elasticsearch/elasticsearch:5.6.16")
- .withNetwork(Network.newNetwork());
-
- JetInstance jet;
- IList userList;
- String indexName = "users";
-
- private RestClient client;
- private RestHighLevelClient highLevelClient;
-
- @Before
- public void setupBase() {
- container.start();
-
- client = createClient(container.getContainerIpAddress(), mappedPort());
- highLevelClient = new RestHighLevelClient(client);
-
- jet = createJetMember();
-
- userList = jet.getList("userList");
- for (int i = 0; i < OBJECT_COUNT; i++) {
- userList.add(new User("user-" + i, i));
- }
- }
-
- @After
- public void cleanupBase() throws IOException {
- container.stop();
- client.close();
- }
-
- int mappedPort() {
- String hostAddress = container.getHttpHostAddress();
- return Integer.parseInt(hostAddress.split(":")[1]);
- }
-
- void assertIndexes() throws IOException {
- for (int i = 0; i < OBJECT_COUNT; i++) {
- GetRequest request = new GetRequest(indexName).id(String.valueOf(i));
- assertTrue(highLevelClient.exists(request));
- }
- }
-
- static RestClient createClient(String containerAddress, int port) {
- CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
- credentialsProvider.setCredentials(ANY, new UsernamePasswordCredentials(DEFAULT_USER, DEFAULT_PASS));
-
- return RestClient.builder(new HttpHost(containerAddress, port))
- .setHttpClientConfigCallback(httpClientBuilder ->
- httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider)).build();
- }
-
- static FunctionEx indexFn(String indexName) {
- return user -> {
- IndexRequest request = new IndexRequest(indexName, "doc", String.valueOf(user.age));
- Map jsonMap = new HashMap<>();
- jsonMap.put("name", user.name);
- jsonMap.put("age", user.age);
- request.source(jsonMap);
- return request;
- };
- }
-
- static final class User implements Serializable {
-
- final String name;
- final int age;
-
- User(String name, int age) {
- this.name = name;
- this.age = age;
- }
-
- }
-}
diff --git a/elasticsearch/elasticsearch-5/src/test/java/com/hazelcast/jet/contrib/elasticsearch/ElasticsearchSinkTest.java b/elasticsearch/elasticsearch-5/src/test/java/com/hazelcast/jet/contrib/elasticsearch/ElasticsearchSinkTest.java
deleted file mode 100644
index d291658..0000000
--- a/elasticsearch/elasticsearch-5/src/test/java/com/hazelcast/jet/contrib/elasticsearch/ElasticsearchSinkTest.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Copyright (c) 2008-2020, Hazelcast, Inc. All Rights Reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.hazelcast.jet.contrib.elasticsearch;
-
-import com.hazelcast.jet.pipeline.Pipeline;
-import com.hazelcast.jet.pipeline.Sources;
-import org.junit.Test;
-
-import java.io.IOException;
-
-public class ElasticsearchSinkTest extends ElasticsearchBaseTest {
-
- @Test
- public void test_elasticsearchSink() throws IOException {
- Pipeline p = Pipeline.create();
- p.readFrom(Sources.list(userList))
- .writeTo(ElasticsearchSinks.elasticsearch(indexName, DEFAULT_USER, DEFAULT_PASS,
- container.getContainerIpAddress(), mappedPort(), indexFn(indexName)));
-
- jet.newJob(p).join();
-
- assertIndexes();
- }
-}
diff --git a/elasticsearch/elasticsearch-5/src/test/java/com/hazelcast/jet/contrib/elasticsearch/ElasticsearchSourceTest.java b/elasticsearch/elasticsearch-5/src/test/java/com/hazelcast/jet/contrib/elasticsearch/ElasticsearchSourceTest.java
deleted file mode 100644
index 0020d0b..0000000
--- a/elasticsearch/elasticsearch-5/src/test/java/com/hazelcast/jet/contrib/elasticsearch/ElasticsearchSourceTest.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Copyright (c) 2008-2020, Hazelcast, Inc. All Rights Reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.hazelcast.jet.contrib.elasticsearch;
-
-import com.hazelcast.collection.IList;
-import com.hazelcast.function.SupplierEx;
-import com.hazelcast.jet.pipeline.Pipeline;
-import com.hazelcast.jet.pipeline.Sinks;
-import com.hazelcast.jet.pipeline.Sources;
-import org.elasticsearch.action.bulk.BulkRequest;
-import org.elasticsearch.action.search.SearchRequest;
-import org.elasticsearch.client.RestClient;
-import org.elasticsearch.search.builder.SearchSourceBuilder;
-import org.junit.Test;
-
-import java.io.IOException;
-
-import static org.elasticsearch.action.support.WriteRequest.RefreshPolicy.IMMEDIATE;
-import static org.elasticsearch.index.query.QueryBuilders.termQuery;
-import static org.junit.Assert.assertEquals;
-
-public class ElasticsearchSourceTest extends ElasticsearchBaseTest {
-
- @Test
- public void test() throws IOException {
- String containerIpAddress = container.getContainerIpAddress();
- int port = mappedPort();
-
- SupplierEx clientSupplier = () -> createClient(containerIpAddress, port);
-
- Pipeline p = Pipeline.create();
- p.readFrom(Sources.list(userList))
- .writeTo(ElasticsearchSinks.elasticsearch(indexName, clientSupplier,
- () -> new BulkRequest().setRefreshPolicy(IMMEDIATE), indexFn(indexName), RestClient::close));
-
- jet.newJob(p).join();
-
- assertIndexes();
-
- p = Pipeline.create();
-
- p.readFrom(ElasticsearchSources.elasticsearch("users", clientSupplier,
- () -> {
- SearchRequest searchRequest = new SearchRequest("users");
- SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
- searchSourceBuilder.query(termQuery("age", 8));
- searchRequest.source(searchSourceBuilder);
- return searchRequest;
- }))
- .writeTo(Sinks.list("sink"));
-
- jet.newJob(p).join();
-
- IList