diff --git a/elx-api/src/main/java/org/xbib/elx/api/BasicClient.java b/elx-api/src/main/java/org/xbib/elx/api/BasicClient.java index 7c96b35..cd0bee8 100644 --- a/elx-api/src/main/java/org/xbib/elx/api/BasicClient.java +++ b/elx-api/src/main/java/org/xbib/elx/api/BasicClient.java @@ -4,10 +4,13 @@ import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.common.settings.Settings; import java.io.Closeable; import java.io.IOException; +import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; public interface BasicClient extends Closeable { + void putClusterSetting(String key, Object value, long timeout, TimeUnit timeUnit) throws IOException; + /** * Set an Elasticsearch client to extend from it. May be null for TransportClient. * @param client the Elasticsearch client @@ -59,4 +62,6 @@ public interface BasicClient extends Closeable { long getSearchableDocs(IndexDefinition index); boolean isIndexExists(IndexDefinition index); + + ScheduledThreadPoolExecutor getScheduler(); } diff --git a/elx-api/src/main/java/org/xbib/elx/api/BulkProcessor.java b/elx-api/src/main/java/org/xbib/elx/api/BulkProcessor.java index f4508b0..b313483 100644 --- a/elx-api/src/main/java/org/xbib/elx/api/BulkProcessor.java +++ b/elx-api/src/main/java/org/xbib/elx/api/BulkProcessor.java @@ -5,6 +5,7 @@ import org.elasticsearch.action.ActionRequest; import java.io.Closeable; import java.io.Flushable; import java.io.IOException; +import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; public interface BulkProcessor extends Closeable, Flushable { diff --git a/elx-common/build.gradle b/elx-common/build.gradle index a89f8f3..a1f397f 100644 --- a/elx-common/build.gradle +++ b/elx-common/build.gradle @@ -1,6 +1,7 @@ dependencies { api project(':elx-api') implementation "org.xbib:guice:${project.property('xbib-guice.version')}" + implementation "org.xbib:time:${project.property('xbib-time.version')}" runtimeOnly "com.vividsolutions:jts:${project.property('jts.version')}" runtimeOnly "com.github.spullara.mustache.java:compiler:${project.property('mustache.version')}" runtimeOnly "net.java.dev.jna:jna:${project.property('jna.version')}" diff --git a/elx-common/src/main/java/org/xbib/elx/common/AbstractAdminClient.java b/elx-common/src/main/java/org/xbib/elx/common/AbstractAdminClient.java index d8db967..f188f83 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/AbstractAdminClient.java +++ b/elx-common/src/main/java/org/xbib/elx/common/AbstractAdminClient.java @@ -9,7 +9,6 @@ import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; import org.elasticsearch.action.admin.indices.alias.IndicesAliasesAction; import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest; -import org.elasticsearch.action.admin.indices.alias.IndicesAliasesResponse; import org.elasticsearch.action.admin.indices.alias.get.GetAliasesAction; import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest; import org.elasticsearch.action.admin.indices.alias.get.GetAliasesResponse; @@ -83,7 +82,7 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements @Override public Map getMapping(IndexDefinition indexDefinition) throws IOException { - if (!ensureIndexDefinition(indexDefinition)) { + if (isIndexDefinitionDisabled(indexDefinition)) { return null; } GetMappingsRequestBuilder getMappingsRequestBuilder = new GetMappingsRequestBuilder(client, GetMappingsAction.INSTANCE) @@ -98,7 +97,7 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements @Override public AdminClient deleteIndex(IndexDefinition indexDefinition) { - if (!ensureIndexDefinition(indexDefinition)) { + if (isIndexDefinitionDisabled(indexDefinition)) { return null; } ensureClientIsPresent(); @@ -115,7 +114,7 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements @Override public AdminClient updateReplicaLevel(IndexDefinition indexDefinition, int level) throws IOException { - if (!ensureIndexDefinition(indexDefinition)) { + if (isIndexDefinitionDisabled(indexDefinition)) { return null; } if (level < 1) { @@ -131,7 +130,7 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements @Override public int getReplicaLevel(IndexDefinition indexDefinition) { - if (!ensureIndexDefinition(indexDefinition)) { + if (isIndexDefinitionDisabled(indexDefinition)) { return -1; } ensureClientIsPresent(); @@ -203,7 +202,7 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements if (additionalAliases == null) { return new EmptyIndexShiftResult(); } - if (!ensureIndexDefinition(indexDefinition)) { + if (isIndexDefinitionDisabled(indexDefinition)) { return new EmptyIndexShiftResult(); } if (indexDefinition.isShiftEnabled()) { @@ -358,7 +357,7 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements @Override public Long mostRecentDocument(IndexDefinition indexDefinition, String timestampfieldname) { - if (!ensureIndexDefinition(indexDefinition)) { + if (isIndexDefinitionDisabled(indexDefinition)) { return -1L; } ensureClientIsPresent(); @@ -385,7 +384,7 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements @Override public boolean forceMerge(IndexDefinition indexDefinition) { - if (!ensureIndexDefinition(indexDefinition)) { + if (isIndexDefinitionDisabled(indexDefinition)) { return false; } if (!indexDefinition.isForceMergeEnabled()) { @@ -425,7 +424,7 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements @Override public void checkMapping(IndexDefinition indexDefinition) { - if (!ensureIndexDefinition(indexDefinition)) { + if (isIndexDefinitionDisabled(indexDefinition)) { return; } ensureClientIsPresent(); diff --git a/elx-common/src/main/java/org/xbib/elx/common/AbstractBasicClient.java b/elx-common/src/main/java/org/xbib/elx/common/AbstractBasicClient.java index ddedaeb..a00998b 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/AbstractBasicClient.java +++ b/elx-common/src/main/java/org/xbib/elx/common/AbstractBasicClient.java @@ -7,6 +7,12 @@ import org.elasticsearch.ElasticsearchTimeoutException; import org.elasticsearch.action.admin.cluster.health.ClusterHealthAction; import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; +import org.elasticsearch.action.admin.cluster.node.info.NodeInfo; +import org.elasticsearch.action.admin.cluster.node.info.NodesInfoAction; +import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest; +import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse; +import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsAction; +import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; import org.elasticsearch.action.admin.cluster.state.ClusterStateAction; import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; @@ -22,10 +28,15 @@ import org.elasticsearch.client.transport.NoNodeAvailableException; import org.elasticsearch.cluster.health.ClusterHealthStatus; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.threadpool.ThreadPoolInfo; import org.xbib.elx.api.BasicClient; import org.xbib.elx.api.IndexDefinition; import java.io.IOException; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -37,12 +48,23 @@ public abstract class AbstractBasicClient implements BasicClient { protected Settings settings; + private final ScheduledThreadPoolExecutor scheduler; + private final AtomicBoolean closed; public AbstractBasicClient() { + this.scheduler = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(2, + EsExecutors.daemonThreadFactory("elx-bulk-processor")); + this.scheduler.setExecuteExistingDelayedTasksAfterShutdownPolicy(false); + this.scheduler.setContinueExistingPeriodicTasksAfterShutdownPolicy(false); closed = new AtomicBoolean(false); } + @Override + public ScheduledThreadPoolExecutor getScheduler() { + return scheduler; + } + @Override public void setClient(ElasticsearchClient client) { this.client = client; @@ -82,6 +104,39 @@ public abstract class AbstractBasicClient implements BasicClient { } } + @Override + public void putClusterSetting(String key, Object value, long timeout, TimeUnit timeUnit) throws IOException { + ensureClientIsPresent(); + if (key == null) { + throw new IOException("no key given"); + } + if (value == null) { + throw new IOException("no value given"); + } + Settings.Builder updateSettingsBuilder = Settings.builder(); + updateSettingsBuilder.put(key, value.toString()); + ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest(); + updateSettingsRequest.persistentSettings(updateSettingsBuilder).timeout(toTimeValue(timeout, timeUnit)); + client.execute(ClusterUpdateSettingsAction.INSTANCE, updateSettingsRequest).actionGet(); + } + + protected Long getThreadPoolQueueSize(String name) { + ensureClientIsPresent(); + NodesInfoRequest nodesInfoRequest = new NodesInfoRequest(); + nodesInfoRequest.threadPool(true); + NodesInfoResponse nodesInfoResponse = + client.execute(NodesInfoAction.INSTANCE, nodesInfoRequest).actionGet(); + for (NodeInfo nodeInfo : nodesInfoResponse.getNodes()) { + ThreadPoolInfo threadPoolInfo = nodeInfo.getThreadPool(); + for (ThreadPool.Info info : threadPoolInfo) { + if (info.getName().equals(name)) { + return info.getQueueSize().getSingles(); + } + } + } + return null; + } + @Override public void waitForCluster(String statusString, long maxWaitTime, TimeUnit timeUnit) { ensureClientIsPresent(); @@ -139,7 +194,7 @@ public abstract class AbstractBasicClient implements BasicClient { @Override public long getSearchableDocs(IndexDefinition indexDefinition) { - if (!ensureIndexDefinition(indexDefinition)) { + if (isIndexDefinitionDisabled(indexDefinition)) { return -1L; } ensureClientIsPresent(); @@ -166,6 +221,9 @@ public abstract class AbstractBasicClient implements BasicClient { ensureClientIsPresent(); if (closed.compareAndSet(false, true)) { closeClient(settings); + if (scheduler != null) { + scheduler.shutdown(); + } } } @@ -197,12 +255,12 @@ public abstract class AbstractBasicClient implements BasicClient { } } - protected boolean ensureIndexDefinition(IndexDefinition indexDefinition) { + protected boolean isIndexDefinitionDisabled(IndexDefinition indexDefinition) { if (!indexDefinition.isEnabled()) { logger.warn("index " + indexDefinition.getFullIndexName() + " is disabled"); - return false; + return true; } - return true; + return false; } protected static TimeValue toTimeValue(long timeValue, TimeUnit timeUnit) { diff --git a/elx-common/src/main/java/org/xbib/elx/common/AbstractBulkClient.java b/elx-common/src/main/java/org/xbib/elx/common/AbstractBulkClient.java index 618386d..3df6d20 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/AbstractBulkClient.java +++ b/elx-common/src/main/java/org/xbib/elx/common/AbstractBulkClient.java @@ -32,7 +32,12 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements private BulkProcessor bulkProcessor; - private final AtomicBoolean closed = new AtomicBoolean(true); + private final AtomicBoolean closed; + + public AbstractBulkClient() { + super(); + closed = new AtomicBoolean(true); + } @Override public void init(Settings settings) throws IOException { @@ -59,16 +64,17 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements if (closed.compareAndSet(false, true)) { ensureClientIsPresent(); if (bulkProcessor != null) { - logger.info("closing bulk"); + logger.info("closing bulk procesor"); bulkProcessor.close(); } closeClient(settings); + super.close(); } } @Override public void newIndex(IndexDefinition indexDefinition) throws IOException { - if (!ensureIndexDefinition(indexDefinition)) { + if (isIndexDefinitionDisabled(indexDefinition)) { return; } String index = indexDefinition.getFullIndexName(); @@ -117,16 +123,25 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements @Override public void startBulk(IndexDefinition indexDefinition) throws IOException { - if (!ensureIndexDefinition(indexDefinition)) { + if (isIndexDefinitionDisabled(indexDefinition)) { return; } ensureClientIsPresent(); + Long bulkQueueSize = getThreadPoolQueueSize("bulk"); + if (bulkQueueSize != null && bulkQueueSize <= 64) { + logger.info("found bulk queue size " + bulkQueueSize + ", expanding to " + bulkQueueSize * 4); + bulkQueueSize = bulkQueueSize * 4; + } else { + logger.warn("undefined or small bulk queue size found: " + bulkQueueSize + " assuming 256"); + bulkQueueSize = 256L; + } + putClusterSetting("threadpool.bulk.queue_size", bulkQueueSize, 30L, TimeUnit.SECONDS); bulkProcessor.startBulkMode(indexDefinition); } @Override public void stopBulk(IndexDefinition indexDefinition) throws IOException { - if (!ensureIndexDefinition(indexDefinition)) { + if (isIndexDefinitionDisabled(indexDefinition)) { return; } ensureClientIsPresent(); @@ -140,7 +155,7 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements @Override public BulkClient index(IndexDefinition indexDefinition, String id, boolean create, BytesReference source) { - if (!ensureIndexDefinition(indexDefinition)) { + if (isIndexDefinitionDisabled(indexDefinition)) { return this; } return index(new IndexRequest() @@ -158,7 +173,7 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements @Override public BulkClient delete(IndexDefinition indexDefinition, String id) { - if (!ensureIndexDefinition(indexDefinition)) { + if (isIndexDefinitionDisabled(indexDefinition)) { return this; } return delete(new DeleteRequest() @@ -181,7 +196,7 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements @Override public BulkClient update(IndexDefinition indexDefinition, String id, BytesReference source) { - if (!ensureIndexDefinition(indexDefinition)) { + if (isIndexDefinitionDisabled(indexDefinition)) { return this; } return update(new UpdateRequest() @@ -210,7 +225,7 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements @Override public void flushIndex(IndexDefinition indexDefinition) { - if (!ensureIndexDefinition(indexDefinition)) { + if (isIndexDefinitionDisabled(indexDefinition)) { return; } ensureClientIsPresent(); @@ -219,7 +234,7 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements @Override public void refreshIndex(IndexDefinition indexDefinition) { - if (!ensureIndexDefinition(indexDefinition)) { + if (isIndexDefinitionDisabled(indexDefinition)) { return; } ensureClientIsPresent(); diff --git a/elx-common/src/main/java/org/xbib/elx/common/AbstractSearchClient.java b/elx-common/src/main/java/org/xbib/elx/common/AbstractSearchClient.java index 999263e..40b472b 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/AbstractSearchClient.java +++ b/elx-common/src/main/java/org/xbib/elx/common/AbstractSearchClient.java @@ -36,6 +36,13 @@ public abstract class AbstractSearchClient extends AbstractBasicClient implement private SearchMetric searchMetric; + private final AtomicBoolean closed; + + public AbstractSearchClient() { + super(); + this.closed = new AtomicBoolean(true); + } + @Override public SearchMetric getSearchMetric() { return searchMetric; @@ -43,16 +50,20 @@ public abstract class AbstractSearchClient extends AbstractBasicClient implement @Override public void init(Settings settings) throws IOException { - super.init(settings); - this.searchMetric = new DefaultSearchMetric(); - searchMetric.init(settings); + if (closed.compareAndSet(true, false)) { + super.init(settings); + this.searchMetric = new DefaultSearchMetric(getScheduler(), settings); + searchMetric.init(settings); + } } @Override public void close() throws IOException { - super.close(); - if (searchMetric != null) { - searchMetric.close(); + if (closed.compareAndSet(false, true)) { + if (searchMetric != null) { + searchMetric.close(); + } + super.close(); } } diff --git a/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkListener.java b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkListener.java index 465d4e5..178f873 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkListener.java +++ b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkListener.java @@ -5,12 +5,12 @@ import org.apache.logging.log4j.Logger; import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.common.settings.Settings; import org.xbib.elx.api.BulkListener; import org.xbib.elx.api.BulkMetric; import org.xbib.elx.api.BulkProcessor; import java.io.IOException; -import java.util.LongSummaryStatistics; import java.util.concurrent.ScheduledThreadPoolExecutor; public class DefaultBulkListener implements BulkListener { @@ -27,15 +27,17 @@ public class DefaultBulkListener implements BulkListener { private Throwable lastBulkError; - public DefaultBulkListener(BulkProcessor bulkProcessor, + public DefaultBulkListener(DefaultBulkProcessor bulkProcessor, ScheduledThreadPoolExecutor scheduler, - boolean isBulkLoggingEnabled, - boolean failOnError, - int ringBufferSize) { + Settings settings) { this.bulkProcessor = bulkProcessor; - this.isBulkLoggingEnabled = isBulkLoggingEnabled; - this.failOnError = failOnError; - this.bulkMetric = new DefaultBulkMetric(bulkProcessor, scheduler, ringBufferSize); + boolean enableBulkLogging = settings.getAsBoolean(Parameters.ENABLE_BULK_LOGGING.getName(), + Parameters.ENABLE_BULK_LOGGING.getBoolean()); + boolean failOnBulkError = settings.getAsBoolean(Parameters.FAIL_ON_BULK_ERROR.getName(), + Parameters.FAIL_ON_BULK_ERROR.getBoolean()); + this.isBulkLoggingEnabled = enableBulkLogging; + this.failOnError = failOnBulkError; + this.bulkMetric = new DefaultBulkMetric(bulkProcessor, scheduler, settings); bulkMetric.start(); } diff --git a/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkMetric.java b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkMetric.java index 3c62ed0..f983a76 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkMetric.java +++ b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkMetric.java @@ -1,24 +1,32 @@ package org.xbib.elx.common; +import org.apache.logging.log4j.Level; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.unit.TimeValue; import org.xbib.elx.api.BulkMetric; -import org.xbib.elx.api.BulkProcessor; import org.xbib.metrics.api.Count; import org.xbib.metrics.api.Metered; import org.xbib.metrics.common.CountMetric; import org.xbib.metrics.common.Meter; +import java.io.IOException; import java.util.LongSummaryStatistics; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; public class DefaultBulkMetric implements BulkMetric { private static final Logger logger = LogManager.getLogger(DefaultBulkMetric.class.getName()); - private final BulkProcessor bulkProcessor; + private final DefaultBulkProcessor bulkProcessor; + + private final ScheduledFuture future; private final Meter totalIngest; @@ -34,24 +42,37 @@ public class DefaultBulkMetric implements BulkMetric { private final Count failed; - private Long started; - - private Long stopped; + private final long measureIntervalSeconds; private final int ringBufferSize; private final LongRingBuffer ringBuffer; + private final long minVolumePerRequest; + + private final long maxVolumePerRequest; + + private Long started; + + private Long stopped; + private Double lastThroughput; - private long currentMaxVolume; + private long currentVolumePerRequest; - private int currentPermits; + private int x = 0; - public DefaultBulkMetric(BulkProcessor bulkProcessor, + public DefaultBulkMetric(DefaultBulkProcessor bulkProcessor, ScheduledThreadPoolExecutor scheduledThreadPoolExecutor, - int ringBufferSize) { + Settings settings) { this.bulkProcessor = bulkProcessor; + int ringBufferSize = settings.getAsInt(Parameters.RING_BUFFER_SIZE.getName(), + Parameters.RING_BUFFER_SIZE.getInteger()); + String measureIntervalStr = settings.get(Parameters.MEASURE_INTERVAL.getName(), + Parameters.MEASURE_INTERVAL.getString()); + TimeValue measureInterval = TimeValue.parseTimeValue(measureIntervalStr, + TimeValue.timeValueSeconds(1), ""); + this.measureIntervalSeconds = measureInterval.seconds(); this.totalIngest = new Meter(scheduledThreadPoolExecutor); this.ringBufferSize = ringBufferSize; this.ringBuffer = new LongRingBuffer(ringBufferSize); @@ -61,8 +82,18 @@ public class DefaultBulkMetric implements BulkMetric { this.submitted = new CountMetric(); this.succeeded = new CountMetric(); this.failed = new CountMetric(); - this.currentMaxVolume = 1024; - this.currentPermits = 1; + ByteSizeValue minVolumePerRequest = settings.getAsBytesSize(Parameters.MIN_VOLUME_PER_REQUEST.getName(), + ByteSizeValue.parseBytesSizeValue(Parameters.MIN_VOLUME_PER_REQUEST.getString(), "1k")); + this.minVolumePerRequest = minVolumePerRequest.bytes(); + ByteSizeValue maxVolumePerRequest = settings.getAsBytesSize(Parameters.MAX_VOLUME_PER_REQUEST.getName(), + ByteSizeValue.parseBytesSizeValue(Parameters.MAX_VOLUME_PER_REQUEST.getString(), "1m")); + this.maxVolumePerRequest = maxVolumePerRequest.bytes(); + this.currentVolumePerRequest = minVolumePerRequest.bytes(); + String metricLogIntervalStr = settings.get(Parameters.METRIC_LOG_INTERVAL.getName(), + Parameters.METRIC_LOG_INTERVAL.getString()); + TimeValue metricLoginterval = TimeValue.parseTimeValue(metricLogIntervalStr, + TimeValue.timeValueSeconds(10), ""); + this.future = scheduledThreadPoolExecutor.scheduleAtFixedRate(this::log, 0L, metricLoginterval.seconds(), TimeUnit.SECONDS); } @Override @@ -113,7 +144,7 @@ public class DefaultBulkMetric implements BulkMetric { @Override public void start() { this.started = System.nanoTime(); - totalIngest.start(5L); + totalIngest.start(measureIntervalSeconds); } @Override @@ -123,13 +154,13 @@ public class DefaultBulkMetric implements BulkMetric { } @Override - public void close() { + public void close() throws IOException { stop(); totalIngest.shutdown(); + log(); + this.future.cancel(true); } - private int x = 0; - @Override public void recalculate(BulkRequest request, BulkResponse response) { if (ringBufferSize > 0 && ringBuffer.add(response.getTook().millis(), request.estimatedSizeInBytes()) == 0) { @@ -138,8 +169,9 @@ public class DefaultBulkMetric implements BulkMetric { LongSummaryStatistics stat2 = ringBuffer.longStreamValues2().summaryStatistics(); double throughput = stat2.getAverage() / stat1.getAverage(); double delta = lastThroughput != null ? throughput - lastThroughput : 0.0d; + double deltaPercent = delta * 100 / throughput; if (logger.isDebugEnabled()) { - logger.debug("metric: avg = " + stat1.getAverage() + + logger.debug("time: avg = " + stat1.getAverage() + " min = " + stat1.getMin() + " max = " + stat1.getMax() + " size: avg = " + stat2.getAverage() + @@ -148,35 +180,52 @@ public class DefaultBulkMetric implements BulkMetric { " last throughput: " + lastThroughput + " bytes/ms" + " throughput: " + throughput + " bytes/ms" + " delta = " + delta + - " vol = " + currentMaxVolume); + " deltapercent = " + deltaPercent + + " vol = " + currentVolumePerRequest); } - if (lastThroughput == null || throughput < 10000) { + if ((lastThroughput == null || throughput < 100000) && stat1.getMax() < 5000) { double k = 0.5; double d = (1 / (1 + Math.exp(-(((double)x)) * k))); - logger.debug("inc: x = " + x + " d = " + d); - currentMaxVolume += d * currentMaxVolume; - if (currentMaxVolume > 5 + 1024 * 1024) { - currentMaxVolume = 5 * 1024 * 1024; + currentVolumePerRequest += d * currentVolumePerRequest; + if (currentVolumePerRequest > maxVolumePerRequest) { + currentVolumePerRequest = maxVolumePerRequest; } - bulkProcessor.setMaxBulkVolume(currentMaxVolume); + bulkProcessor.setMaxBulkVolume(currentVolumePerRequest); if (logger.isDebugEnabled()) { - logger.debug("metric: increase volume to " + currentMaxVolume); + logger.debug("metric: increase volume to " + currentVolumePerRequest); } - } else if (delta < -100) { - double k = 0.5; - double d = (1 / (1 + Math.exp(-(((double)x)) * k))); - d = -1/d; - logger.debug("dec: x = " + x + " d = " + d); - currentMaxVolume += d * currentMaxVolume; - if (currentMaxVolume > 5 + 1024 * 1024) { - currentMaxVolume = 5 * 1024 * 1024; - } - bulkProcessor.setMaxBulkVolume(currentMaxVolume); + } else if (deltaPercent > 10 || stat1.getMax() >= 5000) { + currentVolumePerRequest = minVolumePerRequest; + bulkProcessor.setMaxBulkVolume(currentVolumePerRequest); if (logger.isDebugEnabled()) { - logger.debug("metric: decrease volume to " + currentMaxVolume); + logger.debug("metric: decrease volume to " + currentVolumePerRequest); } } lastThroughput = throughput; } } + + private void log() { + long docs = getSucceeded().getCount(); + long elapsed = elapsed() / 1000000; // nano to millis + double dps = docs * 1000.0 / elapsed; + long bytes = getTotalIngestSizeInBytes().getCount(); + double avg = bytes / (docs + 1.0); // avoid div by zero + double bps = bytes * 1000.0 / elapsed; + if (logger.isInfoEnabled()) { + logger.log(Level.INFO, "{} docs, {} ms = {}, {} = {}, {} = {} avg, {} = {}, {} = {}", + docs, + elapsed, + FormatUtil.formatDurationWords(elapsed, true, true), + bytes, + FormatUtil.formatSize(bytes), + avg, + FormatUtil.formatSize(avg), + dps, + FormatUtil.formatDocumentSpeed(dps), + bps, + FormatUtil.formatSpeed(bps) + ); + } + } } diff --git a/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkProcessor.java b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkProcessor.java index 5199cd4..1267381 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkProcessor.java +++ b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkProcessor.java @@ -11,16 +11,13 @@ import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.util.concurrent.EsExecutors; import org.xbib.elx.api.BulkClient; import org.xbib.elx.api.BulkMetric; import org.xbib.elx.api.BulkProcessor; import org.xbib.elx.api.IndexDefinition; import java.io.IOException; -import java.util.concurrent.Executors; import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -45,72 +42,51 @@ public class DefaultBulkProcessor implements BulkProcessor { private final DefaultBulkListener bulkListener; - private final ScheduledThreadPoolExecutor scheduler; - private ScheduledFuture scheduledFuture; private BulkRequest bulkRequest; - private long maxBulkVolume; + private long bulkVolume; - private int maxBulkActions; + private int bulkActions; private final AtomicBoolean closed; private final AtomicLong executionIdGen; - private ResizeableSemaphore semaphore; + private final ResizeableSemaphore semaphore; - private int permits; + private final int permits; public DefaultBulkProcessor(BulkClient bulkClient, Settings settings) { this.bulkClient = bulkClient; int maxActionsPerRequest = settings.getAsInt(Parameters.MAX_ACTIONS_PER_REQUEST.getName(), Parameters.MAX_ACTIONS_PER_REQUEST.getInteger()); - int maxConcurrentRequests = settings.getAsInt(Parameters.MAX_CONCURRENT_REQUESTS.getName(), - Parameters.MAX_CONCURRENT_REQUESTS.getInteger()); - String flushIngestIntervalStr = settings.get(Parameters.FLUSH_INTERVAL.getName(), + String flushIntervalStr = settings.get(Parameters.FLUSH_INTERVAL.getName(), Parameters.FLUSH_INTERVAL.getString()); - TimeValue flushInterval = TimeValue.parseTimeValue(flushIngestIntervalStr, + TimeValue flushInterval = TimeValue.parseTimeValue(flushIntervalStr, TimeValue.timeValueSeconds(30), ""); - ByteSizeValue maxVolumePerRequest = settings.getAsBytesSize(Parameters.MAX_VOLUME_PER_REQUEST.getName(), - ByteSizeValue.parseBytesSizeValue(Parameters.MAX_VOLUME_PER_REQUEST.getString(), "1m")); - boolean enableBulkLogging = settings.getAsBoolean(Parameters.ENABLE_BULK_LOGGING.getName(), - Parameters.ENABLE_BULK_LOGGING.getBoolean()); - boolean failOnBulkError = settings.getAsBoolean(Parameters.FAIL_ON_BULK_ERROR.getName(), - Parameters.FAIL_ON_BULK_ERROR.getBoolean()); - int ringBufferSize = settings.getAsInt(Parameters.RESPONSE_TIME_COUNT.getName(), - Parameters.RESPONSE_TIME_COUNT.getInteger()); + ByteSizeValue minVolumePerRequest = settings.getAsBytesSize(Parameters.MIN_VOLUME_PER_REQUEST.getName(), + ByteSizeValue.parseBytesSizeValue(Parameters.MIN_VOLUME_PER_REQUEST.getString(), "1k")); this.client = bulkClient.getClient(); - this.scheduler = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(2, - EsExecutors.daemonThreadFactory("elx-bulk-processor")); - this.scheduler.setExecuteExistingDelayedTasksAfterShutdownPolicy(false); - this.scheduler.setContinueExistingPeriodicTasksAfterShutdownPolicy(false); if (flushInterval.millis() > 0L) { - this.scheduledFuture = this.scheduler.scheduleWithFixedDelay(this::flush, flushInterval.millis(), + this.scheduledFuture = bulkClient.getScheduler().scheduleWithFixedDelay(this::flush, flushInterval.millis(), flushInterval.millis(), TimeUnit.MILLISECONDS); } - this.bulkListener = new DefaultBulkListener(this, scheduler, - enableBulkLogging, failOnBulkError, ringBufferSize); - this.permits = maxConcurrentRequests; - this.maxBulkActions = maxActionsPerRequest; - this.maxBulkVolume = maxVolumePerRequest != null ? maxVolumePerRequest.getBytes() : -1; + this.bulkListener = new DefaultBulkListener(this, bulkClient.getScheduler(), settings); + this.bulkActions = maxActionsPerRequest; + this.bulkVolume = minVolumePerRequest.getBytes(); this.bulkRequest = new BulkRequest(); this.closed = new AtomicBoolean(false); this.enabled = new AtomicBoolean(false); this.executionIdGen = new AtomicLong(); - if (permits > 0) { - this.semaphore = new ResizeableSemaphore(permits); + this.permits = settings.getAsInt(Parameters.PERMITS.getName(), Parameters.PERMITS.getInteger()); + if (permits < 1) { + throw new IllegalArgumentException("must not be less 1 permits for bulk indexing"); } + this.semaphore = new ResizeableSemaphore(permits); if (logger.isInfoEnabled()) { - logger.info("bulk processor now active with maxActionsPerRequest = {} maxConcurrentRequests = {} " + - "flushInterval = {} maxVolumePerRequest = {} " + - "bulk logging = {} fail on bulk error = {} " + - "logger debug = {} from settings = {}", - maxActionsPerRequest, maxConcurrentRequests, - flushInterval, maxVolumePerRequest, - enableBulkLogging, failOnBulkError, - logger.isDebugEnabled(), settings.toDelimitedString(',')); + logger.info("bulk processor now active"); } setEnabled(true); } @@ -149,22 +125,22 @@ public class DefaultBulkProcessor implements BulkProcessor { @Override public void setMaxBulkActions(int bulkActions) { - this.maxBulkActions = bulkActions; + this.bulkActions = bulkActions; } @Override public int getMaxBulkActions() { - return maxBulkActions; + return bulkActions; } @Override public void setMaxBulkVolume(long bulkSize) { - this.maxBulkVolume = bulkSize; + this.bulkVolume = bulkSize; } @Override public long getMaxBulkVolume() { - return maxBulkVolume; + return bulkVolume; } @Override @@ -181,8 +157,8 @@ public class DefaultBulkProcessor implements BulkProcessor { public synchronized void add(ActionRequest request) { ensureOpenAndActive(); bulkRequest.add(request); - if ((maxBulkActions != -1 && bulkRequest.numberOfActions() >= maxBulkActions) || - (maxBulkVolume != -1 && bulkRequest.estimatedSizeInBytes() >= maxBulkVolume)) { + if ((bulkActions != -1 && bulkRequest.numberOfActions() >= bulkActions) || + (bulkVolume != -1 && bulkRequest.estimatedSizeInBytes() >= bulkVolume)) { execute(); } } @@ -221,9 +197,6 @@ public class DefaultBulkProcessor implements BulkProcessor { if (scheduledFuture != null) { scheduledFuture.cancel(true); } - if (scheduler != null) { - scheduler.shutdown(); - } // like flush but without ensuring open if (bulkRequest.numberOfActions() > 0) { execute(); @@ -294,9 +267,13 @@ public class DefaultBulkProcessor implements BulkProcessor { private boolean drainSemaphore(long timeValue, TimeUnit timeUnit) throws InterruptedException { if (semaphore != null) { - if (semaphore.tryAcquire(permits, timeValue, timeUnit)) { - semaphore.release(permits); + if (permits <= 0) { return true; + } else { + if (semaphore.tryAcquire(permits, timeValue, timeUnit)) { + semaphore.release(permits); + return true; + } } } return false; @@ -311,6 +288,7 @@ public class DefaultBulkProcessor implements BulkProcessor { } } + @SuppressWarnings("serial") private static class ResizeableSemaphore extends Semaphore { ResizeableSemaphore(int permits) { diff --git a/elx-common/src/main/java/org/xbib/elx/common/DefaultSearchMetric.java b/elx-common/src/main/java/org/xbib/elx/common/DefaultSearchMetric.java index b57eee6..157d514 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/DefaultSearchMetric.java +++ b/elx-common/src/main/java/org/xbib/elx/common/DefaultSearchMetric.java @@ -3,17 +3,22 @@ package org.xbib.elx.common; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; import org.xbib.elx.api.SearchMetric; import org.xbib.metrics.api.Count; import org.xbib.metrics.api.Metered; import org.xbib.metrics.common.CountMetric; import org.xbib.metrics.common.Meter; -import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; public class DefaultSearchMetric implements SearchMetric { private static final Logger logger = LogManager.getLogger(DefaultSearchMetric.class.getName()); + private final ScheduledFuture future; + private final Meter totalQuery; private final Count currentQuery; @@ -32,19 +37,24 @@ public class DefaultSearchMetric implements SearchMetric { private Long stopped; - public DefaultSearchMetric() { - totalQuery = new Meter(Executors.newSingleThreadScheduledExecutor()); + public DefaultSearchMetric(ScheduledThreadPoolExecutor scheduledThreadPoolExecutor, + Settings settings) { + totalQuery = new Meter(scheduledThreadPoolExecutor); currentQuery = new CountMetric(); queries = new CountMetric(); succeededQueries = new CountMetric(); emptyQueries = new CountMetric(); failedQueries = new CountMetric(); timeoutQueries = new CountMetric(); + String metricLogIntervalStr = settings.get(Parameters.METRIC_LOG_INTERVAL.getName(), + Parameters.METRIC_LOG_INTERVAL.getString()); + TimeValue metricLoginterval = TimeValue.parseTimeValue(metricLogIntervalStr, + TimeValue.timeValueSeconds(10), ""); + this.future = scheduledThreadPoolExecutor.scheduleAtFixedRate(this::log, 0L, metricLoginterval.seconds(), TimeUnit.SECONDS); } @Override public void init(Settings settings) { - logger.info("init"); start(); } @@ -103,6 +113,8 @@ public class DefaultSearchMetric implements SearchMetric { public void stop() { this.stopped = System.nanoTime(); totalQuery.stop(); + log(); + this.future.cancel(true); } @Override @@ -110,4 +122,10 @@ public class DefaultSearchMetric implements SearchMetric { stop(); totalQuery.shutdown(); } + + private void log() { + if (logger.isInfoEnabled()) { + logger.info("docs = " + getTotalQueries().getCount()); + } + } } diff --git a/elx-common/src/main/java/org/xbib/elx/common/FormatUtil.java b/elx-common/src/main/java/org/xbib/elx/common/FormatUtil.java new file mode 100644 index 0000000..8eac2de --- /dev/null +++ b/elx-common/src/main/java/org/xbib/elx/common/FormatUtil.java @@ -0,0 +1,436 @@ +package org.xbib.elx.common; + +import org.xbib.time.pretty.PrettyTime; +import java.util.ArrayList; +import java.util.List; + +/** + * Taken from org,apache.commons.lang.DurationFormatUtils of Apache commons-lang. + */ +public class FormatUtil { + + private static final PrettyTime pretty = new PrettyTime(); + + private static final String EMPTY = ""; + private static final String YEAR = "y"; + private static final String MONTH = "M"; + private static final String DAY = "d"; + private static final String HOUR = "H"; + private static final String MINUTE = "m"; + private static final String SECOND = "s"; + private static final String MILLISECOND = "S"; + + /** + * Number of milliseconds in a standard second. + */ + private static final long MILLIS_PER_SECOND = 1000; + /** + * Number of milliseconds in a standard minute. + */ + private static final long MILLIS_PER_MINUTE = 60 * MILLIS_PER_SECOND; + /** + * Number of milliseconds in a standard hour. + */ + private static final long MILLIS_PER_HOUR = 60 * MILLIS_PER_MINUTE; + /** + * Number of milliseconds in a standard day. + */ + private static final long MILLIS_PER_DAY = 24 * MILLIS_PER_HOUR; + + private static final String[] BYTES = { + " B", " kB", " MB", " GB", " TB", " PB", " EB", " ZB", " YB" + }; + private static final String[] BYTES_PER_SECOND = { + " B/s", " kB/s", " MB/s", " GB/s", " TB/s", " PB/s", " EB/s", " ZB/s", " YB/s" + }; + private static final String[] DOCS_PER_SECOND = { + " dps", " kdps", " Mdps", " Gdps", " Tdps", " Pdps", " Edps", " Zdps", " Ydps" + }; + + + /** + * Format byte size (file size as example) into a string, + * with two digits after dot and actual measure (MB, GB or other). + * + * @param size value to format + * @return formatted string in bytes, kB, MB or other. + */ + public static String formatSize(long size) { + return format(size, BYTES, 1024); + } + + public static String formatSize(double size) { + return format(size, BYTES, 1024); + } + + /** + * Format speed values (copy speed as example) into a string + * with two digits after dot and actual measure (MB/s, GB/s or other). + * + * @param speed value to format + * @return formatted string in bytes/s, kB/s, MB/s or other. + */ + public static String formatSpeed(long speed) { + return format(speed, BYTES_PER_SECOND, 1024); + } + + public static String formatSpeed(double speed) { + return format(speed, BYTES_PER_SECOND, 1024); + } + + public static String formatDocumentSpeed(long speed) { + return format(speed, DOCS_PER_SECOND, 1024); + } + + public static String formatDocumentSpeed(double speed) { + return format(speed, DOCS_PER_SECOND, 1024); + } + + /** + * Format any value without string appending. + * + * @param size value to format + * @param measureUnits array of strings to use as measurement units. Use BYTES_PER_SECOND as example. + * @param measureQuantity quantiry, required to step into next unit. Like 1024 for bytes, + * 1000 for meters or 100 for century. + * @return formatted size with measure unit + */ + private static String format(long size, String[] measureUnits, int measureQuantity) { + if (size <= 0) { + return null; + } + if (size < measureQuantity) { + return size + measureUnits[0]; + } + int i = 1; + double d = size; + while ((d = d / measureQuantity) > (measureQuantity - 1)) { + i++; + } + long l = (long) (d * 100); + d = (double) l / 100; + if (i < measureUnits.length) { + return d + measureUnits[i]; + } + return String.valueOf(size); + } + + private static String format(double value, String[] measureUnits, int measureQuantity) { + double d = value; + if (d <= 0.0d) { + return null; + } + if (d < measureQuantity) { + return d + measureUnits[0]; + } + int i = 1; + while ((d = d / measureQuantity) > (measureQuantity - 1)) { + i++; + } + long l = (long) (d * 100); + d = (double) l / 100; + if (i < measureUnits.length) { + return d + measureUnits[i]; + } + return String.valueOf(d); + } + + public static String formatMillis(long millis) { + return pretty.format(pretty.calculateDuration(millis)); + } + + public static String formatDurationWords(long value, boolean suppressLeadingZeroElements, + boolean suppressTrailingZeroElements) { + // This method is generally replacable by the format method, but + // there are a series of tweaks and special cases that require + // trickery to replicate. + String duration = formatDuration(value, "d' days 'H' hours 'm' minutes 's' seconds'"); + if (suppressLeadingZeroElements) { + // this is a temporary marker on the front. Like ^ in regexp. + duration = " " + duration; + String tmp = replaceOnce(duration, " 0 days", ""); + if (tmp.length() != duration.length()) { + duration = tmp; + tmp = replaceOnce(duration, " 0 hours", ""); + if (tmp.length() != duration.length()) { + duration = tmp; + tmp = replaceOnce(duration, " 0 minutes", ""); + if (tmp.length() != duration.length()) { + duration = replaceOnce(tmp, " 0 seconds", ""); + } + } + } + if (duration.length() != 0) { + // strip the space off again + duration = duration.substring(1); + } + } + if (suppressTrailingZeroElements) { + String tmp = replaceOnce(duration, " 0 seconds", ""); + if (tmp != null && tmp.length() != duration.length()) { + duration = tmp; + tmp = replaceOnce(duration, " 0 minutes", ""); + if (tmp.length() != duration.length()) { + duration = tmp; + tmp = replaceOnce(duration, " 0 hours", ""); + if (tmp.length() != duration.length()) { + duration = replaceOnce(tmp, " 0 days", ""); + } + } + } + } + duration = " " + duration; + duration = replaceOnce(duration, " 1 seconds", " 1 second"); + duration = replaceOnce(duration, " 1 minutes", " 1 minute"); + duration = replaceOnce(duration, " 1 hours", " 1 hour"); + duration = replaceOnce(duration, " 1 days", " 1 day"); + return duration != null ? duration.trim() : null; + } + + public static String formatDuration(long millis, String format) { + long durationMillis = millis; + List tokens = lexx(format); + int days = 0; + int hours = 0; + int minutes = 0; + int seconds = 0; + int milliseconds = 0; + + if (Token.containsTokenWithValue(tokens, DAY)) { + days = (int) (durationMillis / MILLIS_PER_DAY); + durationMillis -= days * MILLIS_PER_DAY; + } + if (Token.containsTokenWithValue(tokens, HOUR)) { + hours = (int) (durationMillis / MILLIS_PER_HOUR); + durationMillis -= hours * MILLIS_PER_HOUR; + } + if (Token.containsTokenWithValue(tokens, MINUTE)) { + minutes = (int) (durationMillis / MILLIS_PER_MINUTE); + durationMillis -= minutes * MILLIS_PER_MINUTE; + } + if (Token.containsTokenWithValue(tokens, SECOND)) { + seconds = (int) (durationMillis / MILLIS_PER_SECOND); + durationMillis -= seconds * MILLIS_PER_SECOND; + } + if (Token.containsTokenWithValue(tokens, MILLISECOND)) { + milliseconds = (int) durationMillis; + } + return format(tokens, days, hours, minutes, seconds, milliseconds); + } + + /** + *

The internal method to do the formatting.

+ * + * @param tokens the tokens + * @param days the number of days + * @param hours the number of hours + * @param minutes the number of minutes + * @param seconds the number of seconds + * @param millis the number of millis + * @return the formatted string + */ + private static String format(List tokens, + int days, int hours, int minutes, int seconds, int millis) { + int milliseconds = millis; + StringBuilder buffer = new StringBuilder(); + boolean lastOutputSeconds = false; + for (Token token : tokens) { + Object value = token.getValue(); + if (value instanceof StringBuilder) { + buffer.append(value); + } else { + if (DAY.equals(value)) { + buffer.append(days); + lastOutputSeconds = false; + } else if (HOUR.equals(value)) { + buffer.append(hours); + lastOutputSeconds = false; + } else if (MINUTE.equals(value)) { + buffer.append(minutes); + lastOutputSeconds = false; + } else if (SECOND.equals(value)) { + buffer.append(seconds); + lastOutputSeconds = true; + } else if (MILLISECOND.equals(value)) { + if (lastOutputSeconds) { + milliseconds += 1000; + String str = Integer.toString(milliseconds); + buffer.append(str.substring(1)); + } else { + buffer.append(milliseconds); + } + lastOutputSeconds = false; + } + } + } + return buffer.toString(); + } + + /** + * Parses a classic date format string into Tokens. + * + * @param format to parse + * @return array of Token + */ + private static List lexx(String format) { + char[] array = format.toCharArray(); + List list = new ArrayList<>(array.length); + boolean inLiteral = false; + StringBuilder sb = new StringBuilder(); + Token previous = null; + for (char ch : array) { + if (inLiteral && ch != '\'') { + sb.append(ch); + continue; + } + Object value = null; + switch (ch) { + case '\'': + if (inLiteral) { + sb = new StringBuilder(); + inLiteral = false; + } else { + sb = new StringBuilder(); + list.add(new Token(sb)); + inLiteral = true; + } + break; + case 'y': + value = YEAR; + break; + case 'M': + value = MONTH; + break; + case 'd': + value = DAY; + break; + case 'H': + value = HOUR; + break; + case 'm': + value = MINUTE; + break; + case 's': + value = SECOND; + break; + case 'S': + value = MILLISECOND; + break; + default: + if (sb.length() == 0) { + sb = new StringBuilder(); + list.add(new Token(sb)); + } + sb.append(ch); + } + if (value != null) { + if (previous != null && value.equals(previous.getValue())) { + previous.increment(); + } else { + Token token = new Token(value); + list.add(token); + previous = token; + } + sb.setLength(0); + } + } + return list; + } + + private static String replaceOnce(String text, String searchString, String replacement) { + return replace(text, searchString, replacement, 1); + } + + private static String replace(String text, String searchString, String replacement, int maxvalue) { + int max = maxvalue; + if (isNullOrEmpty(text) || isNullOrEmpty(searchString) || replacement == null || max == 0) { + return text; + } + int start = 0; + int end = text.indexOf(searchString, start); + if (end == -1) { + return text; + } + int replLength = searchString.length(); + int increase = replacement.length() - replLength; + increase = Math.max(increase, 0); + increase *= (max < 0 ? 16 : (Math.min(max, 64))); + StringBuilder buf = new StringBuilder(text.length() + increase); + while (end != -1) { + buf.append(text, start, end).append(replacement); + start = end + replLength; + if (--max == 0) { + break; + } + end = text.indexOf(searchString, start); + } + buf.append(text.substring(start)); + return buf.toString(); + } + + private static boolean isNullOrEmpty(String target) { + return target == null || EMPTY.equals(target); + } + + /** + * Element that is parsed from the format pattern. + */ + private static class Token { + + private final Object value; + + private int count; + + Token(Object value) { + this.value = value; + this.count = 1; + } + + static boolean containsTokenWithValue(List tokens, Object value) { + for (Token token : tokens) { + if (token.getValue().equals(value)) { + return true; + } + } + return false; + } + void increment() { + count++; + } + + Object getValue() { + return value; + } + + @Override + public boolean equals(Object obj) { + if (obj instanceof Token) { + Token tok = (Token) obj; + if (this.value.getClass() != tok.value.getClass()) { + return false; + } + if (this.count != tok.count) { + return false; + } + if (this.value instanceof StringBuilder) { + return this.value.toString().equals(tok.value.toString()); + } else if (this.value instanceof Number) { + return this.value.equals(tok.value); + } else { + return this.value == tok.value; + } + } + return false; + } + + @Override + public int hashCode() { + return this.value.hashCode(); + } + + @Override + public String toString() { + return value + " (" + count + ")"; + } + } +} diff --git a/elx-common/src/main/java/org/xbib/elx/common/Parameters.java b/elx-common/src/main/java/org/xbib/elx/common/Parameters.java index 34c3091..af98693 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/Parameters.java +++ b/elx-common/src/main/java/org/xbib/elx/common/Parameters.java @@ -12,17 +12,23 @@ public enum Parameters { ENABLE_BULK_LOGGING("bulk.logging.enabled", Boolean.class, true), - FAIL_ON_BULK_ERROR("bulk.failonerror", Boolean.class, true), + FAIL_ON_BULK_ERROR("bulk.fail_on_error", Boolean.class, true), MAX_ACTIONS_PER_REQUEST("bulk.max_actions_per_request", Integer.class, -1), - RESPONSE_TIME_COUNT("bulk.response_time_count", Integer.class, 16), + MIN_VOLUME_PER_REQUEST("bulk.min_volume_per_request", String.class, "1k"), - MAX_CONCURRENT_REQUESTS("bulk.max_concurrent_requests", Integer.class, 1 /*Runtime.getRuntime().availableProcessors() - 1*/), + MAX_VOLUME_PER_REQUEST("bulk.max_volume_per_request", String.class, "5m"), - MAX_VOLUME_PER_REQUEST("bulk.max_volume_per_request", String.class, "1kb"), + FLUSH_INTERVAL("bulk.flush_interval", String.class, "30s"), - FLUSH_INTERVAL("bulk.flush_interval", String.class, "30s"); + MEASURE_INTERVAL("bulk.measure_interval", String.class, "1s"), + + METRIC_LOG_INTERVAL("bulk.metric_log_interval", String.class, "10s"), + + RING_BUFFER_SIZE("bulk.ring_buffer_size", Integer.class, Runtime.getRuntime().availableProcessors()), + + PERMITS("bulk.permits", Integer.class, Runtime.getRuntime().availableProcessors() - 1); private final String name; diff --git a/elx-node/src/main/java/org/xbib/elx/node/NodeAdminClient.java b/elx-node/src/main/java/org/xbib/elx/node/NodeAdminClient.java index ed6937f..3e2fdc1 100644 --- a/elx-node/src/main/java/org/xbib/elx/node/NodeAdminClient.java +++ b/elx-node/src/main/java/org/xbib/elx/node/NodeAdminClient.java @@ -10,6 +10,7 @@ public class NodeAdminClient extends AbstractAdminClient { private final NodeClientHelper helper; public NodeAdminClient() { + super(); this.helper = new NodeClientHelper(); } diff --git a/elx-node/src/main/java/org/xbib/elx/node/NodeBulkClient.java b/elx-node/src/main/java/org/xbib/elx/node/NodeBulkClient.java index d493976..366d2d4 100644 --- a/elx-node/src/main/java/org/xbib/elx/node/NodeBulkClient.java +++ b/elx-node/src/main/java/org/xbib/elx/node/NodeBulkClient.java @@ -10,6 +10,7 @@ public class NodeBulkClient extends AbstractBulkClient { private final NodeClientHelper helper; public NodeBulkClient() { + super(); this.helper = new NodeClientHelper(); } diff --git a/elx-node/src/main/java/org/xbib/elx/node/NodeSearchClient.java b/elx-node/src/main/java/org/xbib/elx/node/NodeSearchClient.java index 7591c17..fbb341c 100644 --- a/elx-node/src/main/java/org/xbib/elx/node/NodeSearchClient.java +++ b/elx-node/src/main/java/org/xbib/elx/node/NodeSearchClient.java @@ -10,6 +10,7 @@ public class NodeSearchClient extends AbstractSearchClient { private final NodeClientHelper helper; public NodeSearchClient() { + super(); this.helper = new NodeClientHelper(); } diff --git a/elx-node/src/test/java/org/xbib/elx/node/test/BulkClientTest.java b/elx-node/src/test/java/org/xbib/elx/node/test/BulkClientTest.java index e376e86..158d2aa 100644 --- a/elx-node/src/test/java/org/xbib/elx/node/test/BulkClientTest.java +++ b/elx-node/src/test/java/org/xbib/elx/node/test/BulkClientTest.java @@ -129,7 +129,6 @@ class BulkClientTest { .put(helper.getNodeSettings()) .build()) { IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); - indexDefinition.setStartBulkRefreshSeconds(0); bulkClient.newIndex(indexDefinition); bulkClient.startBulk(indexDefinition); ExecutorService executorService = Executors.newFixedThreadPool(maxthreads); diff --git a/elx-transport/src/main/java/org/xbib/elx/transport/TransportAdminClient.java b/elx-transport/src/main/java/org/xbib/elx/transport/TransportAdminClient.java index e66c483..9b4a158 100644 --- a/elx-transport/src/main/java/org/xbib/elx/transport/TransportAdminClient.java +++ b/elx-transport/src/main/java/org/xbib/elx/transport/TransportAdminClient.java @@ -15,6 +15,7 @@ public class TransportAdminClient extends AbstractAdminClient { private final TransportClientHelper helper; public TransportAdminClient() { + super(); this.helper = new TransportClientHelper(); } diff --git a/elx-transport/src/main/java/org/xbib/elx/transport/TransportBulkClient.java b/elx-transport/src/main/java/org/xbib/elx/transport/TransportBulkClient.java index 3f49942..8441393 100644 --- a/elx-transport/src/main/java/org/xbib/elx/transport/TransportBulkClient.java +++ b/elx-transport/src/main/java/org/xbib/elx/transport/TransportBulkClient.java @@ -14,6 +14,7 @@ public class TransportBulkClient extends AbstractBulkClient { private final TransportClientHelper helper; public TransportBulkClient() { + super(); this.helper = new TransportClientHelper(); } diff --git a/elx-transport/src/main/java/org/xbib/elx/transport/TransportSearchClient.java b/elx-transport/src/main/java/org/xbib/elx/transport/TransportSearchClient.java index 87d5767..c3ce202 100644 --- a/elx-transport/src/main/java/org/xbib/elx/transport/TransportSearchClient.java +++ b/elx-transport/src/main/java/org/xbib/elx/transport/TransportSearchClient.java @@ -14,6 +14,7 @@ public class TransportSearchClient extends AbstractSearchClient { private final TransportClientHelper helper; public TransportSearchClient() { + super(); this.helper = new TransportClientHelper(); } diff --git a/gradle.properties b/gradle.properties index 02d6610..883fea9 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,9 +1,10 @@ group = org.xbib name = elx -version = 2.2.1.37 +version = 2.2.1.39 gradle.wrapper.version = 6.6.1 xbib-metrics.version = 2.1.0 +xbib-time.version = 2.1.0 xbib-guice.version = 4.4.2 xbib-guava.version = 28.1 xbib-netty-http.version = 4.1.63.0