From 03024d489a7271acfa5e44de8c63fb27229be116 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=CC=88rg=20Prante?= Date: Fri, 16 Apr 2021 09:19:36 +0200 Subject: [PATCH] working on dynamic bulk requests --- .../java/org/xbib/elx/api/BulkListener.java | 3 +- .../java/org/xbib/elx/api/BulkMetric.java | 3 - .../java/org/xbib/elx/api/BulkProcessor.java | 7 +- .../org/xbib/elx/api/BulkRequestHandler.java | 11 +- .../elx/common/DefaultBulkController.java | 15 +- .../xbib/elx/common/DefaultBulkListener.java | 73 ++++--- .../xbib/elx/common/DefaultBulkMetric.java | 7 - .../xbib/elx/common/DefaultBulkProcessor.java | 189 +++++++++--------- .../xbib/elx/node/test/BulkClientTest.java | 18 +- .../elx/transport/test/BulkClientTest.java | 22 +- gradle/test/junit5.gradle | 2 + 11 files changed, 178 insertions(+), 172 deletions(-) diff --git a/elx-api/src/main/java/org/xbib/elx/api/BulkListener.java b/elx-api/src/main/java/org/xbib/elx/api/BulkListener.java index f434ed4..362dcb5 100644 --- a/elx-api/src/main/java/org/xbib/elx/api/BulkListener.java +++ b/elx-api/src/main/java/org/xbib/elx/api/BulkListener.java @@ -2,11 +2,12 @@ package org.xbib.elx.api; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkResponse; +import java.io.Closeable; /** * A bulk listener for following executions of bulk operations. */ -public interface BulkListener { +public interface BulkListener extends Closeable { /** * Callback before the bulk is executed. diff --git a/elx-api/src/main/java/org/xbib/elx/api/BulkMetric.java b/elx-api/src/main/java/org/xbib/elx/api/BulkMetric.java index b7d11e0..37597a8 100644 --- a/elx-api/src/main/java/org/xbib/elx/api/BulkMetric.java +++ b/elx-api/src/main/java/org/xbib/elx/api/BulkMetric.java @@ -1,6 +1,5 @@ package org.xbib.elx.api; -import org.elasticsearch.common.settings.Settings; import org.xbib.metrics.api.Count; import org.xbib.metrics.api.Metered; @@ -8,8 +7,6 @@ import java.io.Closeable; public interface BulkMetric extends Closeable { - void init(Settings settings); - void markTotalIngest(long n); Metered getTotalIngest(); diff --git a/elx-api/src/main/java/org/xbib/elx/api/BulkProcessor.java b/elx-api/src/main/java/org/xbib/elx/api/BulkProcessor.java index 992d90a..de49ea0 100644 --- a/elx-api/src/main/java/org/xbib/elx/api/BulkProcessor.java +++ b/elx-api/src/main/java/org/xbib/elx/api/BulkProcessor.java @@ -4,6 +4,7 @@ import org.elasticsearch.action.ActionRequest; import java.io.Closeable; import java.io.Flushable; +import java.io.IOException; import java.util.concurrent.TimeUnit; public interface BulkProcessor extends Closeable, Flushable { @@ -16,9 +17,9 @@ public interface BulkProcessor extends Closeable, Flushable { long getBulkSize(); - BulkProcessor add(ActionRequest request); + BulkRequestHandler getBulkRequestHandler(); - boolean awaitFlush(long timeout, TimeUnit unit) throws InterruptedException; + void add(ActionRequest request); - boolean awaitClose(long timeout, TimeUnit unit) throws InterruptedException; + boolean awaitFlush(long timeout, TimeUnit unit) throws InterruptedException, IOException; } diff --git a/elx-api/src/main/java/org/xbib/elx/api/BulkRequestHandler.java b/elx-api/src/main/java/org/xbib/elx/api/BulkRequestHandler.java index 1bc3886..8483722 100644 --- a/elx-api/src/main/java/org/xbib/elx/api/BulkRequestHandler.java +++ b/elx-api/src/main/java/org/xbib/elx/api/BulkRequestHandler.java @@ -1,11 +1,18 @@ package org.xbib.elx.api; import org.elasticsearch.action.bulk.BulkRequest; +import java.io.IOException; import java.util.concurrent.TimeUnit; public interface BulkRequestHandler { - void execute(BulkRequest bulkRequest, long executionId); + void execute(BulkRequest bulkRequest); - boolean close(long timeout, TimeUnit unit) throws InterruptedException; + boolean flush(long timeout, TimeUnit unit) throws IOException, InterruptedException; + + int getPermits(); + + void increase(); + + void reduce(); } diff --git a/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkController.java b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkController.java index a4d1d8d..1e1b34b 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkController.java +++ b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkController.java @@ -10,7 +10,6 @@ import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; import org.xbib.elx.api.BulkClient; import org.xbib.elx.api.BulkController; -import org.xbib.elx.api.BulkListener; import org.xbib.elx.api.BulkMetric; import org.xbib.elx.api.BulkProcessor; import org.xbib.elx.api.IndexDefinition; @@ -25,11 +24,9 @@ public class DefaultBulkController implements BulkController { private final BulkClient bulkClient; - private final BulkMetric bulkMetric; - private BulkProcessor bulkProcessor; - private BulkListener bulkListener; + private DefaultBulkListener bulkListener; private long maxWaitTime; @@ -39,13 +36,11 @@ public class DefaultBulkController implements BulkController { public DefaultBulkController(BulkClient bulkClient) { this.bulkClient = bulkClient; - this.bulkMetric = new DefaultBulkMetric(); this.active = new AtomicBoolean(false); } @Override public void init(Settings settings) { - bulkMetric.init(settings); String maxWaitTimeStr = settings.get(Parameters.MAX_WAIT_BULK_RESPONSE.getName(), Parameters.MAX_WAIT_BULK_RESPONSE.getString()); TimeValue maxWaitTimeValue = TimeValue.parseTimeValue(maxWaitTimeStr, @@ -68,7 +63,7 @@ public class DefaultBulkController implements BulkController { Parameters.FAIL_ON_BULK_ERROR.getBoolean()); int responseTimeCount = settings.getAsInt(Parameters.RESPONSE_TIME_COUNT.getName(), Parameters.RESPONSE_TIME_COUNT.getInteger()); - this.bulkListener = new DefaultBulkListener(this, bulkMetric, + this.bulkListener = new DefaultBulkListener(this, enableBulkLogging, failOnBulkError, responseTimeCount); this.bulkProcessor = DefaultBulkProcessor.builder(bulkClient.getClient(), bulkListener) .setBulkActions(maxActionsPerRequest) @@ -96,7 +91,7 @@ public class DefaultBulkController implements BulkController { @Override public BulkMetric getBulkMetric() { - return bulkMetric; + return bulkListener != null ? bulkListener.getBulkMetric() : null; } @Override @@ -123,7 +118,6 @@ public class DefaultBulkController implements BulkController { public void bulkIndex(IndexRequest indexRequest) { ensureActiveAndBulk(); try { - bulkMetric.getCurrentIngest().inc(indexRequest.index(), indexRequest.type(), indexRequest.id()); bulkProcessor.add(indexRequest); } catch (Exception e) { if (logger.isErrorEnabled()) { @@ -137,7 +131,6 @@ public class DefaultBulkController implements BulkController { public void bulkDelete(DeleteRequest deleteRequest) { ensureActiveAndBulk(); try { - bulkMetric.getCurrentIngest().inc(deleteRequest.index(), deleteRequest.type(), deleteRequest.id()); bulkProcessor.add(deleteRequest); } catch (Exception e) { if (logger.isErrorEnabled()) { @@ -151,7 +144,6 @@ public class DefaultBulkController implements BulkController { public void bulkUpdate(UpdateRequest updateRequest) { ensureActiveAndBulk(); try { - bulkMetric.getCurrentIngest().inc(updateRequest.index(), updateRequest.type(), updateRequest.id()); bulkProcessor.add(updateRequest); } catch (Exception e) { if (logger.isErrorEnabled()) { @@ -201,7 +193,6 @@ public class DefaultBulkController implements BulkController { @Override public void close() throws IOException { - bulkMetric.close(); flush(); bulkClient.waitForResponses(maxWaitTime, maxWaitTimeUnit); if (bulkProcessor != null) { diff --git a/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkListener.java b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkListener.java index 686a031..dd892f7 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkListener.java +++ b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkListener.java @@ -9,8 +9,10 @@ import org.xbib.elx.api.BulkController; import org.xbib.elx.api.BulkListener; import org.xbib.elx.api.BulkMetric; +import java.io.IOException; import java.util.Arrays; import java.util.LongSummaryStatistics; +import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.LongStream; public class DefaultBulkListener implements BulkListener { @@ -27,21 +29,25 @@ public class DefaultBulkListener implements BulkListener { private Throwable lastBulkError; - private final int responseTimeCount; + private final int ringBufferSize; - private final LastResponseTimes responseTimes; + private final LongRingBuffer ringBuffer; public DefaultBulkListener(BulkController bulkController, - BulkMetric bulkMetric, boolean isBulkLoggingEnabled, boolean failOnError, - int responseTimeCount) { + int ringBufferSize) { this.bulkController = bulkController; - this.bulkMetric = bulkMetric; this.isBulkLoggingEnabled = isBulkLoggingEnabled; this.failOnError = failOnError; - this.responseTimeCount = responseTimeCount; - this.responseTimes = new LastResponseTimes(responseTimeCount); + this.ringBufferSize = ringBufferSize; + this.ringBuffer = new LongRingBuffer(ringBufferSize); + this.bulkMetric = new DefaultBulkMetric(); + bulkMetric.start(); + } + + public BulkMetric getBulkMetric() { + return bulkMetric; } @Override @@ -66,14 +72,17 @@ public class DefaultBulkListener implements BulkListener { long l = bulkMetric.getCurrentIngest().getCount(); bulkMetric.getCurrentIngest().dec(); bulkMetric.getSucceeded().inc(response.getItems().length); - if (responseTimeCount > 0 && responseTimes.add(response.getTook().millis()) == 0) { - LongSummaryStatistics stat = responseTimes.longStream().summaryStatistics(); + if (ringBufferSize > 0 && ringBuffer.add(response.getTook().millis(), request.estimatedSizeInBytes()) == 0) { + LongSummaryStatistics stat1 = ringBuffer.longStreamValues1().summaryStatistics(); + LongSummaryStatistics stat2 = ringBuffer.longStreamValues2().summaryStatistics(); if (isBulkLoggingEnabled && logger.isDebugEnabled()) { - logger.debug("bulk response millis: avg = " + stat.getAverage() + - " min =" + stat.getMin() + - " max = " + stat.getMax() + - " actions = " + bulkController.getBulkProcessor().getBulkActions() + - " size = " + bulkController.getBulkProcessor().getBulkSize()); + logger.debug("bulk response millis: avg = " + stat1.getAverage() + + " min = " + stat1.getMin() + + " max = " + stat1.getMax() + + " size: avg = " + stat2.getAverage() + + " min = " + stat2.getMin() + + " max = " + stat2.getMax() + + " throughput: " + (stat2.getAverage() / stat1.getAverage()) + " bytes/ms"); } } int n = 0; @@ -122,29 +131,41 @@ public class DefaultBulkListener implements BulkListener { return lastBulkError; } - private static class LastResponseTimes { + @Override + public void close() throws IOException { + bulkMetric.close(); + } - private final Long[] values; + private static class LongRingBuffer { + + private final Long[] values1, values2; private final int limit; - private int index; + private final AtomicInteger index; - public LastResponseTimes(int limit) { - this.values = new Long[limit]; - Arrays.fill(values, -1L); + public LongRingBuffer(int limit) { + this.values1 = new Long[limit]; + this.values2 = new Long[limit]; + Arrays.fill(values1, -1L); + Arrays.fill(values2, -1L); this.limit = limit; - this.index = 0; + this.index = new AtomicInteger(); } - public int add(Long value) { - int i = index++ % limit; - values[i] = value; + public int add(Long v1, Long v2) { + int i = index.incrementAndGet() % limit; + values1[i] = v1; + values2[i] = v2; return i; } - public LongStream longStream() { - return Arrays.stream(values).filter(v -> v != -1L).mapToLong(Long::longValue); + public LongStream longStreamValues1() { + return Arrays.stream(values1).filter(v -> v != -1L).mapToLong(Long::longValue); + } + + public LongStream longStreamValues2() { + return Arrays.stream(values2).filter(v -> v != -1L).mapToLong(Long::longValue); } } } diff --git a/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkMetric.java b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkMetric.java index fc3c14b..7b2987f 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkMetric.java +++ b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkMetric.java @@ -1,6 +1,5 @@ package org.xbib.elx.common; -import org.elasticsearch.common.settings.Settings; import org.xbib.elx.api.BulkMetric; import org.xbib.metrics.api.Count; import org.xbib.metrics.api.Metered; @@ -39,12 +38,6 @@ public class DefaultBulkMetric implements BulkMetric { failed = new CountMetric(); } - @Override - public void init(Settings settings) { - start(); - } - - @Override public void markTotalIngest(long n) { totalIngest.mark(n); diff --git a/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkProcessor.java b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkProcessor.java index d326023..c6eea96 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkProcessor.java +++ b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkProcessor.java @@ -10,11 +10,11 @@ import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.EsExecutors; -import org.elasticsearch.common.util.concurrent.FutureUtils; import org.xbib.elx.api.BulkListener; import org.xbib.elx.api.BulkProcessor; import org.xbib.elx.api.BulkRequestHandler; +import java.io.IOException; import java.util.Objects; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledFuture; @@ -37,8 +37,6 @@ public class DefaultBulkProcessor implements BulkProcessor { private final ScheduledFuture scheduledFuture; - private final AtomicLong executionIdGen; - private final BulkRequestHandler bulkRequestHandler; private BulkRequest bulkRequest; @@ -56,7 +54,6 @@ public class DefaultBulkProcessor implements BulkProcessor { int bulkActions, ByteSizeValue bulkSize, TimeValue flushInterval) { - this.executionIdGen = new AtomicLong(); this.closed = false; this.bulkActions = bulkActions; this.bulkSize = bulkSize.getBytes(); @@ -102,79 +99,20 @@ public class DefaultBulkProcessor implements BulkProcessor { return bulkSize; } - /** - * Wait for bulk request handler with flush. - * @param timeout the timeout value - * @param unit the timeout unit - * @return true is method was successful, false if timeout - * @throws InterruptedException if timeout - */ - @Override - public synchronized boolean awaitFlush(long timeout, TimeUnit unit) throws InterruptedException { - Objects.requireNonNull(unit, "A time unit is required for awaitFlush() but null"); - if (closed) { - return true; - } - // flush - if (bulkRequest.numberOfActions() > 0) { - execute(); - } - // wait for all bulk responses - return bulkRequestHandler.close(timeout, unit); + public BulkRequestHandler getBulkRequestHandler() { + return bulkRequestHandler; } - /** - * Closes the processor. Any remaining bulk actions are flushed and then closed. This emthod can only be called - * once as the last action of a bulk processor. - * - * If concurrent requests are not enabled, returns {@code true} immediately. - * If concurrent requests are enabled, waits for up to the specified timeout for all bulk requests to complete then - * returns {@code true}, - * If the specified waiting time elapses before all bulk requests complete, {@code false} is returned. - * - * @param timeout The maximum time to wait for the bulk requests to complete - * @param unit The time unit of the {@code timeout} argument - * @return {@code true} if all bulk requests completed and {@code false} if the waiting time elapsed before all the - * bulk requests completed - * @throws InterruptedException If the current thread is interrupted - */ @Override - public synchronized boolean awaitClose(long timeout, TimeUnit unit) throws InterruptedException { - Objects.requireNonNull(unit, "A time unit is required for awaitCLose() but null"); - if (closed) { - return true; - } - closed = true; - if (scheduledFuture != null) { - FutureUtils.cancel(scheduledFuture); - scheduler.shutdown(); - } - if (bulkRequest.numberOfActions() > 0) { - execute(); - } - return bulkRequestHandler.close(timeout, unit); - } - - /** - * Adds either a delete or an index request. - * - * @param request request - * @return his bulk processor - */ - @Override - public synchronized DefaultBulkProcessor add(ActionRequest request) { + public synchronized void add(ActionRequest request) { ensureOpen(); bulkRequest.add(request); if ((bulkActions != -1 && bulkRequest.numberOfActions() >= bulkActions) || (bulkSize != -1 && bulkRequest.estimatedSizeInBytes() >= bulkSize)) { execute(); } - return this; } - /** - * Flush pending delete or index requests. - */ @Override public synchronized void flush() { ensureOpen(); @@ -183,14 +121,32 @@ public class DefaultBulkProcessor implements BulkProcessor { } } - /** - * Closes the processor. If flushing by time is enabled, then it's shutdown. Any remaining bulk actions are flushed. - */ @Override - public void close() { + public synchronized boolean awaitFlush(long timeout, TimeUnit unit) throws InterruptedException, IOException { + if (closed) { + return true; + } + if (bulkRequest.numberOfActions() > 0) { + execute(); + } + return bulkRequestHandler.flush(timeout, unit); + } + + @Override + public synchronized void close() throws IOException { try { - // 0 = immediate close - awaitClose(0, TimeUnit.NANOSECONDS); + if (closed) { + return; + } + closed = true; + if (scheduledFuture != null) { + scheduledFuture.cancel(true); + scheduler.shutdown(); + } + if (bulkRequest.numberOfActions() > 0) { + execute(); + } + bulkRequestHandler.flush(0, TimeUnit.NANOSECONDS); } catch (InterruptedException exc) { Thread.currentThread().interrupt(); } @@ -204,9 +160,8 @@ public class DefaultBulkProcessor implements BulkProcessor { private void execute() { BulkRequest myBulkRequest = this.bulkRequest; - long executionId = executionIdGen.incrementAndGet(); this.bulkRequest = new BulkRequest(); - this.bulkRequestHandler.execute(myBulkRequest, executionId); + this.bulkRequestHandler.execute(myBulkRequest); } /** @@ -278,7 +233,7 @@ public class DefaultBulkProcessor implements BulkProcessor { /** * Sets when to flush a new bulk request based on the size of actions currently added. Defaults to - * {@code 5mb}. Can be set to {@code -1} to disable it. + * {@code 1mb}. Can be set to {@code -1} to disable it. * * @param bulkSize bulk size * @return this builder @@ -319,10 +274,9 @@ public class DefaultBulkProcessor implements BulkProcessor { if (closed) { return; } - if (bulkRequest.numberOfActions() == 0) { - return; + if (bulkRequest.numberOfActions() > 0) { + execute(); } - execute(); } } } @@ -333,14 +287,17 @@ public class DefaultBulkProcessor implements BulkProcessor { private final BulkListener bulkListener; + private final AtomicLong executionIdGen; + SyncBulkRequestHandler(ElasticsearchClient client, BulkListener bulkListener) { - Objects.requireNonNull(bulkListener, "A listener is required for SyncBulkRequestHandler but null"); this.client = client; this.bulkListener = bulkListener; + this.executionIdGen = new AtomicLong(); } @Override - public void execute(BulkRequest bulkRequest, long executionId) { + public void execute(BulkRequest bulkRequest) { + long executionId = executionIdGen.incrementAndGet(); boolean afterCalled = false; try { bulkListener.beforeBulk(executionId, bulkRequest); @@ -355,9 +312,22 @@ public class DefaultBulkProcessor implements BulkProcessor { } @Override - public boolean close(long timeout, TimeUnit unit) { + public boolean flush(long timeout, TimeUnit unit) { return true; } + + @Override + public int getPermits() { + return 1; + } + + @Override + public void increase() { + } + + @Override + public void reduce() { + } } private static class AsyncBulkRequestHandler implements BulkRequestHandler { @@ -366,22 +336,27 @@ public class DefaultBulkProcessor implements BulkProcessor { private final BulkListener bulkListener; - private final Semaphore semaphore; + private final ResizeableSemaphore semaphore; - private final int concurrentRequests; + private final AtomicLong executionIdGen; - private AsyncBulkRequestHandler(ElasticsearchClient client, BulkListener bulkListener, int concurrentRequests) { - Objects.requireNonNull(bulkListener, "A listener is required for AsyncBulkRequestHandler but null"); + private int permits; + + private AsyncBulkRequestHandler(ElasticsearchClient client, + BulkListener bulkListener, + int permits) { this.client = client; this.bulkListener = bulkListener; - this.concurrentRequests = concurrentRequests; - this.semaphore = new Semaphore(concurrentRequests); + this.permits = permits; + this.semaphore = new ResizeableSemaphore(permits); + this.executionIdGen = new AtomicLong(); } @Override - public void execute(final BulkRequest bulkRequest, final long executionId) { + public void execute(BulkRequest bulkRequest) { boolean bulkRequestSetupSuccessful = false; boolean acquired = false; + long executionId = executionIdGen.incrementAndGet(); try { bulkListener.beforeBulk(executionId, bulkRequest); semaphore.acquire(); @@ -413,19 +388,49 @@ public class DefaultBulkProcessor implements BulkProcessor { bulkListener.afterBulk(executionId, bulkRequest, e); } finally { if (!bulkRequestSetupSuccessful && acquired) { - // if we fail on client.bulk() release the semaphore semaphore.release(); } } } @Override - public boolean close(long timeout, TimeUnit unit) throws InterruptedException { - if (semaphore.tryAcquire(concurrentRequests, timeout, unit)) { - semaphore.release(concurrentRequests); + public boolean flush(long timeout, TimeUnit unit) throws IOException, InterruptedException { + bulkListener.close(); + if (semaphore.tryAcquire(permits, timeout, unit)) { + semaphore.release(permits); return true; } return false; } + + @Override + public int getPermits() { + return permits; + } + + @Override + public void increase() { + semaphore.release(1); + this.permits++; + } + + @Override + public void reduce() { + semaphore.reducePermits(1); + this.permits--; + } + } + + @SuppressWarnings("serial") + private static class ResizeableSemaphore extends Semaphore { + + ResizeableSemaphore(int permits) { + super(permits, true); + } + + @Override + protected void reducePermits(int reduction) { + super.reducePermits(reduction); + } } } diff --git a/elx-node/src/test/java/org/xbib/elx/node/test/BulkClientTest.java b/elx-node/src/test/java/org/xbib/elx/node/test/BulkClientTest.java index 5cd6e40..4038590 100644 --- a/elx-node/src/test/java/org/xbib/elx/node/test/BulkClientTest.java +++ b/elx-node/src/test/java/org/xbib/elx/node/test/BulkClientTest.java @@ -130,13 +130,12 @@ class BulkClientTest { @Test void testThreadedRandomDocs() throws Exception { int maxthreads = Runtime.getRuntime().availableProcessors(); - Long maxActionsPerRequest = MAX_ACTIONS_PER_REQUEST; + long maxActionsPerRequest = MAX_ACTIONS_PER_REQUEST; final long actions = ACTIONS; - logger.info("maxthreads={} maxactions={} maxloop={}", maxthreads, maxActionsPerRequest, actions); + long timeout = 120L; try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client) .setBulkClientProvider(NodeBulkClientProvider.class) .put(helper.getNodeSettings()) - .put(Parameters.MAX_CONCURRENT_REQUESTS.getName(), maxthreads) .put(Parameters.MAX_ACTIONS_PER_REQUEST.getName(), maxActionsPerRequest) .put(Parameters.FLUSH_INTERVAL.getName(), "60s") .build()) { @@ -155,17 +154,12 @@ class BulkClientTest { latch.countDown(); }); } - logger.info("waiting for latch..."); - if (latch.await(30L, TimeUnit.SECONDS)) { - logger.info("flush..."); - bulkClient.flush(); - bulkClient.waitForResponses(30L, TimeUnit.SECONDS); - logger.info("got all responses, executor service shutdown..."); + if (latch.await(timeout, TimeUnit.SECONDS)) { + bulkClient.waitForResponses(timeout, TimeUnit.SECONDS); executorService.shutdown(); - executorService.awaitTermination(30L, TimeUnit.SECONDS); - logger.info("pool is shut down"); + executorService.awaitTermination(timeout, TimeUnit.SECONDS); } else { - logger.warn("latch timeout"); + logger.error("latch timeout!"); } bulkClient.stopBulk(indexDefinition); assertEquals(maxthreads * actions, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount()); diff --git a/elx-transport/src/test/java/org/xbib/elx/transport/test/BulkClientTest.java b/elx-transport/src/test/java/org/xbib/elx/transport/test/BulkClientTest.java index 4f9dc55..641c40d 100644 --- a/elx-transport/src/test/java/org/xbib/elx/transport/test/BulkClientTest.java +++ b/elx-transport/src/test/java/org/xbib/elx/transport/test/BulkClientTest.java @@ -29,9 +29,9 @@ class BulkClientTest { private static final Logger logger = LogManager.getLogger(BulkClientTest.class.getName()); - private static final Long ACTIONS = 10000L; + private static final Long ACTIONS = 100000L; - private static final Long MAX_ACTIONS_PER_REQUEST = 10000L; + private static final Long MAX_ACTIONS_PER_REQUEST = 1000L; private final TestExtension.Helper helper; @@ -128,13 +128,12 @@ class BulkClientTest { @Test void testThreadedRandomDocs() { int maxthreads = Runtime.getRuntime().availableProcessors(); - Long maxActionsPerRequest = MAX_ACTIONS_PER_REQUEST; + long maxActionsPerRequest = MAX_ACTIONS_PER_REQUEST; final long actions = ACTIONS; - logger.info("maxthreads={} maxactions={} maxloop={}", maxthreads, maxActionsPerRequest, actions); + long timeout = 120L; try (TransportBulkClient bulkClient = ClientBuilder.builder() .setBulkClientProvider(TransportBulkClientProvider.class) .put(helper.getTransportSettings()) - .put(Parameters.MAX_CONCURRENT_REQUESTS.getName(), maxthreads * 2) .put(Parameters.MAX_ACTIONS_PER_REQUEST.getName(), maxActionsPerRequest) .put(Parameters.FLUSH_INTERVAL.getName(), "60s") .put(Parameters.ENABLE_BULK_LOGGING.getName(), Boolean.TRUE) @@ -157,17 +156,12 @@ class BulkClientTest { latch.countDown(); }); } - logger.info("waiting for latch..."); - if (latch.await(30L, TimeUnit.SECONDS)) { - logger.info("flush..."); - bulkClient.flush(); - bulkClient.waitForResponses(30L, TimeUnit.SECONDS); - logger.info("got all responses, executor service shutdown..."); + if (latch.await(timeout, TimeUnit.SECONDS)) { + bulkClient.waitForResponses(timeout, TimeUnit.SECONDS); executorService.shutdown(); - executorService.awaitTermination(30L, TimeUnit.SECONDS); - logger.info("pool is shut down"); + executorService.awaitTermination(timeout, TimeUnit.SECONDS); } else { - logger.warn("latch timeout"); + logger.error("latch timeout!"); } bulkClient.stopBulk(indexDefinition); assertEquals(maxthreads * actions, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount()); diff --git a/gradle/test/junit5.gradle b/gradle/test/junit5.gradle index b02dea8..838c541 100644 --- a/gradle/test/junit5.gradle +++ b/gradle/test/junit5.gradle @@ -13,6 +13,8 @@ test { useJUnitPlatform() failFast = true jvmArgs = [ + '-Xmx2g', + '-Xms2g', '--add-exports=java.base/jdk.internal.ref=ALL-UNNAMED', '--add-exports=java.base/jdk.internal.misc=ALL-UNNAMED', '--add-opens=java.base/java.nio=ALL-UNNAMED'