From 6731ad986b393e362b7682f6106ec78370ab762f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=CC=88rg=20Prante?= Date: Fri, 16 Apr 2021 17:22:53 +0200 Subject: [PATCH] drop bulk controller, merge bulk handlers, experimenting with dynamic bulk volume adaptation --- .../java/org/xbib/elx/api/BulkClient.java | 8 +- .../java/org/xbib/elx/api/BulkController.java | 37 -- .../java/org/xbib/elx/api/BulkMetric.java | 4 + .../java/org/xbib/elx/api/BulkProcessor.java | 24 +- .../org/xbib/elx/api/BulkRequestHandler.java | 18 - .../xbib/elx/common/AbstractBasicClient.java | 1 - .../xbib/elx/common/AbstractBulkClient.java | 38 +- .../elx/common/DefaultBulkController.java | 211 -------- .../xbib/elx/common/DefaultBulkListener.java | 74 +-- .../xbib/elx/common/DefaultBulkMetric.java | 97 +++- .../xbib/elx/common/DefaultBulkProcessor.java | 463 +++++++----------- .../elx/common/DefaultIndexDefinition.java | 12 +- .../org/xbib/elx/common/LongRingBuffer.java | 38 ++ .../java/org/xbib/elx/common/Parameters.java | 13 +- .../org/xbib/elx/node/NodeClientHelper.java | 14 +- .../java/org/xbib/elx/node/package-info.java | 4 - .../xbib/elx/node/test/BulkClientTest.java | 11 - .../elx/transport/test/BulkClientTest.java | 15 +- gradle.properties | 2 +- 19 files changed, 376 insertions(+), 708 deletions(-) delete mode 100644 elx-api/src/main/java/org/xbib/elx/api/BulkController.java delete mode 100644 elx-api/src/main/java/org/xbib/elx/api/BulkRequestHandler.java delete mode 100644 elx-common/src/main/java/org/xbib/elx/common/DefaultBulkController.java create mode 100644 elx-common/src/main/java/org/xbib/elx/common/LongRingBuffer.java delete mode 100644 elx-node/src/main/java/org/xbib/elx/node/package-info.java diff --git a/elx-api/src/main/java/org/xbib/elx/api/BulkClient.java b/elx-api/src/main/java/org/xbib/elx/api/BulkClient.java index 1074ae6..52c3dce 100644 --- a/elx-api/src/main/java/org/xbib/elx/api/BulkClient.java +++ b/elx-api/src/main/java/org/xbib/elx/api/BulkClient.java @@ -10,12 +10,6 @@ import java.util.concurrent.TimeUnit; public interface BulkClient extends BasicClient, Flushable { - /** - * Get bulk control. - * @return the bulk control - */ - BulkController getBulkController(); - /** * Create a new index. * @param indexDefinition the index definition @@ -154,4 +148,6 @@ public interface BulkClient extends BasicClient, Flushable { * @param indexDefinition index definition */ void flushIndex(IndexDefinition indexDefinition); + + BulkProcessor getBulkController(); } diff --git a/elx-api/src/main/java/org/xbib/elx/api/BulkController.java b/elx-api/src/main/java/org/xbib/elx/api/BulkController.java deleted file mode 100644 index 7d76416..0000000 --- a/elx-api/src/main/java/org/xbib/elx/api/BulkController.java +++ /dev/null @@ -1,37 +0,0 @@ -package org.xbib.elx.api; - -import org.elasticsearch.action.delete.DeleteRequest; -import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.action.update.UpdateRequest; -import org.elasticsearch.common.settings.Settings; - -import java.io.Closeable; -import java.io.Flushable; -import java.io.IOException; -import java.util.concurrent.TimeUnit; - -public interface BulkController extends Closeable, Flushable { - - void init(Settings settings); - - void inactivate(); - - BulkProcessor getBulkProcessor(); - - BulkMetric getBulkMetric(); - - Throwable getLastBulkError(); - - void startBulkMode(IndexDefinition indexDefinition) throws IOException; - - void bulkIndex(IndexRequest indexRequest); - - void bulkDelete(DeleteRequest deleteRequest); - - void bulkUpdate(UpdateRequest updateRequest); - - boolean waitForBulkResponses(long timeout, TimeUnit timeUnit); - - void stopBulkMode(IndexDefinition indexDefinition) throws IOException; - -} diff --git a/elx-api/src/main/java/org/xbib/elx/api/BulkMetric.java b/elx-api/src/main/java/org/xbib/elx/api/BulkMetric.java index 37597a8..cb27111 100644 --- a/elx-api/src/main/java/org/xbib/elx/api/BulkMetric.java +++ b/elx-api/src/main/java/org/xbib/elx/api/BulkMetric.java @@ -1,5 +1,7 @@ package org.xbib.elx.api; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; import org.xbib.metrics.api.Count; import org.xbib.metrics.api.Metered; @@ -28,4 +30,6 @@ public interface BulkMetric extends Closeable { void start(); void stop(); + + void recalculate(BulkRequest request, BulkResponse response); } diff --git a/elx-api/src/main/java/org/xbib/elx/api/BulkProcessor.java b/elx-api/src/main/java/org/xbib/elx/api/BulkProcessor.java index de49ea0..f4508b0 100644 --- a/elx-api/src/main/java/org/xbib/elx/api/BulkProcessor.java +++ b/elx-api/src/main/java/org/xbib/elx/api/BulkProcessor.java @@ -9,17 +9,25 @@ import java.util.concurrent.TimeUnit; public interface BulkProcessor extends Closeable, Flushable { - void setBulkActions(int bulkActions); + void setEnabled(boolean enabled); - int getBulkActions(); + void startBulkMode(IndexDefinition indexDefinition) throws IOException; - void setBulkSize(long bulkSize); - - long getBulkSize(); - - BulkRequestHandler getBulkRequestHandler(); + void stopBulkMode(IndexDefinition indexDefinition) throws IOException; void add(ActionRequest request); - boolean awaitFlush(long timeout, TimeUnit unit) throws InterruptedException, IOException; + boolean waitForBulkResponses(long timeout, TimeUnit unit); + + BulkMetric getBulkMetric(); + + Throwable getLastBulkError(); + + void setMaxBulkActions(int bulkActions); + + int getMaxBulkActions(); + + void setMaxBulkVolume(long bulkSize); + + long getMaxBulkVolume(); } diff --git a/elx-api/src/main/java/org/xbib/elx/api/BulkRequestHandler.java b/elx-api/src/main/java/org/xbib/elx/api/BulkRequestHandler.java deleted file mode 100644 index 8483722..0000000 --- a/elx-api/src/main/java/org/xbib/elx/api/BulkRequestHandler.java +++ /dev/null @@ -1,18 +0,0 @@ -package org.xbib.elx.api; - -import org.elasticsearch.action.bulk.BulkRequest; -import java.io.IOException; -import java.util.concurrent.TimeUnit; - -public interface BulkRequestHandler { - - void execute(BulkRequest bulkRequest); - - boolean flush(long timeout, TimeUnit unit) throws IOException, InterruptedException; - - int getPermits(); - - void increase(); - - void reduce(); -} diff --git a/elx-common/src/main/java/org/xbib/elx/common/AbstractBasicClient.java b/elx-common/src/main/java/org/xbib/elx/common/AbstractBasicClient.java index 422356d..ddedaeb 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/AbstractBasicClient.java +++ b/elx-common/src/main/java/org/xbib/elx/common/AbstractBasicClient.java @@ -45,7 +45,6 @@ public abstract class AbstractBasicClient implements BasicClient { @Override public void setClient(ElasticsearchClient client) { - logger.log(Level.DEBUG, "setting client = " + client); this.client = client; } diff --git a/elx-common/src/main/java/org/xbib/elx/common/AbstractBulkClient.java b/elx-common/src/main/java/org/xbib/elx/common/AbstractBulkClient.java index 9b5c0a7..618386d 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/AbstractBulkClient.java +++ b/elx-common/src/main/java/org/xbib/elx/common/AbstractBulkClient.java @@ -1,7 +1,5 @@ package org.xbib.elx.common; -import com.google.common.base.Charsets; -import org.apache.logging.log4j.Level; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.action.admin.indices.create.CreateIndexAction; @@ -20,7 +18,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.json.JsonXContent; import org.xbib.elx.api.BulkClient; -import org.xbib.elx.api.BulkController; +import org.xbib.elx.api.BulkProcessor; import org.xbib.elx.api.IndexDefinition; import java.io.IOException; import java.nio.charset.StandardCharsets; @@ -32,7 +30,7 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements private static final Logger logger = LogManager.getLogger(AbstractBulkClient.class.getName()); - private BulkController bulkController; + private BulkProcessor bulkProcessor; private final AtomicBoolean closed = new AtomicBoolean(true); @@ -40,21 +38,19 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements public void init(Settings settings) throws IOException { if (closed.compareAndSet(true, false)) { super.init(settings); - bulkController = new DefaultBulkController(this); - logger.log(Level.INFO, "initializing bulk controller with settings = " + settings.toDelimitedString(',')); - bulkController.init(settings); + bulkProcessor = new DefaultBulkProcessor(this, settings); } } @Override - public BulkController getBulkController() { - return bulkController; + public BulkProcessor getBulkController() { + return bulkProcessor; } @Override public void flush() throws IOException { - if (bulkController != null) { - bulkController.flush(); + if (bulkProcessor != null) { + bulkProcessor.flush(); } } @@ -62,9 +58,9 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements public void close() throws IOException { if (closed.compareAndSet(false, true)) { ensureClientIsPresent(); - if (bulkController != null) { - logger.info("closing bulk controller"); - bulkController.close(); + if (bulkProcessor != null) { + logger.info("closing bulk"); + bulkProcessor.close(); } closeClient(settings); } @@ -125,7 +121,7 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements return; } ensureClientIsPresent(); - bulkController.startBulkMode(indexDefinition); + bulkProcessor.startBulkMode(indexDefinition); } @Override @@ -134,12 +130,12 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements return; } ensureClientIsPresent(); - bulkController.stopBulkMode(indexDefinition); + bulkProcessor.stopBulkMode(indexDefinition); } @Override public BulkClient index(IndexDefinition indexDefinition, String id, boolean create, String source) { - return index(indexDefinition, id, create, new BytesArray(source.getBytes(Charsets.UTF_8))); + return index(indexDefinition, id, create, new BytesArray(source.getBytes(StandardCharsets.UTF_8))); } @Override @@ -156,7 +152,7 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements @Override public BulkClient index(IndexRequest indexRequest) { ensureClientIsPresent(); - bulkController.bulkIndex(indexRequest); + bulkProcessor.add(indexRequest); return this; } @@ -174,7 +170,7 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements @Override public BulkClient delete(DeleteRequest deleteRequest) { ensureClientIsPresent(); - bulkController.bulkDelete(deleteRequest); + bulkProcessor.add(deleteRequest); return this; } @@ -197,14 +193,14 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements @Override public BulkClient update(UpdateRequest updateRequest) { ensureClientIsPresent(); - bulkController.bulkUpdate(updateRequest); + bulkProcessor.add(updateRequest); return this; } @Override public boolean waitForResponses(long timeout, TimeUnit timeUnit) { ensureClientIsPresent(); - return bulkController.waitForBulkResponses(timeout, timeUnit); + return bulkProcessor.waitForBulkResponses(timeout, timeUnit); } @Override diff --git a/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkController.java b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkController.java deleted file mode 100644 index 1e1b34b..0000000 --- a/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkController.java +++ /dev/null @@ -1,211 +0,0 @@ -package org.xbib.elx.common; - -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.elasticsearch.action.delete.DeleteRequest; -import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.action.update.UpdateRequest; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.ByteSizeValue; -import org.elasticsearch.common.unit.TimeValue; -import org.xbib.elx.api.BulkClient; -import org.xbib.elx.api.BulkController; -import org.xbib.elx.api.BulkMetric; -import org.xbib.elx.api.BulkProcessor; -import org.xbib.elx.api.IndexDefinition; - -import java.io.IOException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; - -public class DefaultBulkController implements BulkController { - - private static final Logger logger = LogManager.getLogger(DefaultBulkController.class); - - private final BulkClient bulkClient; - - private BulkProcessor bulkProcessor; - - private DefaultBulkListener bulkListener; - - private long maxWaitTime; - - private TimeUnit maxWaitTimeUnit; - - private final AtomicBoolean active; - - public DefaultBulkController(BulkClient bulkClient) { - this.bulkClient = bulkClient; - this.active = new AtomicBoolean(false); - } - - @Override - public void init(Settings settings) { - String maxWaitTimeStr = settings.get(Parameters.MAX_WAIT_BULK_RESPONSE.getName(), - Parameters.MAX_WAIT_BULK_RESPONSE.getString()); - TimeValue maxWaitTimeValue = TimeValue.parseTimeValue(maxWaitTimeStr, - TimeValue.timeValueSeconds(30), ""); - this.maxWaitTime = maxWaitTimeValue.seconds(); - this.maxWaitTimeUnit = TimeUnit.SECONDS; - int maxActionsPerRequest = settings.getAsInt(Parameters.MAX_ACTIONS_PER_REQUEST.getName(), - Parameters.MAX_ACTIONS_PER_REQUEST.getInteger()); - int maxConcurrentRequests = settings.getAsInt(Parameters.MAX_CONCURRENT_REQUESTS.getName(), - Parameters.MAX_CONCURRENT_REQUESTS.getInteger()); - String flushIngestIntervalStr = settings.get(Parameters.FLUSH_INTERVAL.getName(), - Parameters.FLUSH_INTERVAL.getString()); - TimeValue flushIngestInterval = TimeValue.parseTimeValue(flushIngestIntervalStr, - TimeValue.timeValueSeconds(30), ""); - ByteSizeValue maxVolumePerRequest = settings.getAsBytesSize(Parameters.MAX_VOLUME_PER_REQUEST.getName(), - ByteSizeValue.parseBytesSizeValue(Parameters.MAX_VOLUME_PER_REQUEST.getString(), "1m")); - boolean enableBulkLogging = settings.getAsBoolean(Parameters.ENABLE_BULK_LOGGING.getName(), - Parameters.ENABLE_BULK_LOGGING.getBoolean()); - boolean failOnBulkError = settings.getAsBoolean(Parameters.FAIL_ON_BULK_ERROR.getName(), - Parameters.FAIL_ON_BULK_ERROR.getBoolean()); - int responseTimeCount = settings.getAsInt(Parameters.RESPONSE_TIME_COUNT.getName(), - Parameters.RESPONSE_TIME_COUNT.getInteger()); - this.bulkListener = new DefaultBulkListener(this, - enableBulkLogging, failOnBulkError, responseTimeCount); - this.bulkProcessor = DefaultBulkProcessor.builder(bulkClient.getClient(), bulkListener) - .setBulkActions(maxActionsPerRequest) - .setConcurrentRequests(maxConcurrentRequests) - .setFlushInterval(flushIngestInterval) - .setBulkSize(maxVolumePerRequest) - .build(); - this.active.set(true); - if (logger.isInfoEnabled()) { - logger.info("bulk processor now active with maxWaitTime = {} maxActionsPerRequest = {} maxConcurrentRequests = {} " + - "flushIngestInterval = {} maxVolumePerRequest = {} " + - "bulk logging = {} fail on bulk error = {} " + - "logger debug = {} from settings = {}", - maxWaitTimeStr, maxActionsPerRequest, maxConcurrentRequests, - flushIngestInterval, maxVolumePerRequest, - enableBulkLogging, failOnBulkError, - logger.isDebugEnabled(), settings.toDelimitedString(',')); - } - } - - @Override - public BulkProcessor getBulkProcessor() { - return bulkProcessor; - } - - @Override - public BulkMetric getBulkMetric() { - return bulkListener != null ? bulkListener.getBulkMetric() : null; - } - - @Override - public Throwable getLastBulkError() { - return bulkListener != null ? bulkListener.getLastBulkError() : null; - } - - @Override - public void inactivate() { - this.active.set(false); - } - - @Override - public void startBulkMode(IndexDefinition indexDefinition) throws IOException { - String indexName = indexDefinition.getFullIndexName(); - if (indexDefinition.getStartBulkRefreshSeconds() != 0) { - bulkClient.updateIndexSetting(indexName, "refresh_interval", - indexDefinition.getStartBulkRefreshSeconds() + "s", - 30L, TimeUnit.SECONDS); - } - } - - @Override - public void bulkIndex(IndexRequest indexRequest) { - ensureActiveAndBulk(); - try { - bulkProcessor.add(indexRequest); - } catch (Exception e) { - if (logger.isErrorEnabled()) { - logger.error("bulk add of index failed: " + e.getMessage(), e); - } - inactivate(); - } - } - - @Override - public void bulkDelete(DeleteRequest deleteRequest) { - ensureActiveAndBulk(); - try { - bulkProcessor.add(deleteRequest); - } catch (Exception e) { - if (logger.isErrorEnabled()) { - logger.error("bulk add of delete failed: " + e.getMessage(), e); - } - inactivate(); - } - } - - @Override - public void bulkUpdate(UpdateRequest updateRequest) { - ensureActiveAndBulk(); - try { - bulkProcessor.add(updateRequest); - } catch (Exception e) { - if (logger.isErrorEnabled()) { - logger.error("bulk add of update failed: " + e.getMessage(), e); - } - inactivate(); - } - } - - @Override - public boolean waitForBulkResponses(long timeout, TimeUnit timeUnit) { - try { - if (bulkProcessor != null) { - bulkProcessor.flush(); - return bulkProcessor.awaitFlush(timeout, timeUnit); - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - logger.error("interrupted"); - return false; - } catch (IOException e) { - logger.error(e.getMessage(), e); - return false; - } - return false; - } - - @Override - public void stopBulkMode(IndexDefinition indexDefinition) throws IOException { - flush(); - if (waitForBulkResponses(indexDefinition.getMaxWaitTime(), indexDefinition.getMaxWaitTimeUnit())) { - if (indexDefinition.getStopBulkRefreshSeconds() != 0) { - bulkClient.updateIndexSetting(indexDefinition.getFullIndexName(), - "refresh_interval", - indexDefinition.getStopBulkRefreshSeconds() + "s", - 30L, TimeUnit.SECONDS); - } - } - } - - @Override - public void flush() throws IOException { - if (bulkProcessor != null) { - bulkProcessor.flush(); - } - } - - @Override - public void close() throws IOException { - flush(); - bulkClient.waitForResponses(maxWaitTime, maxWaitTimeUnit); - if (bulkProcessor != null) { - bulkProcessor.close(); - } - } - - private void ensureActiveAndBulk() { - if (!active.get()) { - throw new IllegalStateException("inactive"); - } - if (bulkProcessor == null) { - throw new UnsupportedOperationException("bulk processor not present"); - } - } -} diff --git a/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkListener.java b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkListener.java index dd892f7..465d4e5 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkListener.java +++ b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkListener.java @@ -5,21 +5,19 @@ import org.apache.logging.log4j.Logger; import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkResponse; -import org.xbib.elx.api.BulkController; import org.xbib.elx.api.BulkListener; import org.xbib.elx.api.BulkMetric; +import org.xbib.elx.api.BulkProcessor; import java.io.IOException; -import java.util.Arrays; import java.util.LongSummaryStatistics; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.stream.LongStream; +import java.util.concurrent.ScheduledThreadPoolExecutor; public class DefaultBulkListener implements BulkListener { private final Logger logger = LogManager.getLogger(DefaultBulkListener.class.getName()); - private final BulkController bulkController; + private final BulkProcessor bulkProcessor; private final BulkMetric bulkMetric; @@ -29,20 +27,15 @@ public class DefaultBulkListener implements BulkListener { private Throwable lastBulkError; - private final int ringBufferSize; - - private final LongRingBuffer ringBuffer; - - public DefaultBulkListener(BulkController bulkController, + public DefaultBulkListener(BulkProcessor bulkProcessor, + ScheduledThreadPoolExecutor scheduler, boolean isBulkLoggingEnabled, boolean failOnError, int ringBufferSize) { - this.bulkController = bulkController; + this.bulkProcessor = bulkProcessor; this.isBulkLoggingEnabled = isBulkLoggingEnabled; this.failOnError = failOnError; - this.ringBufferSize = ringBufferSize; - this.ringBuffer = new LongRingBuffer(ringBufferSize); - this.bulkMetric = new DefaultBulkMetric(); + this.bulkMetric = new DefaultBulkMetric(bulkProcessor, scheduler, ringBufferSize); bulkMetric.start(); } @@ -59,7 +52,7 @@ public class DefaultBulkListener implements BulkListener { bulkMetric.getCurrentIngestNumDocs().inc(n); bulkMetric.getTotalIngestSizeInBytes().inc(request.estimatedSizeInBytes()); if (isBulkLoggingEnabled && logger.isDebugEnabled()) { - logger.debug("before bulk [{}] [actions={}] [bytes={}] [concurrent requests={}]", + logger.debug("before bulk [{}] [actions={}] [bytes={}] [requests={}]", executionId, request.numberOfActions(), request.estimatedSizeInBytes(), @@ -69,22 +62,10 @@ public class DefaultBulkListener implements BulkListener { @Override public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { + bulkMetric.recalculate(request, response); long l = bulkMetric.getCurrentIngest().getCount(); bulkMetric.getCurrentIngest().dec(); bulkMetric.getSucceeded().inc(response.getItems().length); - if (ringBufferSize > 0 && ringBuffer.add(response.getTook().millis(), request.estimatedSizeInBytes()) == 0) { - LongSummaryStatistics stat1 = ringBuffer.longStreamValues1().summaryStatistics(); - LongSummaryStatistics stat2 = ringBuffer.longStreamValues2().summaryStatistics(); - if (isBulkLoggingEnabled && logger.isDebugEnabled()) { - logger.debug("bulk response millis: avg = " + stat1.getAverage() + - " min = " + stat1.getMin() + - " max = " + stat1.getMax() + - " size: avg = " + stat2.getAverage() + - " min = " + stat2.getMin() + - " max = " + stat2.getMax() + - " throughput: " + (stat2.getAverage() / stat1.getAverage()) + " bytes/ms"); - } - } int n = 0; for (BulkItemResponse itemResponse : response.getItems()) { bulkMetric.getCurrentIngest().dec(itemResponse.getIndex(), itemResponse.getType(), itemResponse.getId()); @@ -95,7 +76,7 @@ public class DefaultBulkListener implements BulkListener { } } if (isBulkLoggingEnabled && logger.isDebugEnabled()) { - logger.debug("after bulk [{}] [succeeded={}] [failed={}] [{}ms] [concurrent requests={}]", + logger.debug("after bulk [{}] [succeeded={}] [failed={}] [{}ms] [requests={}]", executionId, bulkMetric.getSucceeded().getCount(), bulkMetric.getFailed().getCount(), @@ -123,7 +104,7 @@ public class DefaultBulkListener implements BulkListener { if (logger.isErrorEnabled()) { logger.error("after bulk [" + executionId + "] error", failure); } - bulkController.inactivate(); + bulkProcessor.setEnabled(false); } @Override @@ -135,37 +116,4 @@ public class DefaultBulkListener implements BulkListener { public void close() throws IOException { bulkMetric.close(); } - - private static class LongRingBuffer { - - private final Long[] values1, values2; - - private final int limit; - - private final AtomicInteger index; - - public LongRingBuffer(int limit) { - this.values1 = new Long[limit]; - this.values2 = new Long[limit]; - Arrays.fill(values1, -1L); - Arrays.fill(values2, -1L); - this.limit = limit; - this.index = new AtomicInteger(); - } - - public int add(Long v1, Long v2) { - int i = index.incrementAndGet() % limit; - values1[i] = v1; - values2[i] = v2; - return i; - } - - public LongStream longStreamValues1() { - return Arrays.stream(values1).filter(v -> v != -1L).mapToLong(Long::longValue); - } - - public LongStream longStreamValues2() { - return Arrays.stream(values2).filter(v -> v != -1L).mapToLong(Long::longValue); - } - } } diff --git a/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkMetric.java b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkMetric.java index 7b2987f..3c62ed0 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkMetric.java +++ b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkMetric.java @@ -1,15 +1,25 @@ package org.xbib.elx.common; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; import org.xbib.elx.api.BulkMetric; +import org.xbib.elx.api.BulkProcessor; import org.xbib.metrics.api.Count; import org.xbib.metrics.api.Metered; import org.xbib.metrics.common.CountMetric; import org.xbib.metrics.common.Meter; -import java.util.concurrent.Executors; +import java.util.LongSummaryStatistics; +import java.util.concurrent.ScheduledThreadPoolExecutor; public class DefaultBulkMetric implements BulkMetric { + private static final Logger logger = LogManager.getLogger(DefaultBulkMetric.class.getName()); + + private final BulkProcessor bulkProcessor; + private final Meter totalIngest; private final Count totalIngestSizeInBytes; @@ -28,14 +38,31 @@ public class DefaultBulkMetric implements BulkMetric { private Long stopped; - public DefaultBulkMetric() { - totalIngest = new Meter(Executors.newSingleThreadScheduledExecutor()); - totalIngestSizeInBytes = new CountMetric(); - currentIngest = new CountMetric(); - currentIngestNumDocs = new CountMetric(); - submitted = new CountMetric(); - succeeded = new CountMetric(); - failed = new CountMetric(); + private final int ringBufferSize; + + private final LongRingBuffer ringBuffer; + + private Double lastThroughput; + + private long currentMaxVolume; + + private int currentPermits; + + public DefaultBulkMetric(BulkProcessor bulkProcessor, + ScheduledThreadPoolExecutor scheduledThreadPoolExecutor, + int ringBufferSize) { + this.bulkProcessor = bulkProcessor; + this.totalIngest = new Meter(scheduledThreadPoolExecutor); + this.ringBufferSize = ringBufferSize; + this.ringBuffer = new LongRingBuffer(ringBufferSize); + this.totalIngestSizeInBytes = new CountMetric(); + this.currentIngest = new CountMetric(); + this.currentIngestNumDocs = new CountMetric(); + this.submitted = new CountMetric(); + this.succeeded = new CountMetric(); + this.failed = new CountMetric(); + this.currentMaxVolume = 1024; + this.currentPermits = 1; } @Override @@ -100,4 +127,56 @@ public class DefaultBulkMetric implements BulkMetric { stop(); totalIngest.shutdown(); } + + private int x = 0; + + @Override + public void recalculate(BulkRequest request, BulkResponse response) { + if (ringBufferSize > 0 && ringBuffer.add(response.getTook().millis(), request.estimatedSizeInBytes()) == 0) { + x++; + LongSummaryStatistics stat1 = ringBuffer.longStreamValues1().summaryStatistics(); + LongSummaryStatistics stat2 = ringBuffer.longStreamValues2().summaryStatistics(); + double throughput = stat2.getAverage() / stat1.getAverage(); + double delta = lastThroughput != null ? throughput - lastThroughput : 0.0d; + if (logger.isDebugEnabled()) { + logger.debug("metric: avg = " + stat1.getAverage() + + " min = " + stat1.getMin() + + " max = " + stat1.getMax() + + " size: avg = " + stat2.getAverage() + + " min = " + stat2.getMin() + + " max = " + stat2.getMax() + + " last throughput: " + lastThroughput + " bytes/ms" + + " throughput: " + throughput + " bytes/ms" + + " delta = " + delta + + " vol = " + currentMaxVolume); + } + if (lastThroughput == null || throughput < 10000) { + double k = 0.5; + double d = (1 / (1 + Math.exp(-(((double)x)) * k))); + logger.debug("inc: x = " + x + " d = " + d); + currentMaxVolume += d * currentMaxVolume; + if (currentMaxVolume > 5 + 1024 * 1024) { + currentMaxVolume = 5 * 1024 * 1024; + } + bulkProcessor.setMaxBulkVolume(currentMaxVolume); + if (logger.isDebugEnabled()) { + logger.debug("metric: increase volume to " + currentMaxVolume); + } + } else if (delta < -100) { + double k = 0.5; + double d = (1 / (1 + Math.exp(-(((double)x)) * k))); + d = -1/d; + logger.debug("dec: x = " + x + " d = " + d); + currentMaxVolume += d * currentMaxVolume; + if (currentMaxVolume > 5 + 1024 * 1024) { + currentMaxVolume = 5 * 1024 * 1024; + } + bulkProcessor.setMaxBulkVolume(currentMaxVolume); + if (logger.isDebugEnabled()) { + logger.debug("metric: decrease volume to " + currentMaxVolume); + } + } + lastThroughput = throughput; + } + } } diff --git a/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkProcessor.java b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkProcessor.java index c6eea96..5199cd4 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkProcessor.java +++ b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkProcessor.java @@ -1,26 +1,29 @@ package org.xbib.elx.common; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.bulk.BulkAction; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.client.ElasticsearchClient; -import org.elasticsearch.common.unit.ByteSizeUnit; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.EsExecutors; -import org.xbib.elx.api.BulkListener; +import org.xbib.elx.api.BulkClient; +import org.xbib.elx.api.BulkMetric; import org.xbib.elx.api.BulkProcessor; -import org.xbib.elx.api.BulkRequestHandler; +import org.xbib.elx.api.IndexDefinition; import java.io.IOException; -import java.util.Objects; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; /** @@ -29,343 +32,238 @@ import java.util.concurrent.atomic.AtomicLong; * (either based on number of actions, based on the size, or time), and * to easily control the number of concurrent bulk * requests allowed to be executed in parallel. - * In order to create a new bulk processor, use the {@link Builder}. */ public class DefaultBulkProcessor implements BulkProcessor { + private static final Logger logger = LogManager.getLogger(DefaultBulkProcessor.class); + + private final BulkClient bulkClient; + + private final AtomicBoolean enabled; + + private final ElasticsearchClient client; + + private final DefaultBulkListener bulkListener; + private final ScheduledThreadPoolExecutor scheduler; - private final ScheduledFuture scheduledFuture; - - private final BulkRequestHandler bulkRequestHandler; + private ScheduledFuture scheduledFuture; private BulkRequest bulkRequest; - private long bulkSize; + private long maxBulkVolume; - private int bulkActions; + private int maxBulkActions; - private volatile boolean closed; + private final AtomicBoolean closed; - private DefaultBulkProcessor(ElasticsearchClient client, - BulkListener bulkListener, - String name, - int concurrentRequests, - int bulkActions, - ByteSizeValue bulkSize, - TimeValue flushInterval) { - this.closed = false; - this.bulkActions = bulkActions; - this.bulkSize = bulkSize.getBytes(); - this.bulkRequest = new BulkRequest(); - this.bulkRequestHandler = concurrentRequests == 0 ? - new SyncBulkRequestHandler(client, bulkListener) : - new AsyncBulkRequestHandler(client, bulkListener, concurrentRequests); - if (flushInterval != null) { - this.scheduler = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(1, - EsExecutors.daemonThreadFactory(name != null ? "[" + name + "]" : "" + "bulk_processor")); - this.scheduler.setExecuteExistingDelayedTasksAfterShutdownPolicy(false); - this.scheduler.setContinueExistingPeriodicTasksAfterShutdownPolicy(false); - this.scheduledFuture = this.scheduler.scheduleWithFixedDelay(new Flush(), flushInterval.millis(), + private final AtomicLong executionIdGen; + + private ResizeableSemaphore semaphore; + + private int permits; + + public DefaultBulkProcessor(BulkClient bulkClient, Settings settings) { + this.bulkClient = bulkClient; + int maxActionsPerRequest = settings.getAsInt(Parameters.MAX_ACTIONS_PER_REQUEST.getName(), + Parameters.MAX_ACTIONS_PER_REQUEST.getInteger()); + int maxConcurrentRequests = settings.getAsInt(Parameters.MAX_CONCURRENT_REQUESTS.getName(), + Parameters.MAX_CONCURRENT_REQUESTS.getInteger()); + String flushIngestIntervalStr = settings.get(Parameters.FLUSH_INTERVAL.getName(), + Parameters.FLUSH_INTERVAL.getString()); + TimeValue flushInterval = TimeValue.parseTimeValue(flushIngestIntervalStr, + TimeValue.timeValueSeconds(30), ""); + ByteSizeValue maxVolumePerRequest = settings.getAsBytesSize(Parameters.MAX_VOLUME_PER_REQUEST.getName(), + ByteSizeValue.parseBytesSizeValue(Parameters.MAX_VOLUME_PER_REQUEST.getString(), "1m")); + boolean enableBulkLogging = settings.getAsBoolean(Parameters.ENABLE_BULK_LOGGING.getName(), + Parameters.ENABLE_BULK_LOGGING.getBoolean()); + boolean failOnBulkError = settings.getAsBoolean(Parameters.FAIL_ON_BULK_ERROR.getName(), + Parameters.FAIL_ON_BULK_ERROR.getBoolean()); + int ringBufferSize = settings.getAsInt(Parameters.RESPONSE_TIME_COUNT.getName(), + Parameters.RESPONSE_TIME_COUNT.getInteger()); + this.client = bulkClient.getClient(); + this.scheduler = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(2, + EsExecutors.daemonThreadFactory("elx-bulk-processor")); + this.scheduler.setExecuteExistingDelayedTasksAfterShutdownPolicy(false); + this.scheduler.setContinueExistingPeriodicTasksAfterShutdownPolicy(false); + if (flushInterval.millis() > 0L) { + this.scheduledFuture = this.scheduler.scheduleWithFixedDelay(this::flush, flushInterval.millis(), flushInterval.millis(), TimeUnit.MILLISECONDS); + } + this.bulkListener = new DefaultBulkListener(this, scheduler, + enableBulkLogging, failOnBulkError, ringBufferSize); + this.permits = maxConcurrentRequests; + this.maxBulkActions = maxActionsPerRequest; + this.maxBulkVolume = maxVolumePerRequest != null ? maxVolumePerRequest.getBytes() : -1; + this.bulkRequest = new BulkRequest(); + this.closed = new AtomicBoolean(false); + this.enabled = new AtomicBoolean(false); + this.executionIdGen = new AtomicLong(); + if (permits > 0) { + this.semaphore = new ResizeableSemaphore(permits); + } + if (logger.isInfoEnabled()) { + logger.info("bulk processor now active with maxActionsPerRequest = {} maxConcurrentRequests = {} " + + "flushInterval = {} maxVolumePerRequest = {} " + + "bulk logging = {} fail on bulk error = {} " + + "logger debug = {} from settings = {}", + maxActionsPerRequest, maxConcurrentRequests, + flushInterval, maxVolumePerRequest, + enableBulkLogging, failOnBulkError, + logger.isDebugEnabled(), settings.toDelimitedString(',')); + } + setEnabled(true); + } + + @Override + public void setEnabled(boolean enabled) { + this.enabled.set(enabled); + } + + @Override + public void startBulkMode(IndexDefinition indexDefinition) throws IOException { + String indexName = indexDefinition.getFullIndexName(); + int interval = indexDefinition.getStartBulkRefreshSeconds(); + if (interval != 0) { + logger.info("starting bulk on " + indexName + " with new refresh interval " + interval); + bulkClient.updateIndexSetting(indexName, "refresh_interval", interval + "s", 30L, TimeUnit.SECONDS); } else { - this.scheduler = null; - this.scheduledFuture = null; + logger.warn("ignoring starting bulk on " + indexName + " with refresh interval " + interval); } } - public static Builder builder(ElasticsearchClient client, BulkListener bulkListener) { - Objects.requireNonNull(bulkListener, "A listener for the BulkProcessor is required but null"); - return new Builder(client, bulkListener); + @Override + public void stopBulkMode(IndexDefinition indexDefinition) throws IOException { + String indexName = indexDefinition.getFullIndexName(); + int interval = indexDefinition.getStopBulkRefreshSeconds(); + flush(); + if (waitForBulkResponses(indexDefinition.getMaxWaitTime(), indexDefinition.getMaxWaitTimeUnit())) { + if (interval != 0) { + logger.info("stopping bulk on " + indexName + " with new refresh interval " + interval); + bulkClient.updateIndexSetting(indexName, "refresh_interval", interval + "s", 30L, TimeUnit.SECONDS); + } else { + logger.warn("ignoring stopping bulk on " + indexName + " with refresh interval " + interval); + } + } } @Override - public void setBulkActions(int bulkActions) { - this.bulkActions = bulkActions; + public void setMaxBulkActions(int bulkActions) { + this.maxBulkActions = bulkActions; } @Override - public int getBulkActions() { - return bulkActions; + public int getMaxBulkActions() { + return maxBulkActions; } @Override - public void setBulkSize(long bulkSize) { - this.bulkSize = bulkSize; + public void setMaxBulkVolume(long bulkSize) { + this.maxBulkVolume = bulkSize; } @Override - public long getBulkSize() { - return bulkSize; + public long getMaxBulkVolume() { + return maxBulkVolume; } - public BulkRequestHandler getBulkRequestHandler() { - return bulkRequestHandler; + @Override + public BulkMetric getBulkMetric() { + return bulkListener.getBulkMetric(); + } + + @Override + public Throwable getLastBulkError() { + return bulkListener.getLastBulkError(); } @Override public synchronized void add(ActionRequest request) { - ensureOpen(); + ensureOpenAndActive(); bulkRequest.add(request); - if ((bulkActions != -1 && bulkRequest.numberOfActions() >= bulkActions) || - (bulkSize != -1 && bulkRequest.estimatedSizeInBytes() >= bulkSize)) { + if ((maxBulkActions != -1 && bulkRequest.numberOfActions() >= maxBulkActions) || + (maxBulkVolume != -1 && bulkRequest.estimatedSizeInBytes() >= maxBulkVolume)) { execute(); } } @Override public synchronized void flush() { - ensureOpen(); + ensureOpenAndActive(); if (bulkRequest.numberOfActions() > 0) { execute(); } + // do not drain semaphore } @Override - public synchronized boolean awaitFlush(long timeout, TimeUnit unit) throws InterruptedException, IOException { - if (closed) { - return true; - } - if (bulkRequest.numberOfActions() > 0) { - execute(); - } - return bulkRequestHandler.flush(timeout, unit); - } - - @Override - public synchronized void close() throws IOException { + public synchronized boolean waitForBulkResponses(long timeout, TimeUnit unit) { try { - if (closed) { - return; - } - closed = true; - if (scheduledFuture != null) { - scheduledFuture.cancel(true); - scheduler.shutdown(); + if (closed.get()) { + // silently skip closed condition + return true; } if (bulkRequest.numberOfActions() > 0) { execute(); } - bulkRequestHandler.flush(0, TimeUnit.NANOSECONDS); - } catch (InterruptedException exc) { + return drainSemaphore(timeout, unit); + } catch (InterruptedException e) { Thread.currentThread().interrupt(); + logger.error("interrupted while waiting for bulk responses"); + return false; } } - private void ensureOpen() { - if (closed) { - throw new IllegalStateException("bulk processor already closed"); + @Override + public synchronized void close() throws IOException { + if (closed.compareAndSet(false, true)) { + try { + if (scheduledFuture != null) { + scheduledFuture.cancel(true); + } + if (scheduler != null) { + scheduler.shutdown(); + } + // like flush but without ensuring open + if (bulkRequest.numberOfActions() > 0) { + execute(); + } + drainSemaphore(0L, TimeUnit.NANOSECONDS); + bulkListener.close(); + } catch (InterruptedException exc) { + Thread.currentThread().interrupt(); + } } } private void execute() { BulkRequest myBulkRequest = this.bulkRequest; this.bulkRequest = new BulkRequest(); - this.bulkRequestHandler.execute(myBulkRequest); - } - - /** - * A builder used to create a build an instance of a bulk processor. - */ - public static class Builder { - - private final ElasticsearchClient client; - - private final BulkListener bulkListener; - - private String name; - - private int concurrentRequests = 1; - - private int bulkActions = 1000; - - private ByteSizeValue bulkSize = new ByteSizeValue(10, ByteSizeUnit.MB); - - private TimeValue flushInterval = null; - - /** - * Creates a builder of bulk processor with the client to use and the listener that will be used - * to be notified on the completion of bulk requests. - * - * @param client the client - * @param bulkListener the listener - */ - Builder(ElasticsearchClient client, BulkListener bulkListener) { - this.client = client; - this.bulkListener = bulkListener; - } - - /** - * Sets an optional name to identify this bulk processor. - * - * @param name name - * @return this builder - */ - public Builder setName(String name) { - this.name = name; - return this; - } - - /** - * Sets the number of concurrent requests allowed to be executed. A value of 0 means that only a single - * request will be allowed to be executed. A value of 1 means 1 concurrent request is allowed to be executed - * while accumulating new bulk requests. Defaults to {@code 1}. - * - * @param concurrentRequests maximum number of concurrent requests - * @return this builder - */ - public Builder setConcurrentRequests(int concurrentRequests) { - this.concurrentRequests = concurrentRequests; - return this; - } - - /** - * Sets when to flush a new bulk request based on the number of actions currently added. Defaults to - * {@code 1000}. Can be set to {@code -1} to disable it. - * - * @param bulkActions bulk actions - * @return this builder - */ - public Builder setBulkActions(int bulkActions) { - this.bulkActions = bulkActions; - return this; - } - - /** - * Sets when to flush a new bulk request based on the size of actions currently added. Defaults to - * {@code 1mb}. Can be set to {@code -1} to disable it. - * - * @param bulkSize bulk size - * @return this builder - */ - public Builder setBulkSize(ByteSizeValue bulkSize) { - this.bulkSize = bulkSize; - return this; - } - - /** - * Sets a flush interval flushing *any* bulk actions pending if the interval passes. Defaults to not set. - * Note, both {@link #setBulkActions(int)} and {@link #setBulkSize(org.elasticsearch.common.unit.ByteSizeValue)} - * can be set to {@code -1} with the flush interval set allowing for complete async processing of bulk actions. - * - * @param flushInterval flush interval - * @return this builder - */ - public Builder setFlushInterval(TimeValue flushInterval) { - this.flushInterval = flushInterval; - return this; - } - - /** - * Builds a new bulk processor. - * - * @return a bulk processor - */ - public DefaultBulkProcessor build() { - return new DefaultBulkProcessor(client, bulkListener, name, concurrentRequests, bulkActions, bulkSize, flushInterval); - } - } - - private class Flush implements Runnable { - - @Override - public void run() { - synchronized (DefaultBulkProcessor.this) { - if (closed) { - return; - } - if (bulkRequest.numberOfActions() > 0) { - execute(); - } - } - } - } - - private static class SyncBulkRequestHandler implements BulkRequestHandler { - - private final ElasticsearchClient client; - - private final BulkListener bulkListener; - - private final AtomicLong executionIdGen; - - SyncBulkRequestHandler(ElasticsearchClient client, BulkListener bulkListener) { - this.client = client; - this.bulkListener = bulkListener; - this.executionIdGen = new AtomicLong(); - } - - @Override - public void execute(BulkRequest bulkRequest) { - long executionId = executionIdGen.incrementAndGet(); + long executionId = executionIdGen.incrementAndGet(); + if (semaphore == null) { boolean afterCalled = false; try { - bulkListener.beforeBulk(executionId, bulkRequest); - BulkResponse bulkResponse = client.execute(BulkAction.INSTANCE, bulkRequest).actionGet(); + bulkListener.beforeBulk(executionId, myBulkRequest); + BulkResponse bulkResponse = client.execute(BulkAction.INSTANCE, myBulkRequest).actionGet(); afterCalled = true; - bulkListener.afterBulk(executionId, bulkRequest, bulkResponse); + bulkListener.afterBulk(executionId, myBulkRequest, bulkResponse); } catch (Exception e) { if (!afterCalled) { - bulkListener.afterBulk(executionId, bulkRequest, e); + bulkListener.afterBulk(executionId, myBulkRequest, e); } } - } - - @Override - public boolean flush(long timeout, TimeUnit unit) { - return true; - } - - @Override - public int getPermits() { - return 1; - } - - @Override - public void increase() { - } - - @Override - public void reduce() { - } - } - - private static class AsyncBulkRequestHandler implements BulkRequestHandler { - - private final ElasticsearchClient client; - - private final BulkListener bulkListener; - - private final ResizeableSemaphore semaphore; - - private final AtomicLong executionIdGen; - - private int permits; - - private AsyncBulkRequestHandler(ElasticsearchClient client, - BulkListener bulkListener, - int permits) { - this.client = client; - this.bulkListener = bulkListener; - this.permits = permits; - this.semaphore = new ResizeableSemaphore(permits); - this.executionIdGen = new AtomicLong(); - } - - @Override - public void execute(BulkRequest bulkRequest) { + } else { boolean bulkRequestSetupSuccessful = false; boolean acquired = false; - long executionId = executionIdGen.incrementAndGet(); try { - bulkListener.beforeBulk(executionId, bulkRequest); + bulkListener.beforeBulk(executionId, myBulkRequest); semaphore.acquire(); acquired = true; - client.execute(BulkAction.INSTANCE, bulkRequest, new ActionListener<>() { + client.execute(BulkAction.INSTANCE, myBulkRequest, new ActionListener<>() { @Override public void onResponse(BulkResponse response) { try { - bulkListener.afterBulk(executionId, bulkRequest, response); + bulkListener.afterBulk(executionId, myBulkRequest, response); } finally { semaphore.release(); } @@ -374,7 +272,7 @@ public class DefaultBulkProcessor implements BulkProcessor { @Override public void onFailure(Throwable e) { try { - bulkListener.afterBulk(executionId, bulkRequest, e); + bulkListener.afterBulk(executionId, myBulkRequest, e); } finally { semaphore.release(); } @@ -383,45 +281,36 @@ public class DefaultBulkProcessor implements BulkProcessor { bulkRequestSetupSuccessful = true; } catch (InterruptedException e) { Thread.currentThread().interrupt(); - bulkListener.afterBulk(executionId, bulkRequest, e); + bulkListener.afterBulk(executionId, myBulkRequest, e); } catch (Exception e) { - bulkListener.afterBulk(executionId, bulkRequest, e); + bulkListener.afterBulk(executionId, myBulkRequest, e); } finally { if (!bulkRequestSetupSuccessful && acquired) { semaphore.release(); } } } + } - @Override - public boolean flush(long timeout, TimeUnit unit) throws IOException, InterruptedException { - bulkListener.close(); - if (semaphore.tryAcquire(permits, timeout, unit)) { + private boolean drainSemaphore(long timeValue, TimeUnit timeUnit) throws InterruptedException { + if (semaphore != null) { + if (semaphore.tryAcquire(permits, timeValue, timeUnit)) { semaphore.release(permits); return true; } - return false; } + return false; + } - @Override - public int getPermits() { - return permits; + private void ensureOpenAndActive() { + if (closed.get()) { + throw new IllegalStateException("bulk processor is closed"); } - - @Override - public void increase() { - semaphore.release(1); - this.permits++; - } - - @Override - public void reduce() { - semaphore.reducePermits(1); - this.permits--; + if (!enabled.get()) { + throw new IllegalStateException("bulk processor is no longer enabled"); } } - @SuppressWarnings("serial") private static class ResizeableSemaphore extends Semaphore { ResizeableSemaphore(int permits) { diff --git a/elx-common/src/main/java/org/xbib/elx/common/DefaultIndexDefinition.java b/elx-common/src/main/java/org/xbib/elx/common/DefaultIndexDefinition.java index 352b38c..ada8db9 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/DefaultIndexDefinition.java +++ b/elx-common/src/main/java/org/xbib/elx/common/DefaultIndexDefinition.java @@ -65,7 +65,7 @@ public class DefaultIndexDefinition implements IndexDefinition { setDateTimeFormatter(DateTimeFormatter.ofPattern("yyyyMMdd", Locale.getDefault())); setDateTimePattern(Pattern.compile("^(.*?)(\\d+)$")); setFullIndexName(index + getDateTimeFormatter().format(LocalDateTime.now())); - setMaxWaitTime(Parameters.MAX_WAIT_BULK_RESPONSE_SECONDS.getInteger(), TimeUnit.SECONDS); + setMaxWaitTime(30, TimeUnit.SECONDS); setShift(false); setPrune(false); setEnabled(true); @@ -73,7 +73,9 @@ public class DefaultIndexDefinition implements IndexDefinition { public DefaultIndexDefinition(AdminClient adminClient, String index, String type, Settings settings) throws IOException { - TimeValue timeValue = settings.getAsTime(Parameters.MAX_WAIT_BULK_RESPONSE.getName(), TimeValue.timeValueSeconds(30)); + String timeValueStr = settings.get(Parameters.MAX_WAIT_BULK_RESPONSE.getName(), + Parameters.MAX_WAIT_BULK_RESPONSE.getString()); + TimeValue timeValue = TimeValue.parseTimeValue(timeValueStr, TimeValue.timeValueSeconds(30), ""); setMaxWaitTime(timeValue.seconds(), TimeUnit.SECONDS); String indexName = settings.get("name", index); String indexType = settings.get("type", type); @@ -83,11 +85,13 @@ public class DefaultIndexDefinition implements IndexDefinition { setEnabled(enabled); String fullIndexName = adminClient.resolveAlias(indexName).stream().findFirst().orElse(indexName); setFullIndexName(fullIndexName); + setStartBulkRefreshSeconds(settings.getAsInt(Parameters.START_BULK_REFRESH_SECONDS.getName(), + Parameters.START_BULK_REFRESH_SECONDS.getInteger())); + setStopBulkRefreshSeconds(settings.getAsInt(Parameters.STOP_BULK_REFRESH_SECONDS.getName(), + Parameters.STOP_BULK_REFRESH_SECONDS.getInteger())); if (settings.get("settings") != null && settings.get("mapping") != null) { setSettings(findSettingsFrom(settings.get("settings"))); setMappings(findMappingsFrom(settings.get("mapping"))); - setStartBulkRefreshSeconds(settings.getAsInt(Parameters.START_BULK_REFRESH_SECONDS.getName(), -1)); - setStopBulkRefreshSeconds(settings.getAsInt(Parameters.STOP_BULK_REFRESH_SECONDS.getName(), -1)); setReplicaLevel(settings.getAsInt("replica", 0)); boolean shift = settings.getAsBoolean("shift", false); setShift(shift); diff --git a/elx-common/src/main/java/org/xbib/elx/common/LongRingBuffer.java b/elx-common/src/main/java/org/xbib/elx/common/LongRingBuffer.java new file mode 100644 index 0000000..91aba29 --- /dev/null +++ b/elx-common/src/main/java/org/xbib/elx/common/LongRingBuffer.java @@ -0,0 +1,38 @@ +package org.xbib.elx.common; + +import java.util.Arrays; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.LongStream; + +public class LongRingBuffer { + + private final Long[] values1, values2; + + private final int limit; + + private final AtomicInteger index; + + public LongRingBuffer(int limit) { + this.values1 = new Long[limit]; + this.values2 = new Long[limit]; + Arrays.fill(values1, -1L); + Arrays.fill(values2, -1L); + this.limit = limit; + this.index = new AtomicInteger(); + } + + public int add(Long v1, Long v2) { + int i = index.incrementAndGet() % limit; + values1[i] = v1; + values2[i] = v2; + return i; + } + + public LongStream longStreamValues1() { + return Arrays.stream(values1).filter(v -> v != -1L).mapToLong(Long::longValue); + } + + public LongStream longStreamValues2() { + return Arrays.stream(values2).filter(v -> v != -1L).mapToLong(Long::longValue); + } +} diff --git a/elx-common/src/main/java/org/xbib/elx/common/Parameters.java b/elx-common/src/main/java/org/xbib/elx/common/Parameters.java index 0dabaf9..34c3091 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/Parameters.java +++ b/elx-common/src/main/java/org/xbib/elx/common/Parameters.java @@ -6,9 +6,7 @@ public enum Parameters { MAX_WAIT_BULK_RESPONSE("bulk.max_wait_response", String.class, "30s"), - MAX_WAIT_BULK_RESPONSE_SECONDS("bulk.max_wait_response_seconds", Integer.class, 30), - - START_BULK_REFRESH_SECONDS("bulk.start_refresh_seconds", Integer.class, 0), + START_BULK_REFRESH_SECONDS("bulk.start_refresh_seconds", Integer.class, -1), STOP_BULK_REFRESH_SECONDS("bulk.stop_refresh_seconds", Integer.class, 30), @@ -16,14 +14,13 @@ public enum Parameters { FAIL_ON_BULK_ERROR("bulk.failonerror", Boolean.class, true), - MAX_ACTIONS_PER_REQUEST("bulk.max_actions_per_request", Integer.class, 1000), + MAX_ACTIONS_PER_REQUEST("bulk.max_actions_per_request", Integer.class, -1), - RESPONSE_TIME_COUNT("bulk.response_time_count", Integer.class, 64), + RESPONSE_TIME_COUNT("bulk.response_time_count", Integer.class, 16), - // 0 = 1 CPU, synchronous requests, > 0 = n + 1 CPUs, asynchronous requests - MAX_CONCURRENT_REQUESTS("bulk.max_concurrent_requests", Integer.class, Runtime.getRuntime().availableProcessors() - 1), + MAX_CONCURRENT_REQUESTS("bulk.max_concurrent_requests", Integer.class, 1 /*Runtime.getRuntime().availableProcessors() - 1*/), - MAX_VOLUME_PER_REQUEST("bulk.max_volume_per_request", String.class, "1mb"), + MAX_VOLUME_PER_REQUEST("bulk.max_volume_per_request", String.class, "1kb"), FLUSH_INTERVAL("bulk.flush_interval", String.class, "30s"); diff --git a/elx-node/src/main/java/org/xbib/elx/node/NodeClientHelper.java b/elx-node/src/main/java/org/xbib/elx/node/NodeClientHelper.java index 6e429f7..46751fa 100644 --- a/elx-node/src/main/java/org/xbib/elx/node/NodeClientHelper.java +++ b/elx-node/src/main/java/org/xbib/elx/node/NodeClientHelper.java @@ -36,10 +36,10 @@ public class NodeClientHelper { key -> innerCreateClient(settings)); } - public void closeClient(Settings settings) throws IOException { + public void closeClient(Settings settings) { ElasticsearchClient client = clientMap.remove(settings.get("cluster.name")); if (client != null) { - logger.debug("closing node..."); + logger.debug("closing node"); node.close(); node = null; } @@ -77,10 +77,12 @@ public class NodeClientHelper { } private static boolean isPrivateSettings(String key) { - return key.equals(Parameters.MAX_ACTIONS_PER_REQUEST.name()) || - key.equals(Parameters.MAX_CONCURRENT_REQUESTS.name()) || - key.equals(Parameters.MAX_VOLUME_PER_REQUEST.name()) || - key.equals(Parameters.FLUSH_INTERVAL.name()); + for (Parameters p : Parameters.values()) { + if (key.equals(p.getName())) { + return true; + } + } + return false; } private static class BulkNode extends Node { diff --git a/elx-node/src/main/java/org/xbib/elx/node/package-info.java b/elx-node/src/main/java/org/xbib/elx/node/package-info.java deleted file mode 100644 index c2a9dfb..0000000 --- a/elx-node/src/main/java/org/xbib/elx/node/package-info.java +++ /dev/null @@ -1,4 +0,0 @@ -/** - * Node client extensions. - */ -package org.xbib.elx.node; diff --git a/elx-node/src/test/java/org/xbib/elx/node/test/BulkClientTest.java b/elx-node/src/test/java/org/xbib/elx/node/test/BulkClientTest.java index 4038590..e376e86 100644 --- a/elx-node/src/test/java/org/xbib/elx/node/test/BulkClientTest.java +++ b/elx-node/src/test/java/org/xbib/elx/node/test/BulkClientTest.java @@ -9,7 +9,6 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.xbib.elx.api.IndexDefinition; import org.xbib.elx.common.ClientBuilder; import org.xbib.elx.common.DefaultIndexDefinition; -import org.xbib.elx.common.Parameters; import org.xbib.elx.node.NodeAdminClient; import org.xbib.elx.node.NodeAdminClientProvider; import org.xbib.elx.node.NodeBulkClient; @@ -31,8 +30,6 @@ class BulkClientTest { private static final Long ACTIONS = 10000L; - private static final Long MAX_ACTIONS_PER_REQUEST = 1000L; - private final TestExtension.Helper helper; BulkClientTest(TestExtension.Helper helper) { @@ -44,8 +41,6 @@ class BulkClientTest { try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client) .setBulkClientProvider(NodeBulkClientProvider.class) .put(helper.getNodeSettings()) - .put(Parameters.MAX_ACTIONS_PER_REQUEST.getName(), MAX_ACTIONS_PER_REQUEST) - .put(Parameters.FLUSH_INTERVAL.getName(), "30s") .build()) { IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); bulkClient.newIndex(indexDefinition); @@ -65,7 +60,6 @@ class BulkClientTest { try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client) .setBulkClientProvider(NodeBulkClientProvider.class) .put(helper.getNodeSettings()) - .put(Parameters.FLUSH_INTERVAL.getName(), "5s") .build()) { IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); bulkClient.newIndex(indexDefinition); @@ -105,8 +99,6 @@ class BulkClientTest { try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client) .setBulkClientProvider(NodeBulkClientProvider.class) .put(helper.getNodeSettings()) - .put(Parameters.MAX_ACTIONS_PER_REQUEST.getName(), MAX_ACTIONS_PER_REQUEST) - .put(Parameters.FLUSH_INTERVAL.getName(), "60s") .build()) { IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); bulkClient.newIndex(indexDefinition); @@ -130,14 +122,11 @@ class BulkClientTest { @Test void testThreadedRandomDocs() throws Exception { int maxthreads = Runtime.getRuntime().availableProcessors(); - long maxActionsPerRequest = MAX_ACTIONS_PER_REQUEST; final long actions = ACTIONS; long timeout = 120L; try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client) .setBulkClientProvider(NodeBulkClientProvider.class) .put(helper.getNodeSettings()) - .put(Parameters.MAX_ACTIONS_PER_REQUEST.getName(), maxActionsPerRequest) - .put(Parameters.FLUSH_INTERVAL.getName(), "60s") .build()) { IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); indexDefinition.setStartBulkRefreshSeconds(0); diff --git a/elx-transport/src/test/java/org/xbib/elx/transport/test/BulkClientTest.java b/elx-transport/src/test/java/org/xbib/elx/transport/test/BulkClientTest.java index 641c40d..e5235aa 100644 --- a/elx-transport/src/test/java/org/xbib/elx/transport/test/BulkClientTest.java +++ b/elx-transport/src/test/java/org/xbib/elx/transport/test/BulkClientTest.java @@ -9,7 +9,6 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.xbib.elx.api.IndexDefinition; import org.xbib.elx.common.ClientBuilder; import org.xbib.elx.common.DefaultIndexDefinition; -import org.xbib.elx.common.Parameters; import org.xbib.elx.transport.TransportAdminClient; import org.xbib.elx.transport.TransportAdminClientProvider; import org.xbib.elx.transport.TransportBulkClient; @@ -31,8 +30,6 @@ class BulkClientTest { private static final Long ACTIONS = 100000L; - private static final Long MAX_ACTIONS_PER_REQUEST = 1000L; - private final TestExtension.Helper helper; BulkClientTest(TestExtension.Helper helper) { @@ -44,8 +41,6 @@ class BulkClientTest { try (TransportBulkClient bulkClient = ClientBuilder.builder() .setBulkClientProvider(TransportBulkClientProvider.class) .put(helper.getTransportSettings()) - .put(Parameters.MAX_ACTIONS_PER_REQUEST.getName(), MAX_ACTIONS_PER_REQUEST) - .put(Parameters.FLUSH_INTERVAL.getName(), "30s") .build()) { IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); bulkClient.newIndex(indexDefinition); @@ -104,8 +99,6 @@ class BulkClientTest { try (TransportBulkClient bulkClient = ClientBuilder.builder() .setBulkClientProvider(TransportBulkClientProvider.class) .put(helper.getTransportSettings()) - .put(Parameters.MAX_ACTIONS_PER_REQUEST.getName(), MAX_ACTIONS_PER_REQUEST) - .put(Parameters.FLUSH_INTERVAL.getName(), "60s") .build()) { IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); bulkClient.newIndex(indexDefinition); @@ -128,23 +121,19 @@ class BulkClientTest { @Test void testThreadedRandomDocs() { int maxthreads = Runtime.getRuntime().availableProcessors(); - long maxActionsPerRequest = MAX_ACTIONS_PER_REQUEST; + //long maxActionsPerRequest = MAX_ACTIONS_PER_REQUEST; final long actions = ACTIONS; long timeout = 120L; try (TransportBulkClient bulkClient = ClientBuilder.builder() .setBulkClientProvider(TransportBulkClientProvider.class) .put(helper.getTransportSettings()) - .put(Parameters.MAX_ACTIONS_PER_REQUEST.getName(), maxActionsPerRequest) - .put(Parameters.FLUSH_INTERVAL.getName(), "60s") - .put(Parameters.ENABLE_BULK_LOGGING.getName(), Boolean.TRUE) .build()) { IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); indexDefinition.setFullIndexName("test"); - indexDefinition.setStartBulkRefreshSeconds(0); + indexDefinition.setStartBulkRefreshSeconds(-1); indexDefinition.setStopBulkRefreshSeconds(60); bulkClient.newIndex(indexDefinition); bulkClient.startBulk(indexDefinition); - logger.info("index created"); ExecutorService executorService = Executors.newFixedThreadPool(maxthreads); final CountDownLatch latch = new CountDownLatch(maxthreads); for (int i = 0; i < maxthreads; i++) { diff --git a/gradle.properties b/gradle.properties index d0cb581..02d6610 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,6 +1,6 @@ group = org.xbib name = elx -version = 2.2.1.36 +version = 2.2.1.37 gradle.wrapper.version = 6.6.1 xbib-metrics.version = 2.1.0