From 8b6e64c93a9bd8cf3aeec2f5cb116c5664acb6e1 Mon Sep 17 00:00:00 2001 From: Laura Trotta <153528055+l-trotta@users.noreply.github.com> Date: Mon, 26 Aug 2024 15:39:33 +0200 Subject: [PATCH] Addressing bulk ingester stuck threads (#870) * signaling after successfully adding * stress test unit test * removed memory calc --- .../_helpers/bulk/BulkIngester.java | 3 ++ .../_helpers/bulk/BulkIngesterTest.java | 45 +++++++++++++++++++ 2 files changed, 48 insertions(+) diff --git a/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngester.java b/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngester.java index b47b5f327..09b5bbb3b 100644 --- a/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngester.java +++ b/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngester.java @@ -358,6 +358,9 @@ public void add(BulkOperation operation, Context context) { if (!canAddOperation()) { flush(); } + else { + addCondition.signalIfReady(); + } }); } diff --git a/java-client/src/test/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngesterTest.java b/java-client/src/test/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngesterTest.java index 95effd4c1..b765a1a3b 100644 --- a/java-client/src/test/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngesterTest.java +++ b/java-client/src/test/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngesterTest.java @@ -28,6 +28,7 @@ import co.elastic.clients.elasticsearch.core.bulk.BulkResponseItem; import co.elastic.clients.elasticsearch.core.bulk.OperationType; import co.elastic.clients.elasticsearch.end_to_end.RequestTest; +import co.elastic.clients.elasticsearch.indices.IndicesStatsResponse; import co.elastic.clients.json.JsonpMapper; import co.elastic.clients.json.SimpleJsonpMapper; import co.elastic.clients.transport.ElasticsearchTransport; @@ -146,6 +147,50 @@ private void multiThreadTest(int maxOperations, int maxRequests, int numThreads, assertEquals(expectedRequests, transport.requestsStarted.get()); } + @Test + public void multiThreadStressTest() throws InterruptedException, IOException { + + String index = "bulk-ingester-stress-test"; + ElasticsearchClient client = ElasticsearchTestServer.global().client(); + + // DISCLAIMER: this configuration is highly inefficient and only used here to showcase an extreme + // situation where the number of adding threads greatly exceeds the number of concurrent requests + // handled by the ingester. It's strongly recommended to always tweak maxConcurrentRequests accordingly. + BulkIngester ingester = BulkIngester.of(b -> b + .client(client) + .globalSettings(s -> s.index(index)) + .flushInterval(5, TimeUnit.SECONDS) + ); + + RequestTest.AppData appData = new RequestTest.AppData(); + appData.setIntValue(42); + appData.setMsg("Some message"); + + ExecutorService executor = Executors.newFixedThreadPool(50); + + for (int i = 0; i < 100000; i++) { + int ii = i; + Runnable thread = () -> { + int finalI = ii; + ingester.add(_1 -> _1 + .create(_2 -> _2 + .id(String.valueOf(finalI)) + .document(appData) + )); + }; + executor.submit(thread); + } + + executor.awaitTermination(10,TimeUnit.SECONDS); + ingester.close(); + + client.indices().refresh(); + + IndicesStatsResponse indexStats = client.indices().stats(g -> g.index(index)); + + assertTrue(indexStats.indices().get(index).primaries().docs().count()==100000); + } + @Test public void sizeLimitTest() throws Exception { TestTransport transport = new TestTransport();