From e8cb96576771e624dd485ec6551079089b5e22a3 Mon Sep 17 00:00:00 2001 From: Laura Trotta Date: Fri, 23 Aug 2024 17:21:14 +0200 Subject: [PATCH 1/3] signaling after successfully adding --- .../clients/elasticsearch/_helpers/bulk/BulkIngester.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngester.java b/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngester.java index ad2920f4d..5e6b8addc 100644 --- a/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngester.java +++ b/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngester.java @@ -363,6 +363,9 @@ public void add(BulkOperation operation, Context context) { if (!canAddOperation()) { flush(); } + else { + addCondition.signalIfReady(); + } }); } From b98b26dd35920cd7c5d93a0827eda454fca80da0 Mon Sep 17 00:00:00 2001 From: Laura Trotta Date: Mon, 26 Aug 2024 15:10:06 +0200 Subject: [PATCH 2/3] stress test unit test --- .../_helpers/bulk/BulkIngesterTest.java | 53 +++++++++++++++++++ 1 file changed, 53 insertions(+) diff --git a/java-client/src/test/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngesterTest.java b/java-client/src/test/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngesterTest.java index a72473eda..21cab4a87 100644 --- a/java-client/src/test/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngesterTest.java +++ b/java-client/src/test/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngesterTest.java @@ -28,6 +28,7 @@ import co.elastic.clients.elasticsearch.core.bulk.BulkResponseItem; import co.elastic.clients.elasticsearch.core.bulk.OperationType; import co.elastic.clients.elasticsearch.end_to_end.RequestTest; +import co.elastic.clients.elasticsearch.indices.IndicesStatsResponse; import co.elastic.clients.json.JsonpMapper; import co.elastic.clients.json.JsonpUtils; import co.elastic.clients.json.SimpleJsonpMapper; @@ -156,6 +157,58 @@ private void multiThreadTest(int maxOperations, int maxRequests, int numThreads, assertEquals(expectedRequests, transport.requestsStarted.get()); } + @Test + public void multiThreadStressTest() throws InterruptedException, IOException { + + Runtime runtime = Runtime.getRuntime(); + long usedMemoryBefore = runtime.totalMemory() - runtime.freeMemory(); + System.out.println("Used Memory before: " + usedMemoryBefore); + + + String index = "bulk-ingester-stress-test"; + ElasticsearchClient client = ElasticsearchTestServer.global().client(); + + // DISCLAIMER: this configuration is highly inefficient and only used here to showcase an extreme + // situation where the number of adding threads greatly exceeds the number of concurrent requests + // handled by the ingester. It's strongly recommended to always tweak maxConcurrentRequests accordingly. + BulkIngester ingester = BulkIngester.of(b -> b + .client(client) + .globalSettings(s -> s.index(index)) + .flushInterval(5, TimeUnit.SECONDS) + ); + + RequestTest.AppData appData = new RequestTest.AppData(); + appData.setIntValue(42); + appData.setMsg("Some message"); + + ExecutorService executor = Executors.newFixedThreadPool(50); + + for (int i = 0; i < 100000; i++) { + int ii = i; + Runnable thread = () -> { + int finalI = ii; + ingester.add(_1 -> _1 + .create(_2 -> _2 + .id(String.valueOf(finalI)) + .document(appData) + )); + }; + executor.submit(thread); + } + + executor.awaitTermination(10,TimeUnit.SECONDS); + ingester.close(); + + client.indices().refresh(); + + IndicesStatsResponse indexStats = client.indices().stats(g -> g.index(index)); + + assertTrue(indexStats.indices().get(index).primaries().docs().count()==100000); + + long usedMemoryAfter = runtime.totalMemory() - runtime.freeMemory(); + System.out.println("Memory increased:" + (usedMemoryAfter-usedMemoryBefore)); + } + @Test public void sizeLimitTest() throws Exception { TestTransport transport = new TestTransport(); From 93beee69f5c5870e5057b1a90b00412e9db6080c Mon Sep 17 00:00:00 2001 From: Laura Trotta Date: Mon, 26 Aug 2024 15:15:58 +0200 Subject: [PATCH 3/3] removed memory calc --- .../elasticsearch/_helpers/bulk/BulkIngesterTest.java | 8 -------- 1 file changed, 8 deletions(-) diff --git a/java-client/src/test/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngesterTest.java b/java-client/src/test/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngesterTest.java index 21cab4a87..d6aee8cc5 100644 --- a/java-client/src/test/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngesterTest.java +++ b/java-client/src/test/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngesterTest.java @@ -160,11 +160,6 @@ private void multiThreadTest(int maxOperations, int maxRequests, int numThreads, @Test public void multiThreadStressTest() throws InterruptedException, IOException { - Runtime runtime = Runtime.getRuntime(); - long usedMemoryBefore = runtime.totalMemory() - runtime.freeMemory(); - System.out.println("Used Memory before: " + usedMemoryBefore); - - String index = "bulk-ingester-stress-test"; ElasticsearchClient client = ElasticsearchTestServer.global().client(); @@ -204,9 +199,6 @@ public void multiThreadStressTest() throws InterruptedException, IOException { IndicesStatsResponse indexStats = client.indices().stats(g -> g.index(index)); assertTrue(indexStats.indices().get(index).primaries().docs().count()==100000); - - long usedMemoryAfter = runtime.totalMemory() - runtime.freeMemory(); - System.out.println("Memory increased:" + (usedMemoryAfter-usedMemoryBefore)); } @Test