From 870cc09767417f7b2ff2a0d2b68992e79c0e003f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=CC=88rg=20Prante?= Date: Tue, 20 Apr 2021 14:33:53 +0200 Subject: [PATCH] align with es221 --- .../java/org/xbib/elx/api/BulkClient.java | 2 +- .../java/org/xbib/elx/api/BulkProcessor.java | 15 +- elx-common/build.gradle | 1 + .../xbib/elx/common/AbstractAdminClient.java | 22 +- .../xbib/elx/common/AbstractBasicClient.java | 53 +- .../xbib/elx/common/AbstractBulkClient.java | 76 +-- .../xbib/elx/common/AbstractSearchClient.java | 23 +- .../elx/common/DefaultBulkController.java | 222 --------- .../xbib/elx/common/DefaultBulkListener.java | 84 ++-- .../xbib/elx/common/DefaultBulkMetric.java | 161 +++++- .../xbib/elx/common/DefaultBulkProcessor.java | 467 +++++++----------- .../elx/common/DefaultIndexDefinition.java | 18 +- .../xbib/elx/common/DefaultSearchMetric.java | 29 +- .../java/org/xbib/elx/common/FormatUtil.java | 436 ++++++++++++++++ .../org/xbib/elx/common/LongRingBuffer.java | 38 ++ .../org/xbib/elx/common/MockAdminClient.java | 1 - .../java/org/xbib/elx/common/Parameters.java | 29 +- elx-http/build.gradle | 4 +- .../org/xbib/elx/http/HttpAdminClient.java | 1 + .../org/xbib/elx/http/HttpBulkClient.java | 1 + .../org/xbib/elx/http/HttpSearchClient.java | 1 + .../xbib/elx/http/test/BulkClientTest.java | 36 +- .../xbib/elx/http/test/DuplicateIDTest.java | 12 +- .../xbib/elx/http/test/IndexPruneTest.java | 6 +- .../xbib/elx/http/test/IndexShiftTest.java | 8 +- .../org/xbib/elx/http/test/SearchTest.java | 12 +- .../org/xbib/elx/http/test/SmokeTest.java | 10 +- .../org/xbib/elx/node/NodeAdminClient.java | 1 + .../org/xbib/elx/node/NodeBulkClient.java | 1 + .../org/xbib/elx/node/NodeSearchClient.java | 1 + .../xbib/elx/node/test/BulkClientTest.java | 40 +- .../xbib/elx/node/test/DuplicateIDTest.java | 10 +- .../xbib/elx/node/test/IndexPruneTest.java | 6 +- .../xbib/elx/node/test/IndexShiftTest.java | 6 +- .../org/xbib/elx/node/test/SearchTest.java | 10 +- .../org/xbib/elx/node/test/SmokeTest.java | 10 +- .../elx/transport/TransportAdminClient.java | 1 + .../elx/transport/TransportBulkClient.java | 1 + .../elx/transport/TransportClientHelper.java | 40 +- .../elx/transport/TransportSearchClient.java | 1 + .../elx/transport/test/BulkClientTest.java | 62 +-- .../elx/transport/test/DuplicateIDTest.java | 10 +- .../elx/transport/test/IndexPruneTest.java | 6 +- .../elx/transport/test/IndexShiftTest.java | 10 +- .../xbib/elx/transport/test/SearchTest.java | 12 +- .../xbib/elx/transport/test/SmokeTest.java | 10 +- gradle.properties | 3 +- 47 files changed, 1148 insertions(+), 861 deletions(-) delete mode 100644 elx-common/src/main/java/org/xbib/elx/common/DefaultBulkController.java create mode 100644 elx-common/src/main/java/org/xbib/elx/common/FormatUtil.java create mode 100644 elx-common/src/main/java/org/xbib/elx/common/LongRingBuffer.java diff --git a/elx-api/src/main/java/org/xbib/elx/api/BulkClient.java b/elx-api/src/main/java/org/xbib/elx/api/BulkClient.java index 0ad3aa0..2ce2fc6 100644 --- a/elx-api/src/main/java/org/xbib/elx/api/BulkClient.java +++ b/elx-api/src/main/java/org/xbib/elx/api/BulkClient.java @@ -149,5 +149,5 @@ public interface BulkClient extends BasicClient, Flushable { */ void flushIndex(IndexDefinition indexDefinition); - BulkProcessor getBulkController(); + BulkProcessor getBulkProcessor(); } diff --git a/elx-api/src/main/java/org/xbib/elx/api/BulkProcessor.java b/elx-api/src/main/java/org/xbib/elx/api/BulkProcessor.java index a59fd28..82d9d7e 100644 --- a/elx-api/src/main/java/org/xbib/elx/api/BulkProcessor.java +++ b/elx-api/src/main/java/org/xbib/elx/api/BulkProcessor.java @@ -15,6 +15,14 @@ public interface BulkProcessor extends Closeable, Flushable { void stopBulkMode(IndexDefinition indexDefinition) throws IOException; + void add(DocWriteRequest request); + + boolean waitForBulkResponses(long timeout, TimeUnit unit); + + BulkMetric getBulkMetric(); + + Throwable getLastBulkError(); + void setMaxBulkActions(int bulkActions); int getMaxBulkActions(); @@ -23,11 +31,4 @@ public interface BulkProcessor extends Closeable, Flushable { long getMaxBulkVolume(); - void add(DocWriteRequest request); - - boolean waitForBulkResponses(long timeout, TimeUnit unit); - - BulkMetric getBulkMetric(); - - Throwable getLastBulkError(); } diff --git a/elx-common/build.gradle b/elx-common/build.gradle index 26cbdd9..7cdfdfb 100644 --- a/elx-common/build.gradle +++ b/elx-common/build.gradle @@ -1,5 +1,6 @@ dependencies { api project(':elx-api') + implementation "org.xbib:time:${rootProject.property('xbib-time.version')}" testImplementation "org.elasticsearch.plugin:transport-netty4-client:${rootProject.property('elasticsearch.version')}" testImplementation "io.netty:netty-codec-http:${project.property('netty.version')}" testImplementation "io.netty:netty-transport:${project.property('netty.version')}" diff --git a/elx-common/src/main/java/org/xbib/elx/common/AbstractAdminClient.java b/elx-common/src/main/java/org/xbib/elx/common/AbstractAdminClient.java index 158cec7..3083fdf 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/AbstractAdminClient.java +++ b/elx-common/src/main/java/org/xbib/elx/common/AbstractAdminClient.java @@ -81,7 +81,7 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements @Override public Map getMapping(IndexDefinition indexDefinition) { - if (!ensureIndexDefinition(indexDefinition)) { + if (isIndexDefinitionDisabled(indexDefinition)) { return null; } GetMappingsRequestBuilder getMappingsRequestBuilder = new GetMappingsRequestBuilder(client, GetMappingsAction.INSTANCE) @@ -96,7 +96,7 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements @Override public AdminClient deleteIndex(IndexDefinition indexDefinition) { - if (!ensureIndexDefinition(indexDefinition)) { + if (isIndexDefinitionDisabled(indexDefinition)) { return this; } String index = indexDefinition.getFullIndexName(); @@ -113,7 +113,7 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements @Override public AdminClient updateReplicaLevel(IndexDefinition indexDefinition, int level) throws IOException { - if (!ensureIndexDefinition(indexDefinition)) { + if (isIndexDefinitionDisabled(indexDefinition)) { return this; } if (level < 1) { @@ -129,7 +129,7 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements @Override public int getReplicaLevel(IndexDefinition indexDefinition) { - if (!ensureIndexDefinition(indexDefinition)) { + if (isIndexDefinitionDisabled(indexDefinition)) { return -1; } String index = indexDefinition.getFullIndexName(); @@ -207,7 +207,7 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements if (additionalAliases == null) { return new EmptyIndexShiftResult(); } - if (!ensureIndexDefinition(indexDefinition)) { + if (isIndexDefinitionDisabled(indexDefinition)) { return new EmptyIndexShiftResult(); } if (indexDefinition.isShiftEnabled()) { @@ -295,7 +295,7 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements @Override public IndexPruneResult pruneIndex(IndexDefinition indexDefinition) { - return indexDefinition != null && indexDefinition.isPruneEnabled() && + return indexDefinition != null&& indexDefinition.isEnabled() && indexDefinition.isPruneEnabled() && indexDefinition.getRetention() != null && indexDefinition.getDateTimePattern() != null ? pruneIndex(indexDefinition.getIndex(), @@ -313,9 +313,11 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements logger.info("before pruning: index = {} full index = {} delta = {} mintokeep = {} pattern = {}", index, protectedIndexName, delta, mintokeep, pattern); if (delta == 0 && mintokeep == 0) { + logger.info("no candidates found, delta is 0 and mintokeep is 0"); return new EmptyPruneResult(); } if (index.equals(protectedIndexName)) { + logger.info("no candidates found, only protected index name is given"); return new EmptyPruneResult(); } ensureClientIsPresent(); @@ -330,6 +332,7 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements } } if (candidateIndices.isEmpty()) { + logger.info("no candidates found"); return new EmptyPruneResult(); } if (mintokeep > 0 && candidateIndices.size() <= mintokeep) { @@ -365,7 +368,7 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements @Override public Long mostRecentDocument(IndexDefinition indexDefinition, String timestampfieldname) { - if (!ensureIndexDefinition(indexDefinition)) { + if (isIndexDefinitionDisabled(indexDefinition)) { return -1L; } ensureClientIsPresent(); @@ -393,7 +396,7 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements @Override public boolean forceMerge(IndexDefinition indexDefinition) { - if (!ensureIndexDefinition(indexDefinition)) { + if (isIndexDefinitionDisabled(indexDefinition)) { return false; } if (!indexDefinition.isForceMergeEnabled()) { @@ -433,6 +436,9 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements @Override public void checkMapping(IndexDefinition indexDefinition) { + if (isIndexDefinitionDisabled(indexDefinition)) { + return; + } ensureClientIsPresent(); GetMappingsRequest getMappingsRequest = new GetMappingsRequest().indices(indexDefinition.getFullIndexName()); GetMappingsResponse getMappingsResponse = client.execute(GetMappingsAction.INSTANCE, getMappingsRequest).actionGet(); diff --git a/elx-common/src/main/java/org/xbib/elx/common/AbstractBasicClient.java b/elx-common/src/main/java/org/xbib/elx/common/AbstractBasicClient.java index dc6a323..8e1a373 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/AbstractBasicClient.java +++ b/elx-common/src/main/java/org/xbib/elx/common/AbstractBasicClient.java @@ -7,6 +7,12 @@ import org.elasticsearch.ElasticsearchTimeoutException; import org.elasticsearch.action.admin.cluster.health.ClusterHealthAction; import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; +import org.elasticsearch.action.admin.cluster.node.info.NodeInfo; +import org.elasticsearch.action.admin.cluster.node.info.NodesInfoAction; +import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest; +import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse; +import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsAction; +import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; import org.elasticsearch.action.admin.cluster.state.ClusterStateAction; import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; @@ -22,10 +28,15 @@ import org.elasticsearch.client.transport.NoNodeAvailableException; import org.elasticsearch.cluster.health.ClusterHealthStatus; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.threadpool.ThreadPoolInfo; import org.xbib.elx.api.BasicClient; import org.xbib.elx.api.IndexDefinition; import java.io.IOException; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -37,12 +48,23 @@ public abstract class AbstractBasicClient implements BasicClient { protected Settings settings; + private final ScheduledThreadPoolExecutor scheduler; + private final AtomicBoolean closed; public AbstractBasicClient() { + this.scheduler = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(2, + EsExecutors.daemonThreadFactory("elx-bulk-processor")); + this.scheduler.setExecuteExistingDelayedTasksAfterShutdownPolicy(false); + this.scheduler.setContinueExistingPeriodicTasksAfterShutdownPolicy(false); closed = new AtomicBoolean(false); } + @Override + public ScheduledThreadPoolExecutor getScheduler() { + return scheduler; + } + @Override public void setClient(ElasticsearchClient client) { this.client = client; @@ -56,7 +78,7 @@ public abstract class AbstractBasicClient implements BasicClient { @Override public void init(Settings settings) throws IOException { if (closed.compareAndSet(false, true)) { - logger.log(Level.INFO, "initializing client with settings = " + settings.toDelimitedString(',')); + logger.log(Level.INFO, "initializing with settings = " + settings.toDelimitedString(',')); this.settings = settings; setClient(createClient(settings)); } @@ -82,6 +104,22 @@ public abstract class AbstractBasicClient implements BasicClient { } } + @Override + public void putClusterSetting(String key, Object value, long timeout, TimeUnit timeUnit) throws IOException { + ensureClientIsPresent(); + if (key == null) { + throw new IOException("no key given"); + } + if (value == null) { + throw new IOException("no value given"); + } + Settings.Builder updateSettingsBuilder = Settings.builder(); + updateSettingsBuilder.put(key, value.toString()); + ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest(); + updateSettingsRequest.transientSettings(updateSettingsBuilder).timeout(toTimeValue(timeout, timeUnit)); + client.execute(ClusterUpdateSettingsAction.INSTANCE, updateSettingsRequest).actionGet(); + } + @Override public void waitForCluster(String statusString, long maxWaitTime, TimeUnit timeUnit) { ensureClientIsPresent(); @@ -140,6 +178,10 @@ public abstract class AbstractBasicClient implements BasicClient { @Override public long getSearchableDocs(IndexDefinition indexDefinition) { + if (isIndexDefinitionDisabled(indexDefinition)) { + return -1L; + } + ensureClientIsPresent(); SearchRequestBuilder searchRequestBuilder = new SearchRequestBuilder(client, SearchAction.INSTANCE) .setIndices(indexDefinition.getFullIndexName()) .setQuery(QueryBuilders.matchAllQuery()) @@ -162,6 +204,9 @@ public abstract class AbstractBasicClient implements BasicClient { ensureClientIsPresent(); if (closed.compareAndSet(false, true)) { closeClient(settings); + if (scheduler != null) { + scheduler.shutdown(); + } } } @@ -193,12 +238,12 @@ public abstract class AbstractBasicClient implements BasicClient { } } - protected boolean ensureIndexDefinition(IndexDefinition indexDefinition) { + protected boolean isIndexDefinitionDisabled(IndexDefinition indexDefinition) { if (!indexDefinition.isEnabled()) { logger.warn("index " + indexDefinition.getFullIndexName() + " is disabled"); - return false; + return true; } - return true; + return false; } protected static TimeValue toTimeValue(long timeValue, TimeUnit timeUnit) { diff --git a/elx-common/src/main/java/org/xbib/elx/common/AbstractBulkClient.java b/elx-common/src/main/java/org/xbib/elx/common/AbstractBulkClient.java index 2c590a0..3a5f656 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/AbstractBulkClient.java +++ b/elx-common/src/main/java/org/xbib/elx/common/AbstractBulkClient.java @@ -1,6 +1,5 @@ package org.xbib.elx.common; -import org.apache.logging.log4j.Level; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.action.admin.indices.create.CreateIndexAction; @@ -23,7 +22,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.json.JsonXContent; import org.xbib.elx.api.BulkClient; -import org.xbib.elx.api.BulkController; +import org.xbib.elx.api.BulkProcessor; import org.xbib.elx.api.IndexDefinition; import java.io.IOException; import java.nio.charset.StandardCharsets; @@ -37,29 +36,32 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements private static final Logger logger = LogManager.getLogger(AbstractBulkClient.class.getName()); - private BulkController bulkController; + private BulkProcessor bulkProcessor; - private final AtomicBoolean closed = new AtomicBoolean(true); + private final AtomicBoolean closed; + + public AbstractBulkClient() { + super(); + closed = new AtomicBoolean(true); + } @Override public void init(Settings settings) throws IOException { if (closed.compareAndSet(true, false)) { super.init(settings); - logger.log(Level.INFO, "initializing bulk controller with settings = " + settings.toDelimitedString(',')); - bulkController = new DefaultBulkController(this); - bulkController.init(settings); + bulkProcessor = new DefaultBulkProcessor(this, settings); } } @Override - public BulkController getBulkController() { - return bulkController; + public BulkProcessor getBulkProcessor() { + return bulkProcessor; } @Override public void flush() throws IOException { - if (bulkController != null) { - bulkController.flush(); + if (bulkProcessor != null) { + bulkProcessor.flush(); } } @@ -67,17 +69,18 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements public void close() throws IOException { if (closed.compareAndSet(false, true)) { ensureClientIsPresent(); - if (bulkController != null) { - logger.info("closing bulk controller"); - bulkController.close(); + if (bulkProcessor != null) { + logger.info("closing bulk processor"); + bulkProcessor.close(); } closeClient(settings); + super.close(); } } @Override public void newIndex(IndexDefinition indexDefinition) throws IOException { - if (!ensureIndexDefinition(indexDefinition)) { + if (isIndexDefinitionDisabled(indexDefinition)) { return; } String index = indexDefinition.getFullIndexName(); @@ -121,23 +124,23 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements @Override public void startBulk(IndexDefinition indexDefinition) throws IOException { - if (!ensureIndexDefinition(indexDefinition)) { + if (isIndexDefinitionDisabled(indexDefinition)) { return; } - if (bulkController != null) { + if (bulkProcessor != null) { ensureClientIsPresent(); - bulkController.startBulkMode(indexDefinition); + bulkProcessor.startBulkMode(indexDefinition); } } @Override public void stopBulk(IndexDefinition indexDefinition) throws IOException { - if (!ensureIndexDefinition(indexDefinition)) { + if (isIndexDefinitionDisabled(indexDefinition)) { return; } - if (bulkController != null) { + if (bulkProcessor != null) { ensureClientIsPresent(); - bulkController.stopBulkMode(indexDefinition); + bulkProcessor.stopBulkMode(indexDefinition); } } @@ -148,7 +151,7 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements @Override public BulkClient index(IndexDefinition indexDefinition, String id, boolean create, BytesReference source) { - if (!ensureIndexDefinition(indexDefinition)) { + if (isIndexDefinitionDisabled(indexDefinition)) { return this; } return index(new IndexRequest() @@ -158,16 +161,14 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements @Override public BulkClient index(IndexRequest indexRequest) { - if (bulkController != null) { - ensureClientIsPresent(); - bulkController.bulkIndex(indexRequest); - } + ensureClientIsPresent(); + bulkProcessor.add(indexRequest); return this; } @Override public BulkClient delete(IndexDefinition indexDefinition, String id) { - if (!ensureIndexDefinition(indexDefinition)) { + if (isIndexDefinitionDisabled(indexDefinition)) { return this; } return delete(new DeleteRequest().index(indexDefinition.getFullIndexName()).id(id)); @@ -175,9 +176,9 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements @Override public BulkClient delete(DeleteRequest deleteRequest) { - if (bulkController != null) { + if (bulkProcessor != null) { ensureClientIsPresent(); - bulkController.bulkDelete(deleteRequest); + bulkProcessor.add(deleteRequest); } return this; } @@ -189,7 +190,7 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements @Override public BulkClient update(IndexDefinition indexDefinition, String id, BytesReference source) { - if (!ensureIndexDefinition(indexDefinition)) { + if (isIndexDefinitionDisabled(indexDefinition)) { return this; } return update(new UpdateRequest() @@ -199,17 +200,20 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements @Override public BulkClient update(UpdateRequest updateRequest) { - if (bulkController != null) { + if (bulkProcessor != null) { ensureClientIsPresent(); - bulkController.bulkUpdate(updateRequest); + bulkProcessor.add(updateRequest); } return this; } @Override public boolean waitForResponses(long timeout, TimeUnit timeUnit) { - ensureClientIsPresent(); - return bulkController.waitForBulkResponses(timeout, timeUnit); + if (bulkProcessor != null) { + ensureClientIsPresent(); + return bulkProcessor.waitForBulkResponses(timeout, timeUnit); + } + return false; } @Override @@ -219,7 +223,7 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements @Override public void flushIndex(IndexDefinition indexDefinition) { - if (!ensureIndexDefinition(indexDefinition)) { + if (isIndexDefinitionDisabled(indexDefinition)) { return; } ensureClientIsPresent(); @@ -228,7 +232,7 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements @Override public void refreshIndex(IndexDefinition indexDefinition) { - if (!ensureIndexDefinition(indexDefinition)) { + if (isIndexDefinitionDisabled(indexDefinition)) { return; } ensureClientIsPresent(); diff --git a/elx-common/src/main/java/org/xbib/elx/common/AbstractSearchClient.java b/elx-common/src/main/java/org/xbib/elx/common/AbstractSearchClient.java index e767e95..c61aa19 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/AbstractSearchClient.java +++ b/elx-common/src/main/java/org/xbib/elx/common/AbstractSearchClient.java @@ -36,6 +36,13 @@ public abstract class AbstractSearchClient extends AbstractBasicClient implement private SearchMetric searchMetric; + private final AtomicBoolean closed; + + public AbstractSearchClient() { + super(); + this.closed = new AtomicBoolean(true); + } + @Override public SearchMetric getSearchMetric() { return searchMetric; @@ -43,16 +50,20 @@ public abstract class AbstractSearchClient extends AbstractBasicClient implement @Override public void init(Settings settings) throws IOException { - super.init(settings); - this.searchMetric = new DefaultSearchMetric(); - searchMetric.init(settings); + if (closed.compareAndSet(true, false)) { + super.init(settings); + this.searchMetric = new DefaultSearchMetric(getScheduler(), settings); + searchMetric.init(settings); + } } @Override public void close() throws IOException { - super.close(); - if (searchMetric != null) { - searchMetric.close(); + if (closed.compareAndSet(false, true)) { + super.close(); + if (searchMetric != null) { + searchMetric.close(); + } } } diff --git a/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkController.java b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkController.java deleted file mode 100644 index 7dd072d..0000000 --- a/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkController.java +++ /dev/null @@ -1,222 +0,0 @@ -package org.xbib.elx.common; - -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.elasticsearch.action.delete.DeleteRequest; -import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.action.update.UpdateRequest; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.ByteSizeValue; -import org.elasticsearch.common.unit.TimeValue; -import org.xbib.elx.api.BulkClient; -import org.xbib.elx.api.BulkController; -import org.xbib.elx.api.BulkListener; -import org.xbib.elx.api.BulkMetric; -import org.xbib.elx.api.BulkProcessor; -import org.xbib.elx.api.IndexDefinition; - -import java.io.IOException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; - -import static org.xbib.elx.api.IndexDefinition.TYPE_NAME; - -public class DefaultBulkController implements BulkController { - - private static final Logger logger = LogManager.getLogger(DefaultBulkController.class); - - private final BulkClient bulkClient; - - private final BulkMetric bulkMetric; - - private BulkProcessor bulkProcessor; - - private BulkListener bulkListener; - - private long maxWaitTime; - - private TimeUnit maxWaitTimeUnit; - - private final AtomicBoolean active; - - public DefaultBulkController(BulkClient bulkClient) { - this.bulkClient = bulkClient; - this.bulkMetric = new DefaultBulkMetric(); - this.active = new AtomicBoolean(false); - } - - @Override - public void init(Settings settings) { - bulkMetric.init(settings); - String maxWaitTimeStr = settings.get(Parameters.MAX_WAIT_BULK_RESPONSE.getName(), - Parameters.MAX_WAIT_BULK_RESPONSE.getString()); - TimeValue maxWaitTimeValue = TimeValue.parseTimeValue(maxWaitTimeStr, - TimeValue.timeValueSeconds(30), ""); - this.maxWaitTime = maxWaitTimeValue.seconds(); - this.maxWaitTimeUnit = TimeUnit.SECONDS; - int maxActionsPerRequest = settings.getAsInt(Parameters.MAX_ACTIONS_PER_REQUEST.getName(), - Parameters.MAX_ACTIONS_PER_REQUEST.getInteger()); - int maxConcurrentRequests = settings.getAsInt(Parameters.MAX_CONCURRENT_REQUESTS.getName(), - Parameters.MAX_CONCURRENT_REQUESTS.getInteger()); - String flushIngestIntervalStr = settings.get(Parameters.FLUSH_INTERVAL.getName(), - Parameters.FLUSH_INTERVAL.getString()); - TimeValue flushIngestInterval = TimeValue.parseTimeValue(flushIngestIntervalStr, - TimeValue.timeValueSeconds(30), ""); - ByteSizeValue maxVolumePerRequest = settings.getAsBytesSize(Parameters.MAX_VOLUME_PER_REQUEST.getName(), - ByteSizeValue.parseBytesSizeValue(Parameters.MAX_VOLUME_PER_REQUEST.getString(), "1m")); - boolean enableBulkLogging = settings.getAsBoolean(Parameters.ENABLE_BULK_LOGGING.getName(), - Parameters.ENABLE_BULK_LOGGING.getBoolean()); - boolean failOnBulkError = settings.getAsBoolean(Parameters.FAIL_ON_BULK_ERROR.getName(), - Parameters.FAIL_ON_BULK_ERROR.getBoolean()); - int responseTimeCount = settings.getAsInt(Parameters.RESPONSE_TIME_COUNT.getName(), - Parameters.RESPONSE_TIME_COUNT.getInteger()); - this.bulkListener = new DefaultBulkListener(this, bulkMetric, - enableBulkLogging, failOnBulkError, responseTimeCount); - this.bulkProcessor = DefaultBulkProcessor.builder(bulkClient.getClient(), bulkListener) - .setBulkActions(maxActionsPerRequest) - .setConcurrentRequests(maxConcurrentRequests) - .setFlushInterval(flushIngestInterval) - .setBulkSize(maxVolumePerRequest) - .build(); - this.active.set(true); - if (logger.isInfoEnabled()) { - logger.info("bulk processor now active with maxWaitTime = {} maxActionsPerRequest = {} maxConcurrentRequests = {} " + - "flushIngestInterval = {} maxVolumePerRequest = {} " + - "bulk logging = {} fail on bulk error = {} " + - "logger debug = {} from settings = {}", - maxWaitTimeStr, maxActionsPerRequest, maxConcurrentRequests, - flushIngestInterval, maxVolumePerRequest, - enableBulkLogging, failOnBulkError, - logger.isDebugEnabled(), settings.toDelimitedString(',')); - } - } - - @Override - public BulkProcessor getBulkProcessor() { - return bulkProcessor; - } - - @Override - public BulkMetric getBulkMetric() { - return bulkMetric; - } - - @Override - public Throwable getLastBulkError() { - return bulkListener != null ? bulkListener.getLastBulkError() : null; - } - - @Override - public void inactivate() { - this.active.set(false); - } - - @Override - public void startBulkMode(IndexDefinition indexDefinition) throws IOException { - String indexName = indexDefinition.getFullIndexName(); - if (indexDefinition.getStartBulkRefreshSeconds() != 0) { - bulkClient.updateIndexSetting(indexName, "refresh_interval", - indexDefinition.getStartBulkRefreshSeconds() + "s", - 30L, TimeUnit.SECONDS); - } - } - - @Override - public void bulkIndex(IndexRequest indexRequest) { - ensureActiveAndBulk(); - try { - bulkMetric.getCurrentIngest().inc(indexRequest.index(), TYPE_NAME, indexRequest.id()); - bulkProcessor.add(indexRequest); - } catch (Exception e) { - if (logger.isErrorEnabled()) { - logger.error("bulk add of index failed: " + e.getMessage(), e); - } - inactivate(); - } - } - - @Override - public void bulkDelete(DeleteRequest deleteRequest) { - ensureActiveAndBulk(); - try { - bulkMetric.getCurrentIngest().inc(deleteRequest.index(), TYPE_NAME, deleteRequest.id()); - bulkProcessor.add(deleteRequest); - } catch (Exception e) { - if (logger.isErrorEnabled()) { - logger.error("bulk add of delete failed: " + e.getMessage(), e); - } - inactivate(); - } - } - - @Override - public void bulkUpdate(UpdateRequest updateRequest) { - ensureActiveAndBulk(); - try { - bulkMetric.getCurrentIngest().inc(updateRequest.index(), TYPE_NAME, updateRequest.id()); - bulkProcessor.add(updateRequest); - } catch (Exception e) { - if (logger.isErrorEnabled()) { - logger.error("bulk add of update failed: " + e.getMessage(), e); - } - inactivate(); - } - } - - @Override - public boolean waitForBulkResponses(long timeout, TimeUnit timeUnit) { - try { - if (bulkProcessor != null) { - bulkProcessor.flush(); - return bulkProcessor.awaitFlush(timeout, timeUnit); - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - logger.error("interrupted"); - return false; - } catch (IOException e) { - logger.error(e.getMessage(), e); - return false; - } - return false; - } - - @Override - public void stopBulkMode(IndexDefinition indexDefinition) throws IOException { - flush(); - if (waitForBulkResponses(indexDefinition.getMaxWaitTime(), indexDefinition.getMaxWaitTimeUnit())) { - if (indexDefinition.getStopBulkRefreshSeconds() != 0) { - bulkClient.updateIndexSetting(indexDefinition.getFullIndexName(), - "refresh_interval", - indexDefinition.getStopBulkRefreshSeconds() + "s", - 30L, TimeUnit.SECONDS); - } - } - } - - @Override - public void flush() throws IOException { - if (bulkProcessor != null) { - bulkProcessor.flush(); - } - } - - @Override - public void close() throws IOException { - bulkMetric.close(); - flush(); - bulkClient.waitForResponses(maxWaitTime, maxWaitTimeUnit); - if (bulkProcessor != null) { - bulkProcessor.close(); - } - } - - private void ensureActiveAndBulk() { - if (!active.get()) { - throw new IllegalStateException("inactive"); - } - if (bulkProcessor == null) { - throw new UnsupportedOperationException("bulk processor not present"); - } - } -} diff --git a/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkListener.java b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkListener.java index a504ae0..43348ff 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkListener.java +++ b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkListener.java @@ -5,18 +5,19 @@ import org.apache.logging.log4j.Logger; import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkResponse; -import org.xbib.elx.api.BulkController; +import org.elasticsearch.common.settings.Settings; import org.xbib.elx.api.BulkListener; import org.xbib.elx.api.BulkMetric; -import java.util.Arrays; -import java.util.LongSummaryStatistics; -import java.util.stream.LongStream; +import org.xbib.elx.api.BulkProcessor; + +import java.io.IOException; +import java.util.concurrent.ScheduledThreadPoolExecutor; public class DefaultBulkListener implements BulkListener { private final Logger logger = LogManager.getLogger(DefaultBulkListener.class.getName()); - private final BulkController bulkController; + private final BulkProcessor bulkProcessor; private final BulkMetric bulkMetric; @@ -26,21 +27,22 @@ public class DefaultBulkListener implements BulkListener { private Throwable lastBulkError; - private final int responseTimeCount; + public DefaultBulkListener(DefaultBulkProcessor bulkProcessor, + ScheduledThreadPoolExecutor scheduler, + Settings settings) { + this.bulkProcessor = bulkProcessor; + boolean enableBulkLogging = settings.getAsBoolean(Parameters.BULK_LOGGING_ENABLED.getName(), + Parameters.BULK_LOGGING_ENABLED.getBoolean()); + boolean failOnBulkError = settings.getAsBoolean(Parameters.BULK_FAIL_ON_ERROR.getName(), + Parameters.BULK_FAIL_ON_ERROR.getBoolean()); + this.isBulkLoggingEnabled = enableBulkLogging; + this.failOnError = failOnBulkError; + this.bulkMetric = new DefaultBulkMetric(bulkProcessor, scheduler, settings); + bulkMetric.start(); + } - private final LastResponseTimes responseTimes; - - public DefaultBulkListener(BulkController bulkController, - BulkMetric bulkMetric, - boolean isBulkLoggingEnabled, - boolean failOnError, - int responseTimeCount) { - this.bulkController = bulkController; - this.bulkMetric = bulkMetric; - this.isBulkLoggingEnabled = isBulkLoggingEnabled; - this.failOnError = failOnError; - this.responseTimeCount = responseTimeCount; - this.responseTimes = new LastResponseTimes(responseTimeCount); + public BulkMetric getBulkMetric() { + return bulkMetric; } @Override @@ -52,7 +54,7 @@ public class DefaultBulkListener implements BulkListener { bulkMetric.getCurrentIngestNumDocs().inc(n); bulkMetric.getTotalIngestSizeInBytes().inc(request.estimatedSizeInBytes()); if (isBulkLoggingEnabled && logger.isDebugEnabled()) { - logger.debug("before bulk [{}] [actions={}] [bytes={}] [concurrent requests={}]", + logger.debug("before bulk [{}] [actions={}] [bytes={}] [requests={}]", executionId, request.numberOfActions(), request.estimatedSizeInBytes(), @@ -62,20 +64,11 @@ public class DefaultBulkListener implements BulkListener { @Override public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { + bulkMetric.recalculate(request, response); long l = bulkMetric.getCurrentIngest().getCount(); bulkMetric.getCurrentIngest().dec(); bulkMetric.getSucceeded().inc(response.getItems().length); bulkMetric.markTotalIngest(response.getItems().length); - if (responseTimeCount > 0 && responseTimes.add(response.getTook().millis()) == 0) { - LongSummaryStatistics stat = responseTimes.longStream().summaryStatistics(); - if (isBulkLoggingEnabled && logger.isDebugEnabled()) { - logger.debug("bulk response millis: avg = " + stat.getAverage() + - " min =" + stat.getMin() + - " max = " + stat.getMax() + - " actions = " + bulkController.getBulkProcessor().getBulkActions() + - " size = " + bulkController.getBulkProcessor().getBulkSize()); - } - } int n = 0; for (BulkItemResponse itemResponse : response.getItems()) { bulkMetric.getCurrentIngest().dec(itemResponse.getIndex(), itemResponse.getType(), itemResponse.getId()); @@ -86,7 +79,7 @@ public class DefaultBulkListener implements BulkListener { } } if (isBulkLoggingEnabled && logger.isDebugEnabled()) { - logger.debug("after bulk [{}] [succeeded={}] [failed={}] [{}ms] [concurrent requests={}]", + logger.debug("after bulk [{}] [succeeded={}] [failed={}] [{}ms] [requests={}]", executionId, bulkMetric.getSucceeded().getCount(), bulkMetric.getFailed().getCount(), @@ -114,7 +107,7 @@ public class DefaultBulkListener implements BulkListener { if (logger.isErrorEnabled()) { logger.error("after bulk [" + executionId + "] error", failure); } - bulkController.inactivate(); + bulkProcessor.setEnabled(false); } @Override @@ -122,29 +115,8 @@ public class DefaultBulkListener implements BulkListener { return lastBulkError; } - private static class LastResponseTimes { - - private final Long[] values; - - private final int limit; - - private int index; - - public LastResponseTimes(int limit) { - this.values = new Long[limit]; - Arrays.fill(values, -1L); - this.limit = limit; - this.index = 0; - } - - public int add(Long value) { - int i = index++ % limit; - values[i] = value; - return i; - } - - public LongStream longStream() { - return Arrays.stream(values).filter(v -> v != -1L).mapToLong(Long::longValue); - } + @Override + public void close() throws IOException { + bulkMetric.close(); } } diff --git a/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkMetric.java b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkMetric.java index 300e227..1123476 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkMetric.java +++ b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkMetric.java @@ -1,16 +1,33 @@ package org.xbib.elx.common; +import org.apache.logging.log4j.Level; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.unit.TimeValue; import org.xbib.elx.api.BulkMetric; import org.xbib.metrics.api.Count; import org.xbib.metrics.api.Metered; import org.xbib.metrics.common.CountMetric; import org.xbib.metrics.common.Meter; -import java.util.concurrent.Executors; +import java.io.IOException; +import java.util.LongSummaryStatistics; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; public class DefaultBulkMetric implements BulkMetric { + private static final Logger logger = LogManager.getLogger(DefaultBulkMetric.class.getName()); + + private final DefaultBulkProcessor bulkProcessor; + + private final ScheduledFuture future; + private final Meter totalIngest; private final Count totalIngestSizeInBytes; @@ -25,23 +42,58 @@ public class DefaultBulkMetric implements BulkMetric { private final Count failed; + private final long measureIntervalSeconds; + + private final int ringBufferSize; + + private final LongRingBuffer ringBuffer; + + private final long minVolumePerRequest; + + private long maxVolumePerRequest; + private Long started; private Long stopped; - public DefaultBulkMetric() { - totalIngest = new Meter(Executors.newSingleThreadScheduledExecutor()); - totalIngestSizeInBytes = new CountMetric(); - currentIngest = new CountMetric(); - currentIngestNumDocs = new CountMetric(); - submitted = new CountMetric(); - succeeded = new CountMetric(); - failed = new CountMetric(); - } + private Double lastThroughput; - @Override - public void init(Settings settings) { - start(); + private long currentVolumePerRequest; + + private int x = 0; + + public DefaultBulkMetric(DefaultBulkProcessor bulkProcessor, + ScheduledThreadPoolExecutor scheduledThreadPoolExecutor, + Settings settings) { + this.bulkProcessor = bulkProcessor; + int ringBufferSize = settings.getAsInt(Parameters.BULK_RING_BUFFER_SIZE.getName(), + Parameters.BULK_RING_BUFFER_SIZE.getInteger()); + String measureIntervalStr = settings.get(Parameters.BULK_MEASURE_INTERVAL.getName(), + Parameters.BULK_MEASURE_INTERVAL.getString()); + TimeValue measureInterval = TimeValue.parseTimeValue(measureIntervalStr, + TimeValue.timeValueSeconds(1), ""); + this.measureIntervalSeconds = measureInterval.seconds(); + this.totalIngest = new Meter(scheduledThreadPoolExecutor); + this.ringBufferSize = ringBufferSize; + this.ringBuffer = new LongRingBuffer(ringBufferSize); + this.totalIngestSizeInBytes = new CountMetric(); + this.currentIngest = new CountMetric(); + this.currentIngestNumDocs = new CountMetric(); + this.submitted = new CountMetric(); + this.succeeded = new CountMetric(); + this.failed = new CountMetric(); + ByteSizeValue minVolumePerRequest = settings.getAsBytesSize(Parameters.BULK_MIN_VOLUME_PER_REQUEST.getName(), + ByteSizeValue.parseBytesSizeValue(Parameters.BULK_MIN_VOLUME_PER_REQUEST.getString(), "1k")); + this.minVolumePerRequest = minVolumePerRequest.getBytes(); + ByteSizeValue maxVolumePerRequest = settings.getAsBytesSize(Parameters.BULK_MAX_VOLUME_PER_REQUEST.getName(), + ByteSizeValue.parseBytesSizeValue(Parameters.BULK_MAX_VOLUME_PER_REQUEST.getString(), "1m")); + this.maxVolumePerRequest = maxVolumePerRequest.getBytes(); + this.currentVolumePerRequest = minVolumePerRequest.getBytes(); + String metricLogIntervalStr = settings.get(Parameters.BULK_METRIC_LOG_INTERVAL.getName(), + Parameters.BULK_METRIC_LOG_INTERVAL.getString()); + TimeValue metricLoginterval = TimeValue.parseTimeValue(metricLogIntervalStr, + TimeValue.timeValueSeconds(10), ""); + this.future = scheduledThreadPoolExecutor.scheduleAtFixedRate(this::log, 0L, metricLoginterval.seconds(), TimeUnit.SECONDS); } @Override @@ -92,7 +144,7 @@ public class DefaultBulkMetric implements BulkMetric { @Override public void start() { this.started = System.nanoTime(); - totalIngest.start(5L); + totalIngest.start(measureIntervalSeconds); } @Override @@ -102,8 +154,87 @@ public class DefaultBulkMetric implements BulkMetric { } @Override - public void close() { + public void close() throws IOException { stop(); totalIngest.shutdown(); + log(); + this.future.cancel(true); + } + + @Override + public void recalculate(BulkRequest request, BulkResponse response) { + if (ringBufferSize > 0 && ringBuffer.add(response.getTook().millis(), request.estimatedSizeInBytes()) == 0) { + x++; + LongSummaryStatistics stat1 = ringBuffer.longStreamValues1().summaryStatistics(); + LongSummaryStatistics stat2 = ringBuffer.longStreamValues2().summaryStatistics(); + double throughput = stat2.getAverage() / stat1.getAverage(); + double delta = lastThroughput != null ? throughput - lastThroughput : 0.0d; + double deltaPercent = delta * 100 / throughput; + if (logger.isDebugEnabled()) { + logger.debug("time: avg = " + stat1.getAverage() + + " min = " + stat1.getMin() + + " max = " + stat1.getMax() + + " size: avg = " + stat2.getAverage() + + " min = " + stat2.getMin() + + " max = " + stat2.getMax() + + " last throughput: " + lastThroughput + " bytes/ms" + + " throughput: " + throughput + " bytes/ms" + + " delta = " + delta + + " deltapercent = " + deltaPercent + + " vol = " + currentVolumePerRequest); + } + if ((lastThroughput == null || throughput < 1000000) && stat1.getAverage() < 5000) { + double k = 0.5; + double d = (1 / (1 + Math.exp(-(((double)x)) * k))); + currentVolumePerRequest += d * currentVolumePerRequest; + if (currentVolumePerRequest > maxVolumePerRequest) { + currentVolumePerRequest = maxVolumePerRequest; + } else { + if (logger.isDebugEnabled()) { + logger.debug("increasing volume to " + currentVolumePerRequest + " max volume = " + maxVolumePerRequest); + } + } + bulkProcessor.setMaxBulkVolume(currentVolumePerRequest); + } else if (stat1.getAverage() >= 5000) { + if (currentVolumePerRequest == maxVolumePerRequest) { + // subtract 10% from max + this.maxVolumePerRequest -= (maxVolumePerRequest / 10); + if (maxVolumePerRequest < 1024) { + maxVolumePerRequest = 1024; + } + } + // fall back to minimal volume + currentVolumePerRequest = minVolumePerRequest; + bulkProcessor.setMaxBulkVolume(currentVolumePerRequest); + if (logger.isDebugEnabled()) { + logger.debug("decreasing volume to " + currentVolumePerRequest + " new max volume = " + maxVolumePerRequest); + } + } + lastThroughput = throughput; + } + } + + private void log() { + long docs = getSucceeded().getCount(); + long elapsed = elapsed() / 1000000; // nano to millis + double dps = docs * 1000.0 / elapsed; + long bytes = getTotalIngestSizeInBytes().getCount(); + double avg = bytes / (docs + 1.0); // avoid div by zero + double bps = bytes * 1000.0 / elapsed; + if (logger.isInfoEnabled()) { + logger.log(Level.INFO, "{} docs, {} ms = {}, {} = {}, {} = {} avg, {} = {}, {} = {}", + docs, + elapsed, + FormatUtil.formatDurationWords(elapsed, true, true), + bytes, + FormatUtil.formatSize(bytes), + avg, + FormatUtil.formatSize(avg), + dps, + FormatUtil.formatDocumentSpeed(dps), + bps, + FormatUtil.formatSpeed(bps) + ); + } } } diff --git a/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkProcessor.java b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkProcessor.java index 547fd6d..9c62240 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkProcessor.java +++ b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkProcessor.java @@ -1,5 +1,7 @@ package org.xbib.elx.common; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.bulk.BulkAction; @@ -7,21 +9,18 @@ import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.util.concurrent.EsExecutors; -import org.elasticsearch.common.util.concurrent.FutureUtils; -import org.xbib.elx.api.BulkListener; +import org.xbib.elx.api.BulkClient; +import org.xbib.elx.api.BulkMetric; import org.xbib.elx.api.BulkProcessor; -import org.xbib.elx.api.BulkRequestHandler; +import org.xbib.elx.api.IndexDefinition; -import java.util.Objects; -import java.util.concurrent.Executors; +import java.io.IOException; import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; /** @@ -30,367 +29,215 @@ import java.util.concurrent.atomic.AtomicLong; * (either based on number of actions, based on the size, or time), and * to easily control the number of concurrent bulk * requests allowed to be executed in parallel. - * In order to create a new bulk processor, use the {@link Builder}. */ public class DefaultBulkProcessor implements BulkProcessor { - private final ScheduledThreadPoolExecutor scheduler; + private static final Logger logger = LogManager.getLogger(DefaultBulkProcessor.class); - private final ScheduledFuture scheduledFuture; + private final BulkClient bulkClient; - private final AtomicLong executionIdGen; + private final AtomicBoolean enabled; - private final BulkRequestHandler bulkRequestHandler; + private final ElasticsearchClient client; + + private final DefaultBulkListener bulkListener; + + private ScheduledFuture scheduledFuture; private BulkRequest bulkRequest; + private long bulkVolume; + private int bulkActions; - private long bulkSize; + private final AtomicBoolean closed; - private volatile boolean closed; + private final AtomicLong executionIdGen; - private DefaultBulkProcessor(ElasticsearchClient client, - BulkListener bulkListener, - String name, - int concurrentRequests, - int bulkActions, - ByteSizeValue bulkSize, - TimeValue flushInterval) { - this.executionIdGen = new AtomicLong(); - this.closed = false; - this.bulkActions = bulkActions; - this.bulkSize = bulkSize.getBytes(); - this.bulkRequest = new BulkRequest(); - this.bulkRequestHandler = concurrentRequests == 0 ? - new SyncBulkRequestHandler(client, bulkListener) : - new AsyncBulkRequestHandler(client, bulkListener, concurrentRequests); - if (flushInterval != null) { - this.scheduler = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(1, - EsExecutors.daemonThreadFactory(Settings.EMPTY, - name != null ? "[" + name + "]" : "" + "bulk_processor")); - this.scheduler.setExecuteExistingDelayedTasksAfterShutdownPolicy(false); - this.scheduler.setContinueExistingPeriodicTasksAfterShutdownPolicy(false); - this.scheduledFuture = this.scheduler.scheduleWithFixedDelay(new Flush(), flushInterval.millis(), + private final ResizeableSemaphore semaphore; + + private final int permits; + + public DefaultBulkProcessor(BulkClient bulkClient, Settings settings) { + this.bulkClient = bulkClient; + int maxActionsPerRequest = settings.getAsInt(Parameters.BULK_MAX_ACTIONS_PER_REQUEST.getName(), + Parameters.BULK_MAX_ACTIONS_PER_REQUEST.getInteger()); + String flushIntervalStr = settings.get(Parameters.BULK_FLUSH_INTERVAL.getName(), + Parameters.BULK_FLUSH_INTERVAL.getString()); + TimeValue flushInterval = TimeValue.parseTimeValue(flushIntervalStr, + TimeValue.timeValueSeconds(30), ""); + ByteSizeValue minVolumePerRequest = settings.getAsBytesSize(Parameters.BULK_MIN_VOLUME_PER_REQUEST.getName(), + ByteSizeValue.parseBytesSizeValue(Parameters.BULK_MIN_VOLUME_PER_REQUEST.getString(), "1k")); + this.client = bulkClient.getClient(); + if (flushInterval.millis() > 0L) { + this.scheduledFuture = bulkClient.getScheduler().scheduleWithFixedDelay(this::flush, flushInterval.millis(), flushInterval.millis(), TimeUnit.MILLISECONDS); + } + this.bulkListener = new DefaultBulkListener(this, bulkClient.getScheduler(), settings); + this.bulkActions = maxActionsPerRequest; + this.bulkVolume = minVolumePerRequest.getBytes(); + this.bulkRequest = new BulkRequest(); + this.closed = new AtomicBoolean(false); + this.enabled = new AtomicBoolean(false); + this.executionIdGen = new AtomicLong(); + this.permits = settings.getAsInt(Parameters.BULK_PERMITS.getName(), Parameters.BULK_PERMITS.getInteger()); + if (permits < 1) { + throw new IllegalArgumentException("must not be less 1 permits for bulk indexing"); + } + this.semaphore = new ResizeableSemaphore(permits); + if (logger.isInfoEnabled()) { + logger.info("bulk processor now active"); + } + setEnabled(true); + } + + @Override + public void setEnabled(boolean enabled) { + this.enabled.set(enabled); + } + + @Override + public void startBulkMode(IndexDefinition indexDefinition) throws IOException { + String indexName = indexDefinition.getFullIndexName(); + int interval = indexDefinition.getStartBulkRefreshSeconds(); + if (interval != 0) { + logger.info("starting bulk on " + indexName + " with new refresh interval " + interval); + bulkClient.updateIndexSetting(indexName, "refresh_interval", interval + "s", 30L, TimeUnit.SECONDS); } else { - this.scheduler = null; - this.scheduledFuture = null; + logger.warn("ignoring starting bulk on " + indexName + " with refresh interval " + interval); } } - public static Builder builder(ElasticsearchClient client, BulkListener bulkListener) { - Objects.requireNonNull(bulkListener, "A listener for the BulkProcessor is required but null"); - return new Builder(client, bulkListener); + @Override + public void stopBulkMode(IndexDefinition indexDefinition) throws IOException { + String indexName = indexDefinition.getFullIndexName(); + int interval = indexDefinition.getStopBulkRefreshSeconds(); + flush(); + if (waitForBulkResponses(indexDefinition.getMaxWaitTime(), indexDefinition.getMaxWaitTimeUnit())) { + if (interval != 0) { + logger.info("stopping bulk on " + indexName + " with new refresh interval " + interval); + bulkClient.updateIndexSetting(indexName, "refresh_interval", interval + "s", 30L, TimeUnit.SECONDS); + } else { + logger.warn("ignoring stopping bulk on " + indexName + " with refresh interval " + interval); + } + } } @Override - public void setBulkActions(int bulkActions) { + public void setMaxBulkActions(int bulkActions) { this.bulkActions = bulkActions; } @Override - public int getBulkActions() { + public int getMaxBulkActions() { return bulkActions; } @Override - public void setBulkSize(long bulkSize) { - this.bulkSize = bulkSize; + public void setMaxBulkVolume(long bulkVolume) { + this.bulkVolume = bulkVolume; } @Override - public long getBulkSize() { - return bulkSize; + public long getMaxBulkVolume() { + return bulkVolume; } - /** - * Wait for bulk request handler with flush. - * @param timeout the timeout value - * @param unit the timeout unit - * @return true is method was successful, false if timeout - * @throws InterruptedException if timeout - */ @Override - public synchronized boolean awaitFlush(long timeout, TimeUnit unit) throws InterruptedException { - Objects.requireNonNull(unit, "A time unit is required for awaitFlush() but null"); - if (closed) { - return true; - } - // flush - if (bulkRequest.numberOfActions() > 0) { - execute(); - } - // wait for all bulk responses - return bulkRequestHandler.close(timeout, unit); + public BulkMetric getBulkMetric() { + return bulkListener.getBulkMetric(); } - /** - * Closes the processor. Any remaining bulk actions are flushed and then closed. This emthod can only be called - * once as the last action of a bulk processor. - * - * If concurrent requests are not enabled, returns {@code true} immediately. - * If concurrent requests are enabled, waits for up to the specified timeout for all bulk requests to complete then - * returns {@code true}, - * If the specified waiting time elapses before all bulk requests complete, {@code false} is returned. - * - * @param timeout The maximum time to wait for the bulk requests to complete - * @param unit The time unit of the {@code timeout} argument - * @return {@code true} if all bulk requests completed and {@code false} if the waiting time elapsed before all the - * bulk requests completed - * @throws InterruptedException If the current thread is interrupted - */ @Override - public synchronized boolean awaitClose(long timeout, TimeUnit unit) throws InterruptedException { - Objects.requireNonNull(unit, "A time unit is required for awaitCLose() but null"); - if (closed) { - return true; - } - closed = true; - if (scheduledFuture != null) { - FutureUtils.cancel(scheduledFuture); - scheduler.shutdown(); - } - if (bulkRequest.numberOfActions() > 0) { - execute(); - } - return bulkRequestHandler.close(timeout, unit); + public Throwable getLastBulkError() { + return bulkListener.getLastBulkError(); } - /** - * Adds either a delete or an index request. - * - * @param request request - * @return his bulk processor - */ @Override - public synchronized DefaultBulkProcessor add(DocWriteRequest request) { - ensureOpen(); + public synchronized void add(DocWriteRequest request) { + ensureOpenAndActive(); bulkRequest.add(request); if ((bulkActions != -1 && bulkRequest.numberOfActions() >= bulkActions) || - (bulkSize != -1 && bulkRequest.estimatedSizeInBytes() >= bulkSize)) { + (bulkVolume != -1 && bulkRequest.estimatedSizeInBytes() >= bulkVolume)) { execute(); } - return this; } - /** - * Flush pending delete or index requests. - */ @Override public synchronized void flush() { - ensureOpen(); + ensureOpenAndActive(); if (bulkRequest.numberOfActions() > 0) { execute(); } + // do not drain semaphore } - /** - * Closes the processor. If flushing by time is enabled, then it's shutdown. Any remaining bulk actions are flushed. - */ @Override - public void close() { + public synchronized boolean waitForBulkResponses(long timeout, TimeUnit unit) { try { - // 0 = immediate close - awaitClose(0, TimeUnit.NANOSECONDS); + if (closed.get()) { + // silently skip closed condition + return true; + } + if (bulkRequest.numberOfActions() > 0) { + execute(); + } + return drainSemaphore(timeout, unit); + } catch (InterruptedException exc) { Thread.currentThread().interrupt(); + logger.error("interrupted while waiting for bulk responses"); + return false; } } - private void ensureOpen() { - if (closed) { - throw new IllegalStateException("bulk processor already closed"); + @Override + public synchronized void close() throws IOException { + if (closed.compareAndSet(false, true)) { + try { + if (scheduledFuture != null) { + scheduledFuture.cancel(true); + } + // like flush but without ensuring open + if (bulkRequest.numberOfActions() > 0) { + execute(); + } + drainSemaphore(0L, TimeUnit.NANOSECONDS); + bulkListener.close(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } } } private void execute() { BulkRequest myBulkRequest = this.bulkRequest; - long executionId = executionIdGen.incrementAndGet(); this.bulkRequest = new BulkRequest(); - this.bulkRequestHandler.execute(myBulkRequest, executionId); - } - - /** - * A builder used to create a build an instance of a bulk processor. - */ - public static class Builder { - - private final ElasticsearchClient client; - - private final BulkListener bulkListener; - - private String name; - - private int concurrentRequests = 1; - - private int bulkActions = 1000; - - private ByteSizeValue bulkSize = new ByteSizeValue(10, ByteSizeUnit.MB); - - private TimeValue flushInterval = null; - - /** - * Creates a builder of bulk processor with the client to use and the listener that will be used - * to be notified on the completion of bulk requests. - * - * @param client the client - * @param bulkListener the listener - */ - Builder(ElasticsearchClient client, BulkListener bulkListener) { - this.client = client; - this.bulkListener = bulkListener; - } - - /** - * Sets an optional name to identify this bulk processor. - * - * @param name name - * @return this builder - */ - public Builder setName(String name) { - this.name = name; - return this; - } - - /** - * Sets the number of concurrent requests allowed to be executed. A value of 0 means that only a single - * request will be allowed to be executed. A value of 1 means 1 concurrent request is allowed to be executed - * while accumulating new bulk requests. Defaults to {@code 1}. - * - * @param concurrentRequests maximum number of concurrent requests - * @return this builder - */ - public Builder setConcurrentRequests(int concurrentRequests) { - this.concurrentRequests = concurrentRequests; - return this; - } - - /** - * Sets when to flush a new bulk request based on the number of actions currently added. Defaults to - * {@code 1000}. Can be set to {@code -1} to disable it. - * - * @param bulkActions bulk actions - * @return this builder - */ - public Builder setBulkActions(int bulkActions) { - this.bulkActions = bulkActions; - return this; - } - - /** - * Sets when to flush a new bulk request based on the size of actions currently added. Defaults to - * {@code 5mb}. Can be set to {@code -1} to disable it. - * - * @param bulkSize bulk size - * @return this builder - */ - public Builder setBulkSize(ByteSizeValue bulkSize) { - this.bulkSize = bulkSize; - return this; - } - - /** - * Sets a flush interval flushing *any* bulk actions pending if the interval passes. Defaults to not set. - * Note, both {@link #setBulkActions(int)} and {@link #setBulkSize(org.elasticsearch.common.unit.ByteSizeValue)} - * can be set to {@code -1} with the flush interval set allowing for complete async processing of bulk actions. - * - * @param flushInterval flush interval - * @return this builder - */ - public Builder setFlushInterval(TimeValue flushInterval) { - this.flushInterval = flushInterval; - return this; - } - - /** - * Builds a new bulk processor. - * - * @return a bulk processor - */ - public DefaultBulkProcessor build() { - return new DefaultBulkProcessor(client, bulkListener, name, concurrentRequests, bulkActions, bulkSize, flushInterval); - } - } - - private class Flush implements Runnable { - - @Override - public void run() { - synchronized (DefaultBulkProcessor.this) { - if (closed) { - return; - } - if (bulkRequest.numberOfActions() == 0) { - return; - } - execute(); - } - } - } - - private static class SyncBulkRequestHandler implements BulkRequestHandler { - - private final ElasticsearchClient client; - - private final BulkListener bulkListener; - - SyncBulkRequestHandler(ElasticsearchClient client, BulkListener bulkListener) { - this.client = client; - this.bulkListener = bulkListener; - } - - @Override - public void execute(BulkRequest bulkRequest, long executionId) { + long executionId = executionIdGen.incrementAndGet(); + if (semaphore == null) { boolean afterCalled = false; try { - bulkListener.beforeBulk(executionId, bulkRequest); - BulkResponse bulkResponse = client.execute(BulkAction.INSTANCE, bulkRequest).actionGet(); + bulkListener.beforeBulk(executionId, myBulkRequest); + BulkResponse bulkResponse = client.execute(BulkAction.INSTANCE, myBulkRequest).actionGet(); afterCalled = true; - bulkListener.afterBulk(executionId, bulkRequest, bulkResponse); + bulkListener.afterBulk(executionId, myBulkRequest, bulkResponse); } catch (Exception e) { if (!afterCalled) { - bulkListener.afterBulk(executionId, bulkRequest, e); + bulkListener.afterBulk(executionId, myBulkRequest, e); } } - } - - @Override - public boolean close(long timeout, TimeUnit unit) { - return true; - } - } - - private static class AsyncBulkRequestHandler implements BulkRequestHandler { - - private final ElasticsearchClient client; - - private final BulkListener bulkListener; - - private final Semaphore semaphore; - - private final int concurrentRequests; - - private AsyncBulkRequestHandler(ElasticsearchClient client, BulkListener bulkListener, int concurrentRequests) { - this.client = client; - this.bulkListener = bulkListener; - this.concurrentRequests = concurrentRequests; - this.semaphore = new Semaphore(concurrentRequests); - } - - @Override - public void execute(BulkRequest bulkRequest, long executionId) { + } else { boolean bulkRequestSetupSuccessful = false; boolean acquired = false; try { - bulkListener.beforeBulk(executionId, bulkRequest); + bulkListener.beforeBulk(executionId, myBulkRequest); semaphore.acquire(); acquired = true; - client.execute(BulkAction.INSTANCE, bulkRequest, new ActionListener<>() { + client.execute(BulkAction.INSTANCE, myBulkRequest, new ActionListener<>() { @Override public void onResponse(BulkResponse response) { try { - bulkListener.afterBulk(executionId, bulkRequest, response); + bulkListener.afterBulk(executionId, myBulkRequest, response); } finally { semaphore.release(); } @@ -399,7 +246,7 @@ public class DefaultBulkProcessor implements BulkProcessor { @Override public void onFailure(Exception e) { try { - bulkListener.afterBulk(executionId, bulkRequest, e); + bulkListener.afterBulk(executionId, myBulkRequest, e); } finally { semaphore.release(); } @@ -408,24 +255,50 @@ public class DefaultBulkProcessor implements BulkProcessor { bulkRequestSetupSuccessful = true; } catch (InterruptedException e) { Thread.currentThread().interrupt(); - bulkListener.afterBulk(executionId, bulkRequest, e); + bulkListener.afterBulk(executionId, myBulkRequest, e); } catch (Exception e) { - bulkListener.afterBulk(executionId, bulkRequest, e); + bulkListener.afterBulk(executionId, myBulkRequest, e); } finally { if (!bulkRequestSetupSuccessful && acquired) { - // if we fail on client.bulk() release the semaphore semaphore.release(); } } } + } + + private boolean drainSemaphore(long timeValue, TimeUnit timeUnit) throws InterruptedException { + if (semaphore != null) { + if (permits <= 0) { + return true; + } else { + if (semaphore.tryAcquire(permits, timeValue, timeUnit)) { + semaphore.release(permits); + return true; + } + } + } + return false; + } + + private void ensureOpenAndActive() { + if (closed.get()) { + throw new IllegalStateException("bulk processor is closed"); + } + if (!enabled.get()) { + throw new IllegalStateException("bulk processor is no longer enabled"); + } + } + + @SuppressWarnings("serial") + private static class ResizeableSemaphore extends Semaphore { + + ResizeableSemaphore(int permits) { + super(permits, true); + } @Override - public boolean close(long timeout, TimeUnit unit) throws InterruptedException { - if (semaphore.tryAcquire(concurrentRequests, timeout, unit)) { - semaphore.release(concurrentRequests); - return true; - } - return false; + protected void reducePermits(int reduction) { + super.reducePermits(reduction); } } } diff --git a/elx-common/src/main/java/org/xbib/elx/common/DefaultIndexDefinition.java b/elx-common/src/main/java/org/xbib/elx/common/DefaultIndexDefinition.java index 3d2134f..17c8756 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/DefaultIndexDefinition.java +++ b/elx-common/src/main/java/org/xbib/elx/common/DefaultIndexDefinition.java @@ -68,7 +68,7 @@ public class DefaultIndexDefinition implements IndexDefinition { setDateTimeFormatter(DateTimeFormatter.ofPattern("yyyyMMdd", Locale.getDefault())); setDateTimePattern(Pattern.compile("^(.*?)(\\d+)$")); setFullIndexName(index + getDateTimeFormatter().format(LocalDateTime.now())); - setMaxWaitTime(Parameters.MAX_WAIT_BULK_RESPONSE_SECONDS.getInteger(), TimeUnit.SECONDS); + setMaxWaitTime(30, TimeUnit.SECONDS); setShift(false); setPrune(false); setEnabled(true); @@ -76,7 +76,9 @@ public class DefaultIndexDefinition implements IndexDefinition { public DefaultIndexDefinition(AdminClient adminClient, String index, String type, Settings settings) throws IOException { - TimeValue timeValue = settings.getAsTime(Parameters.MAX_WAIT_BULK_RESPONSE.getName(), TimeValue.timeValueSeconds(30)); + String timeValueStr = settings.get(Parameters.BULK_MAX_WAIT_RESPONSE.getName(), + Parameters.BULK_MAX_WAIT_RESPONSE.getString()); + TimeValue timeValue = TimeValue.parseTimeValue(timeValueStr, TimeValue.timeValueSeconds(30), ""); setMaxWaitTime(timeValue.seconds(), TimeUnit.SECONDS); String indexName = settings.get("name", index); String indexType = settings.get("type", type); @@ -86,11 +88,13 @@ public class DefaultIndexDefinition implements IndexDefinition { setEnabled(enabled); String fullIndexName = adminClient.resolveAlias(indexName).stream().findFirst().orElse(indexName); setFullIndexName(fullIndexName); + setStartBulkRefreshSeconds(settings.getAsInt(Parameters.BULK_START_REFRESH_SECONDS.getName(), + Parameters.BULK_START_REFRESH_SECONDS.getInteger())); + setStopBulkRefreshSeconds(settings.getAsInt(Parameters.BULK_STOP_REFRESH_SECONDS.getName(), + Parameters.BULK_STOP_REFRESH_SECONDS.getInteger())); if (settings.get("settings") != null && settings.get("mapping") != null) { setSettings(findSettingsFrom(settings.get("settings"))); setMappings(findMappingsFrom(settings.get("mapping"))); - setStartBulkRefreshSeconds(settings.getAsInt(Parameters.START_BULK_REFRESH_SECONDS.getName(), -1)); - setStopBulkRefreshSeconds(settings.getAsInt(Parameters.STOP_BULK_REFRESH_SECONDS.getName(), -1)); setReplicaLevel(settings.getAsInt("replica", 0)); boolean shift = settings.getAsBoolean("shift", false); setShift(shift); @@ -100,7 +104,7 @@ public class DefaultIndexDefinition implements IndexDefinition { DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern(dateTimeFormat, Locale.getDefault()) .withZone(ZoneId.systemDefault()); setDateTimeFormatter(dateTimeFormatter); - String dateTimePatternStr = settings.get("dateTimePattern", "^(.*?)(\\\\d+)$"); + String dateTimePatternStr = settings.get("dateTimePattern", "^(.*?)(\\d+)$"); Pattern dateTimePattern = Pattern.compile(dateTimePatternStr); setDateTimePattern(dateTimePattern); String fullName = indexName + dateTimeFormatter.format(LocalDateTime.now()); @@ -195,20 +199,24 @@ public class DefaultIndexDefinition implements IndexDefinition { return pattern; } + @Override public IndexDefinition setStartBulkRefreshSeconds(int seconds) { this.startRefreshInterval = seconds; return this; } + @Override public int getStartBulkRefreshSeconds() { return startRefreshInterval; } + @Override public IndexDefinition setStopBulkRefreshSeconds(int seconds) { this.stopRefreshInterval = seconds; return this; } + @Override public int getStopBulkRefreshSeconds() { return stopRefreshInterval; } diff --git a/elx-common/src/main/java/org/xbib/elx/common/DefaultSearchMetric.java b/elx-common/src/main/java/org/xbib/elx/common/DefaultSearchMetric.java index 4162faf..cde8c5f 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/DefaultSearchMetric.java +++ b/elx-common/src/main/java/org/xbib/elx/common/DefaultSearchMetric.java @@ -1,15 +1,24 @@ package org.xbib.elx.common; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; import org.xbib.elx.api.SearchMetric; import org.xbib.metrics.api.Count; import org.xbib.metrics.api.Metered; import org.xbib.metrics.common.CountMetric; import org.xbib.metrics.common.Meter; -import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; public class DefaultSearchMetric implements SearchMetric { + private static final Logger logger = LogManager.getLogger(DefaultSearchMetric.class.getName()); + + private final ScheduledFuture future; + private final Meter totalQuery; private final Count currentQuery; @@ -28,14 +37,20 @@ public class DefaultSearchMetric implements SearchMetric { private Long stopped; - public DefaultSearchMetric() { - totalQuery = new Meter(Executors.newSingleThreadScheduledExecutor()); + public DefaultSearchMetric(ScheduledThreadPoolExecutor scheduledThreadPoolExecutor, + Settings settings) { + totalQuery = new Meter(scheduledThreadPoolExecutor); currentQuery = new CountMetric(); queries = new CountMetric(); succeededQueries = new CountMetric(); emptyQueries = new CountMetric(); failedQueries = new CountMetric(); timeoutQueries = new CountMetric(); + String metricLogIntervalStr = settings.get(Parameters.SEARCH_METRIC_LOG_INTERVAL.getName(), + Parameters.SEARCH_METRIC_LOG_INTERVAL.getString()); + TimeValue metricLoginterval = TimeValue.parseTimeValue(metricLogIntervalStr, + TimeValue.timeValueSeconds(10), ""); + this.future = scheduledThreadPoolExecutor.scheduleAtFixedRate(this::log, 0L, metricLoginterval.seconds(), TimeUnit.SECONDS); } @Override @@ -98,6 +113,8 @@ public class DefaultSearchMetric implements SearchMetric { public void stop() { this.stopped = System.nanoTime(); totalQuery.stop(); + log(); + this.future.cancel(true); } @Override @@ -105,4 +122,10 @@ public class DefaultSearchMetric implements SearchMetric { stop(); totalQuery.shutdown(); } + + private void log() { + if (logger.isInfoEnabled()) { + logger.info("docs = " + getTotalQueries().getCount()); + } + } } diff --git a/elx-common/src/main/java/org/xbib/elx/common/FormatUtil.java b/elx-common/src/main/java/org/xbib/elx/common/FormatUtil.java new file mode 100644 index 0000000..8eac2de --- /dev/null +++ b/elx-common/src/main/java/org/xbib/elx/common/FormatUtil.java @@ -0,0 +1,436 @@ +package org.xbib.elx.common; + +import org.xbib.time.pretty.PrettyTime; +import java.util.ArrayList; +import java.util.List; + +/** + * Taken from org,apache.commons.lang.DurationFormatUtils of Apache commons-lang. + */ +public class FormatUtil { + + private static final PrettyTime pretty = new PrettyTime(); + + private static final String EMPTY = ""; + private static final String YEAR = "y"; + private static final String MONTH = "M"; + private static final String DAY = "d"; + private static final String HOUR = "H"; + private static final String MINUTE = "m"; + private static final String SECOND = "s"; + private static final String MILLISECOND = "S"; + + /** + * Number of milliseconds in a standard second. + */ + private static final long MILLIS_PER_SECOND = 1000; + /** + * Number of milliseconds in a standard minute. + */ + private static final long MILLIS_PER_MINUTE = 60 * MILLIS_PER_SECOND; + /** + * Number of milliseconds in a standard hour. + */ + private static final long MILLIS_PER_HOUR = 60 * MILLIS_PER_MINUTE; + /** + * Number of milliseconds in a standard day. + */ + private static final long MILLIS_PER_DAY = 24 * MILLIS_PER_HOUR; + + private static final String[] BYTES = { + " B", " kB", " MB", " GB", " TB", " PB", " EB", " ZB", " YB" + }; + private static final String[] BYTES_PER_SECOND = { + " B/s", " kB/s", " MB/s", " GB/s", " TB/s", " PB/s", " EB/s", " ZB/s", " YB/s" + }; + private static final String[] DOCS_PER_SECOND = { + " dps", " kdps", " Mdps", " Gdps", " Tdps", " Pdps", " Edps", " Zdps", " Ydps" + }; + + + /** + * Format byte size (file size as example) into a string, + * with two digits after dot and actual measure (MB, GB or other). + * + * @param size value to format + * @return formatted string in bytes, kB, MB or other. + */ + public static String formatSize(long size) { + return format(size, BYTES, 1024); + } + + public static String formatSize(double size) { + return format(size, BYTES, 1024); + } + + /** + * Format speed values (copy speed as example) into a string + * with two digits after dot and actual measure (MB/s, GB/s or other). + * + * @param speed value to format + * @return formatted string in bytes/s, kB/s, MB/s or other. + */ + public static String formatSpeed(long speed) { + return format(speed, BYTES_PER_SECOND, 1024); + } + + public static String formatSpeed(double speed) { + return format(speed, BYTES_PER_SECOND, 1024); + } + + public static String formatDocumentSpeed(long speed) { + return format(speed, DOCS_PER_SECOND, 1024); + } + + public static String formatDocumentSpeed(double speed) { + return format(speed, DOCS_PER_SECOND, 1024); + } + + /** + * Format any value without string appending. + * + * @param size value to format + * @param measureUnits array of strings to use as measurement units. Use BYTES_PER_SECOND as example. + * @param measureQuantity quantiry, required to step into next unit. Like 1024 for bytes, + * 1000 for meters or 100 for century. + * @return formatted size with measure unit + */ + private static String format(long size, String[] measureUnits, int measureQuantity) { + if (size <= 0) { + return null; + } + if (size < measureQuantity) { + return size + measureUnits[0]; + } + int i = 1; + double d = size; + while ((d = d / measureQuantity) > (measureQuantity - 1)) { + i++; + } + long l = (long) (d * 100); + d = (double) l / 100; + if (i < measureUnits.length) { + return d + measureUnits[i]; + } + return String.valueOf(size); + } + + private static String format(double value, String[] measureUnits, int measureQuantity) { + double d = value; + if (d <= 0.0d) { + return null; + } + if (d < measureQuantity) { + return d + measureUnits[0]; + } + int i = 1; + while ((d = d / measureQuantity) > (measureQuantity - 1)) { + i++; + } + long l = (long) (d * 100); + d = (double) l / 100; + if (i < measureUnits.length) { + return d + measureUnits[i]; + } + return String.valueOf(d); + } + + public static String formatMillis(long millis) { + return pretty.format(pretty.calculateDuration(millis)); + } + + public static String formatDurationWords(long value, boolean suppressLeadingZeroElements, + boolean suppressTrailingZeroElements) { + // This method is generally replacable by the format method, but + // there are a series of tweaks and special cases that require + // trickery to replicate. + String duration = formatDuration(value, "d' days 'H' hours 'm' minutes 's' seconds'"); + if (suppressLeadingZeroElements) { + // this is a temporary marker on the front. Like ^ in regexp. + duration = " " + duration; + String tmp = replaceOnce(duration, " 0 days", ""); + if (tmp.length() != duration.length()) { + duration = tmp; + tmp = replaceOnce(duration, " 0 hours", ""); + if (tmp.length() != duration.length()) { + duration = tmp; + tmp = replaceOnce(duration, " 0 minutes", ""); + if (tmp.length() != duration.length()) { + duration = replaceOnce(tmp, " 0 seconds", ""); + } + } + } + if (duration.length() != 0) { + // strip the space off again + duration = duration.substring(1); + } + } + if (suppressTrailingZeroElements) { + String tmp = replaceOnce(duration, " 0 seconds", ""); + if (tmp != null && tmp.length() != duration.length()) { + duration = tmp; + tmp = replaceOnce(duration, " 0 minutes", ""); + if (tmp.length() != duration.length()) { + duration = tmp; + tmp = replaceOnce(duration, " 0 hours", ""); + if (tmp.length() != duration.length()) { + duration = replaceOnce(tmp, " 0 days", ""); + } + } + } + } + duration = " " + duration; + duration = replaceOnce(duration, " 1 seconds", " 1 second"); + duration = replaceOnce(duration, " 1 minutes", " 1 minute"); + duration = replaceOnce(duration, " 1 hours", " 1 hour"); + duration = replaceOnce(duration, " 1 days", " 1 day"); + return duration != null ? duration.trim() : null; + } + + public static String formatDuration(long millis, String format) { + long durationMillis = millis; + List tokens = lexx(format); + int days = 0; + int hours = 0; + int minutes = 0; + int seconds = 0; + int milliseconds = 0; + + if (Token.containsTokenWithValue(tokens, DAY)) { + days = (int) (durationMillis / MILLIS_PER_DAY); + durationMillis -= days * MILLIS_PER_DAY; + } + if (Token.containsTokenWithValue(tokens, HOUR)) { + hours = (int) (durationMillis / MILLIS_PER_HOUR); + durationMillis -= hours * MILLIS_PER_HOUR; + } + if (Token.containsTokenWithValue(tokens, MINUTE)) { + minutes = (int) (durationMillis / MILLIS_PER_MINUTE); + durationMillis -= minutes * MILLIS_PER_MINUTE; + } + if (Token.containsTokenWithValue(tokens, SECOND)) { + seconds = (int) (durationMillis / MILLIS_PER_SECOND); + durationMillis -= seconds * MILLIS_PER_SECOND; + } + if (Token.containsTokenWithValue(tokens, MILLISECOND)) { + milliseconds = (int) durationMillis; + } + return format(tokens, days, hours, minutes, seconds, milliseconds); + } + + /** + *

The internal method to do the formatting.

+ * + * @param tokens the tokens + * @param days the number of days + * @param hours the number of hours + * @param minutes the number of minutes + * @param seconds the number of seconds + * @param millis the number of millis + * @return the formatted string + */ + private static String format(List tokens, + int days, int hours, int minutes, int seconds, int millis) { + int milliseconds = millis; + StringBuilder buffer = new StringBuilder(); + boolean lastOutputSeconds = false; + for (Token token : tokens) { + Object value = token.getValue(); + if (value instanceof StringBuilder) { + buffer.append(value); + } else { + if (DAY.equals(value)) { + buffer.append(days); + lastOutputSeconds = false; + } else if (HOUR.equals(value)) { + buffer.append(hours); + lastOutputSeconds = false; + } else if (MINUTE.equals(value)) { + buffer.append(minutes); + lastOutputSeconds = false; + } else if (SECOND.equals(value)) { + buffer.append(seconds); + lastOutputSeconds = true; + } else if (MILLISECOND.equals(value)) { + if (lastOutputSeconds) { + milliseconds += 1000; + String str = Integer.toString(milliseconds); + buffer.append(str.substring(1)); + } else { + buffer.append(milliseconds); + } + lastOutputSeconds = false; + } + } + } + return buffer.toString(); + } + + /** + * Parses a classic date format string into Tokens. + * + * @param format to parse + * @return array of Token + */ + private static List lexx(String format) { + char[] array = format.toCharArray(); + List list = new ArrayList<>(array.length); + boolean inLiteral = false; + StringBuilder sb = new StringBuilder(); + Token previous = null; + for (char ch : array) { + if (inLiteral && ch != '\'') { + sb.append(ch); + continue; + } + Object value = null; + switch (ch) { + case '\'': + if (inLiteral) { + sb = new StringBuilder(); + inLiteral = false; + } else { + sb = new StringBuilder(); + list.add(new Token(sb)); + inLiteral = true; + } + break; + case 'y': + value = YEAR; + break; + case 'M': + value = MONTH; + break; + case 'd': + value = DAY; + break; + case 'H': + value = HOUR; + break; + case 'm': + value = MINUTE; + break; + case 's': + value = SECOND; + break; + case 'S': + value = MILLISECOND; + break; + default: + if (sb.length() == 0) { + sb = new StringBuilder(); + list.add(new Token(sb)); + } + sb.append(ch); + } + if (value != null) { + if (previous != null && value.equals(previous.getValue())) { + previous.increment(); + } else { + Token token = new Token(value); + list.add(token); + previous = token; + } + sb.setLength(0); + } + } + return list; + } + + private static String replaceOnce(String text, String searchString, String replacement) { + return replace(text, searchString, replacement, 1); + } + + private static String replace(String text, String searchString, String replacement, int maxvalue) { + int max = maxvalue; + if (isNullOrEmpty(text) || isNullOrEmpty(searchString) || replacement == null || max == 0) { + return text; + } + int start = 0; + int end = text.indexOf(searchString, start); + if (end == -1) { + return text; + } + int replLength = searchString.length(); + int increase = replacement.length() - replLength; + increase = Math.max(increase, 0); + increase *= (max < 0 ? 16 : (Math.min(max, 64))); + StringBuilder buf = new StringBuilder(text.length() + increase); + while (end != -1) { + buf.append(text, start, end).append(replacement); + start = end + replLength; + if (--max == 0) { + break; + } + end = text.indexOf(searchString, start); + } + buf.append(text.substring(start)); + return buf.toString(); + } + + private static boolean isNullOrEmpty(String target) { + return target == null || EMPTY.equals(target); + } + + /** + * Element that is parsed from the format pattern. + */ + private static class Token { + + private final Object value; + + private int count; + + Token(Object value) { + this.value = value; + this.count = 1; + } + + static boolean containsTokenWithValue(List tokens, Object value) { + for (Token token : tokens) { + if (token.getValue().equals(value)) { + return true; + } + } + return false; + } + void increment() { + count++; + } + + Object getValue() { + return value; + } + + @Override + public boolean equals(Object obj) { + if (obj instanceof Token) { + Token tok = (Token) obj; + if (this.value.getClass() != tok.value.getClass()) { + return false; + } + if (this.count != tok.count) { + return false; + } + if (this.value instanceof StringBuilder) { + return this.value.toString().equals(tok.value.toString()); + } else if (this.value instanceof Number) { + return this.value.equals(tok.value); + } else { + return this.value == tok.value; + } + } + return false; + } + + @Override + public int hashCode() { + return this.value.hashCode(); + } + + @Override + public String toString() { + return value + " (" + count + ")"; + } + } +} diff --git a/elx-common/src/main/java/org/xbib/elx/common/LongRingBuffer.java b/elx-common/src/main/java/org/xbib/elx/common/LongRingBuffer.java new file mode 100644 index 0000000..91aba29 --- /dev/null +++ b/elx-common/src/main/java/org/xbib/elx/common/LongRingBuffer.java @@ -0,0 +1,38 @@ +package org.xbib.elx.common; + +import java.util.Arrays; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.LongStream; + +public class LongRingBuffer { + + private final Long[] values1, values2; + + private final int limit; + + private final AtomicInteger index; + + public LongRingBuffer(int limit) { + this.values1 = new Long[limit]; + this.values2 = new Long[limit]; + Arrays.fill(values1, -1L); + Arrays.fill(values2, -1L); + this.limit = limit; + this.index = new AtomicInteger(); + } + + public int add(Long v1, Long v2) { + int i = index.incrementAndGet() % limit; + values1[i] = v1; + values2[i] = v2; + return i; + } + + public LongStream longStreamValues1() { + return Arrays.stream(values1).filter(v -> v != -1L).mapToLong(Long::longValue); + } + + public LongStream longStreamValues2() { + return Arrays.stream(values2).filter(v -> v != -1L).mapToLong(Long::longValue); + } +} diff --git a/elx-common/src/main/java/org/xbib/elx/common/MockAdminClient.java b/elx-common/src/main/java/org/xbib/elx/common/MockAdminClient.java index a03a5b6..6e25fef 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/MockAdminClient.java +++ b/elx-common/src/main/java/org/xbib/elx/common/MockAdminClient.java @@ -34,7 +34,6 @@ public class MockAdminClient extends AbstractAdminClient { @Override public void waitForShards(long maxWaitTime, TimeUnit timeUnit) { - } @Override diff --git a/elx-common/src/main/java/org/xbib/elx/common/Parameters.java b/elx-common/src/main/java/org/xbib/elx/common/Parameters.java index 0dabaf9..1dd55bc 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/Parameters.java +++ b/elx-common/src/main/java/org/xbib/elx/common/Parameters.java @@ -4,28 +4,33 @@ public enum Parameters { DATE_TIME_FORMAT("dateTimeFormat", String.class, "yyyyMMdd"), - MAX_WAIT_BULK_RESPONSE("bulk.max_wait_response", String.class, "30s"), + BULK_MAX_WAIT_RESPONSE("bulk.max_wait_response", String.class, "30s"), - MAX_WAIT_BULK_RESPONSE_SECONDS("bulk.max_wait_response_seconds", Integer.class, 30), + BULK_START_REFRESH_SECONDS("bulk.start_refresh_seconds", Integer.class, -1), - START_BULK_REFRESH_SECONDS("bulk.start_refresh_seconds", Integer.class, 0), + BULK_STOP_REFRESH_SECONDS("bulk.stop_refresh_seconds", Integer.class, 30), - STOP_BULK_REFRESH_SECONDS("bulk.stop_refresh_seconds", Integer.class, 30), + BULK_LOGGING_ENABLED("bulk.logging.enabled", Boolean.class, true), - ENABLE_BULK_LOGGING("bulk.logging.enabled", Boolean.class, true), + BULK_FAIL_ON_ERROR("bulk.fail_on_error", Boolean.class, true), - FAIL_ON_BULK_ERROR("bulk.failonerror", Boolean.class, true), + BULK_MAX_ACTIONS_PER_REQUEST("bulk.max_actions_per_request", Integer.class, -1), - MAX_ACTIONS_PER_REQUEST("bulk.max_actions_per_request", Integer.class, 1000), + BULK_MIN_VOLUME_PER_REQUEST("bulk.min_volume_per_request", String.class, "1k"), - RESPONSE_TIME_COUNT("bulk.response_time_count", Integer.class, 64), + BULK_MAX_VOLUME_PER_REQUEST("bulk.max_volume_per_request", String.class, "1m"), - // 0 = 1 CPU, synchronous requests, > 0 = n + 1 CPUs, asynchronous requests - MAX_CONCURRENT_REQUESTS("bulk.max_concurrent_requests", Integer.class, Runtime.getRuntime().availableProcessors() - 1), + BULK_FLUSH_INTERVAL("bulk.flush_interval", String.class, "30s"), - MAX_VOLUME_PER_REQUEST("bulk.max_volume_per_request", String.class, "1mb"), + BULK_MEASURE_INTERVAL("bulk.measure_interval", String.class, "1s"), - FLUSH_INTERVAL("bulk.flush_interval", String.class, "30s"); + BULK_METRIC_LOG_INTERVAL("bulk.metric_log_interval", String.class, "10s"), + + BULK_RING_BUFFER_SIZE("bulk.ring_buffer_size", Integer.class, Runtime.getRuntime().availableProcessors()), + + BULK_PERMITS("bulk.permits", Integer.class, Runtime.getRuntime().availableProcessors() - 1), + + SEARCH_METRIC_LOG_INTERVAL("search.metric_log_interval", String.class, "10s"); private final String name; diff --git a/elx-http/build.gradle b/elx-http/build.gradle index 981ebc2..6a8e8c8 100644 --- a/elx-http/build.gradle +++ b/elx-http/build.gradle @@ -1,9 +1,7 @@ -import org.apache.tools.ant.taskdefs.condition.Os - dependencies{ api project(':elx-common') - api "org.elasticsearch.plugin:transport-netty4-client:${rootProject.property('elasticsearch.version')}" api "org.xbib:netty-http-client:${project.property('xbib-netty-http.version')}" + api "org.elasticsearch.plugin:transport-netty4-client:${rootProject.property('elasticsearch.version')}" runtimeOnly "org.bouncycastle:bcpkix-jdk15on:${project.property('bouncycastle.version')}" runtimeOnly "io.netty:netty-tcnative-boringssl-static:${project.property('tcnative.version')}" } diff --git a/elx-http/src/main/java/org/xbib/elx/http/HttpAdminClient.java b/elx-http/src/main/java/org/xbib/elx/http/HttpAdminClient.java index ffb99d2..13c3b4e 100644 --- a/elx-http/src/main/java/org/xbib/elx/http/HttpAdminClient.java +++ b/elx-http/src/main/java/org/xbib/elx/http/HttpAdminClient.java @@ -19,6 +19,7 @@ public class HttpAdminClient extends AbstractAdminClient implements Elasticsearc private final HttpClientHelper helper; public HttpAdminClient() { + super(); this.helper = new HttpClientHelper(); } diff --git a/elx-http/src/main/java/org/xbib/elx/http/HttpBulkClient.java b/elx-http/src/main/java/org/xbib/elx/http/HttpBulkClient.java index b88cf83..1f196b4 100644 --- a/elx-http/src/main/java/org/xbib/elx/http/HttpBulkClient.java +++ b/elx-http/src/main/java/org/xbib/elx/http/HttpBulkClient.java @@ -19,6 +19,7 @@ public class HttpBulkClient extends AbstractBulkClient implements ElasticsearchC private final HttpClientHelper helper; public HttpBulkClient() { + super(); this.helper = new HttpClientHelper(); } diff --git a/elx-http/src/main/java/org/xbib/elx/http/HttpSearchClient.java b/elx-http/src/main/java/org/xbib/elx/http/HttpSearchClient.java index 0cff238..f1d9a2d 100644 --- a/elx-http/src/main/java/org/xbib/elx/http/HttpSearchClient.java +++ b/elx-http/src/main/java/org/xbib/elx/http/HttpSearchClient.java @@ -19,6 +19,7 @@ public class HttpSearchClient extends AbstractSearchClient implements Elasticsea private final HttpClientHelper helper; public HttpSearchClient() { + super(); this.helper = new HttpClientHelper(); } diff --git a/elx-http/src/test/java/org/xbib/elx/http/test/BulkClientTest.java b/elx-http/src/test/java/org/xbib/elx/http/test/BulkClientTest.java index 9b11bad..3bdbf03 100644 --- a/elx-http/src/test/java/org/xbib/elx/http/test/BulkClientTest.java +++ b/elx-http/src/test/java/org/xbib/elx/http/test/BulkClientTest.java @@ -27,8 +27,6 @@ class BulkClientTest { private static final Long ACTIONS = 100000L; - private static final Long MAX_ACTIONS_PER_REQUEST = 1000L; - private final TestExtension.Helper helper; BulkClientTest(TestExtension.Helper helper) { @@ -40,7 +38,6 @@ class BulkClientTest { try (HttpBulkClient bulkClient = ClientBuilder.builder() .setBulkClientProvider(HttpBulkClientProvider.class) .put(helper.getHttpSettings()) - .put(Parameters.FLUSH_INTERVAL.getName(), TimeValue.timeValueSeconds(5)) .build()) { IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); bulkClient.newIndex(indexDefinition); @@ -52,19 +49,17 @@ class BulkClientTest { try (HttpBulkClient bulkClient = ClientBuilder.builder() .setBulkClientProvider(HttpBulkClientProvider.class) .put(helper.getHttpSettings()) - .put(Parameters.MAX_ACTIONS_PER_REQUEST.getName(), MAX_ACTIONS_PER_REQUEST) - .put(Parameters.FLUSH_INTERVAL.getName(), "30s") .build()) { IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); bulkClient.newIndex(indexDefinition); bulkClient.index(indexDefinition, "1", true, "{ \"name\" : \"Hello World\"}"); // single doc ingest bulkClient.flush(); bulkClient.waitForResponses(30L, TimeUnit.SECONDS); - assertEquals(1, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount()); - if (bulkClient.getBulkController().getLastBulkError() != null) { - logger.error("error", bulkClient.getBulkController().getLastBulkError()); + assertEquals(1, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount()); + if (bulkClient.getBulkProcessor().getLastBulkError() != null) { + logger.error("error", bulkClient.getBulkProcessor().getLastBulkError()); } - assertNull(bulkClient.getBulkController().getLastBulkError()); + assertNull(bulkClient.getBulkProcessor().getLastBulkError()); } } @@ -74,8 +69,6 @@ class BulkClientTest { try (HttpBulkClient bulkClient = ClientBuilder.builder() .setBulkClientProvider(HttpBulkClientProvider.class) .put(helper.getHttpSettings()) - .put(Parameters.MAX_ACTIONS_PER_REQUEST.name(), MAX_ACTIONS_PER_REQUEST) - .put(Parameters.FLUSH_INTERVAL.name(), "60s") .build()) { IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); bulkClient.newIndex(indexDefinition); @@ -84,11 +77,11 @@ class BulkClientTest { bulkClient.index(indexDefinition, null, false, "{ \"name\" : \"" + helper.randomString(32) + "\"}"); } bulkClient.waitForResponses(30L, TimeUnit.SECONDS); - assertEquals(numactions, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount()); - if (bulkClient.getBulkController().getLastBulkError() != null) { - logger.error("error", bulkClient.getBulkController().getLastBulkError()); + assertEquals(numactions, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount()); + if (bulkClient.getBulkProcessor().getLastBulkError() != null) { + logger.error("error", bulkClient.getBulkProcessor().getLastBulkError()); } - assertNull(bulkClient.getBulkController().getLastBulkError()); + assertNull(bulkClient.getBulkProcessor().getLastBulkError()); bulkClient.refreshIndex(indexDefinition); assertEquals(numactions, bulkClient.getSearchableDocs(indexDefinition)); } @@ -97,18 +90,13 @@ class BulkClientTest { @Test void testThreadedRandomDocs() throws Exception { int maxthreads = Runtime.getRuntime().availableProcessors(); - long maxActionsPerRequest = MAX_ACTIONS_PER_REQUEST; final long actions = ACTIONS; long timeout = 120L; try (HttpBulkClient bulkClient = ClientBuilder.builder() .setBulkClientProvider(HttpBulkClientProvider.class) .put(helper.getHttpSettings()) - .put(Parameters.MAX_CONCURRENT_REQUESTS.getName(), maxthreads) - .put(Parameters.MAX_ACTIONS_PER_REQUEST.getName(), maxActionsPerRequest) - .put(Parameters.FLUSH_INTERVAL.getName(), "60s") // disable flush .build()) { IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); - indexDefinition.setStartBulkRefreshSeconds(0); // disable refresh bulkClient.newIndex(indexDefinition); bulkClient.startBulk(indexDefinition); ExecutorService executorService = Executors.newFixedThreadPool(maxthreads); @@ -132,11 +120,11 @@ class BulkClientTest { bulkClient.stopBulk(indexDefinition); bulkClient.refreshIndex(indexDefinition); assertEquals(maxthreads * actions, bulkClient.getSearchableDocs(indexDefinition)); - assertEquals(maxthreads * actions, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount()); - if (bulkClient.getBulkController().getLastBulkError() != null) { - logger.error("error", bulkClient.getBulkController().getLastBulkError()); + assertEquals(maxthreads * actions, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount()); + if (bulkClient.getBulkProcessor().getLastBulkError() != null) { + logger.error("error", bulkClient.getBulkProcessor().getLastBulkError()); } - assertNull(bulkClient.getBulkController().getLastBulkError()); + assertNull(bulkClient.getBulkProcessor().getLastBulkError()); } } } diff --git a/elx-http/src/test/java/org/xbib/elx/http/test/DuplicateIDTest.java b/elx-http/src/test/java/org/xbib/elx/http/test/DuplicateIDTest.java index ab3b070..4539fef 100644 --- a/elx-http/src/test/java/org/xbib/elx/http/test/DuplicateIDTest.java +++ b/elx-http/src/test/java/org/xbib/elx/http/test/DuplicateIDTest.java @@ -7,7 +7,6 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.xbib.elx.api.IndexDefinition; import org.xbib.elx.common.ClientBuilder; import org.xbib.elx.common.DefaultIndexDefinition; -import org.xbib.elx.common.Parameters; import org.xbib.elx.http.HttpBulkClient; import org.xbib.elx.http.HttpBulkClientProvider; @@ -24,8 +23,6 @@ class DuplicateIDTest { private static final Long ACTIONS = 100L; - private static final Long MAX_ACTIONS_PER_REQUEST = 5L; - private final TestExtension.Helper helper; DuplicateIDTest(TestExtension.Helper helper) { @@ -38,7 +35,6 @@ class DuplicateIDTest { try (HttpBulkClient bulkClient = ClientBuilder.builder() .setBulkClientProvider(HttpBulkClientProvider.class) .put(helper.getHttpSettings()) - .put(Parameters.MAX_ACTIONS_PER_REQUEST.name(), MAX_ACTIONS_PER_REQUEST) .build()) { IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); bulkClient.newIndex(indexDefinition); @@ -50,11 +46,11 @@ class DuplicateIDTest { bulkClient.refreshIndex(indexDefinition); long hits = bulkClient.getSearchableDocs(indexDefinition); assertTrue(hits < ACTIONS); - assertEquals(numactions, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount()); - if (bulkClient.getBulkController().getLastBulkError() != null) { - logger.error("error", bulkClient.getBulkController().getLastBulkError()); + assertEquals(numactions, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount()); + if (bulkClient.getBulkProcessor().getLastBulkError() != null) { + logger.error("error", bulkClient.getBulkProcessor().getLastBulkError()); } - assertNull(bulkClient.getBulkController().getLastBulkError()); + assertNull(bulkClient.getBulkProcessor().getLastBulkError()); } } } diff --git a/elx-http/src/test/java/org/xbib/elx/http/test/IndexPruneTest.java b/elx-http/src/test/java/org/xbib/elx/http/test/IndexPruneTest.java index ad1640b..c447ec5 100644 --- a/elx-http/src/test/java/org/xbib/elx/http/test/IndexPruneTest.java +++ b/elx-http/src/test/java/org/xbib/elx/http/test/IndexPruneTest.java @@ -85,10 +85,10 @@ class IndexPruneTest { assertFalse(list.get(1)); assertTrue(list.get(2)); assertTrue(list.get(3)); - if (bulkClient.getBulkController().getLastBulkError() != null) { - logger.error("error", bulkClient.getBulkController().getLastBulkError()); + if (bulkClient.getBulkProcessor().getLastBulkError() != null) { + logger.error("error", bulkClient.getBulkProcessor().getLastBulkError()); } - assertNull(bulkClient.getBulkController().getLastBulkError()); + assertNull(bulkClient.getBulkProcessor().getLastBulkError()); } } } diff --git a/elx-http/src/test/java/org/xbib/elx/http/test/IndexShiftTest.java b/elx-http/src/test/java/org/xbib/elx/http/test/IndexShiftTest.java index 5917326..c0e1b62 100644 --- a/elx-http/src/test/java/org/xbib/elx/http/test/IndexShiftTest.java +++ b/elx-http/src/test/java/org/xbib/elx/http/test/IndexShiftTest.java @@ -4,7 +4,6 @@ import org.apache.logging.log4j.Level; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest; -import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.query.QueryBuilders; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -23,7 +22,6 @@ import java.util.Map; import java.util.Optional; import java.util.concurrent.TimeUnit; -import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -109,10 +107,10 @@ class IndexShiftTest { assertTrue(aliases.containsKey("d")); assertTrue(aliases.containsKey("e")); assertTrue(aliases.containsKey("f")); - if (bulkClient.getBulkController().getLastBulkError() != null) { - logger.error("error", bulkClient.getBulkController().getLastBulkError()); + if (bulkClient.getBulkProcessor().getLastBulkError() != null) { + logger.error("error", bulkClient.getBulkProcessor().getLastBulkError()); } - assertNull(bulkClient.getBulkController().getLastBulkError()); + assertNull(bulkClient.getBulkProcessor().getLastBulkError()); } } } diff --git a/elx-http/src/test/java/org/xbib/elx/http/test/SearchTest.java b/elx-http/src/test/java/org/xbib/elx/http/test/SearchTest.java index 7056fa8..9f2b1b2 100644 --- a/elx-http/src/test/java/org/xbib/elx/http/test/SearchTest.java +++ b/elx-http/src/test/java/org/xbib/elx/http/test/SearchTest.java @@ -12,7 +12,6 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.xbib.elx.api.IndexDefinition; import org.xbib.elx.common.ClientBuilder; import org.xbib.elx.common.DefaultIndexDefinition; -import org.xbib.elx.common.Parameters; import org.xbib.elx.http.HttpBulkClient; import org.xbib.elx.http.HttpBulkClientProvider; import org.xbib.elx.http.HttpSearchClient; @@ -28,8 +27,6 @@ class SearchTest { private static final Long ACTIONS = 100000L; - private static final Long MAX_ACTIONS_PER_REQUEST = 100L; - private final TestExtension.Helper helper; SearchTest(TestExtension.Helper helper) { @@ -43,7 +40,6 @@ class SearchTest { try (HttpBulkClient bulkClient = ClientBuilder.builder() .setBulkClientProvider(HttpBulkClientProvider.class) .put(helper.getHttpSettings()) - .put(Parameters.MAX_ACTIONS_PER_REQUEST.getName(), MAX_ACTIONS_PER_REQUEST) .build()) { bulkClient.newIndex(indexDefinition); for (int i = 0; i < ACTIONS; i++) { @@ -53,11 +49,11 @@ class SearchTest { bulkClient.waitForResponses(30L, TimeUnit.SECONDS); bulkClient.refreshIndex(indexDefinition); assertEquals(numactions, bulkClient.getSearchableDocs(indexDefinition)); - assertEquals(numactions, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount()); - if (bulkClient.getBulkController().getLastBulkError() != null) { - logger.error("error", bulkClient.getBulkController().getLastBulkError()); + assertEquals(numactions, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount()); + if (bulkClient.getBulkProcessor().getLastBulkError() != null) { + logger.error("error", bulkClient.getBulkProcessor().getLastBulkError()); } - assertNull(bulkClient.getBulkController().getLastBulkError()); + assertNull(bulkClient.getBulkProcessor().getLastBulkError()); } try (HttpSearchClient searchClient = ClientBuilder.builder() .setSearchClientProvider(HttpSearchClientProvider.class) diff --git a/elx-http/src/test/java/org/xbib/elx/http/test/SmokeTest.java b/elx-http/src/test/java/org/xbib/elx/http/test/SmokeTest.java index a07da2a..eb2259e 100644 --- a/elx-http/src/test/java/org/xbib/elx/http/test/SmokeTest.java +++ b/elx-http/src/test/java/org/xbib/elx/http/test/SmokeTest.java @@ -64,12 +64,12 @@ class SmokeTest { adminClient.updateReplicaLevel(indexDefinition, 2); int replica = adminClient.getReplicaLevel(indexDefinition); assertEquals(2, replica); - assertEquals(0, bulkClient.getBulkController().getBulkMetric().getFailed().getCount()); - assertEquals(6, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount()); - if (bulkClient.getBulkController().getLastBulkError() != null) { - logger.error("error", bulkClient.getBulkController().getLastBulkError()); + assertEquals(0, bulkClient.getBulkProcessor().getBulkMetric().getFailed().getCount()); + assertEquals(6, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount()); + if (bulkClient.getBulkProcessor().getLastBulkError() != null) { + logger.error("error", bulkClient.getBulkProcessor().getLastBulkError()); } - assertNull(bulkClient.getBulkController().getLastBulkError()); + assertNull(bulkClient.getBulkProcessor().getLastBulkError()); adminClient.deleteIndex(indexDefinition); XContentBuilder builder = JsonXContent.contentBuilder() .startObject() diff --git a/elx-node/src/main/java/org/xbib/elx/node/NodeAdminClient.java b/elx-node/src/main/java/org/xbib/elx/node/NodeAdminClient.java index b8614a7..7b73a19 100644 --- a/elx-node/src/main/java/org/xbib/elx/node/NodeAdminClient.java +++ b/elx-node/src/main/java/org/xbib/elx/node/NodeAdminClient.java @@ -10,6 +10,7 @@ public class NodeAdminClient extends AbstractAdminClient { private final NodeClientHelper helper; public NodeAdminClient() { + super(); this.helper = new NodeClientHelper(); } diff --git a/elx-node/src/main/java/org/xbib/elx/node/NodeBulkClient.java b/elx-node/src/main/java/org/xbib/elx/node/NodeBulkClient.java index d8163fe..566d99e 100644 --- a/elx-node/src/main/java/org/xbib/elx/node/NodeBulkClient.java +++ b/elx-node/src/main/java/org/xbib/elx/node/NodeBulkClient.java @@ -10,6 +10,7 @@ public class NodeBulkClient extends AbstractBulkClient { private final NodeClientHelper helper; public NodeBulkClient() { + super(); this.helper = new NodeClientHelper(); } diff --git a/elx-node/src/main/java/org/xbib/elx/node/NodeSearchClient.java b/elx-node/src/main/java/org/xbib/elx/node/NodeSearchClient.java index ed0954c..cfcf2fe 100644 --- a/elx-node/src/main/java/org/xbib/elx/node/NodeSearchClient.java +++ b/elx-node/src/main/java/org/xbib/elx/node/NodeSearchClient.java @@ -10,6 +10,7 @@ public class NodeSearchClient extends AbstractSearchClient { private final NodeClientHelper helper; public NodeSearchClient() { + super(); this.helper = new NodeClientHelper(); } diff --git a/elx-node/src/test/java/org/xbib/elx/node/test/BulkClientTest.java b/elx-node/src/test/java/org/xbib/elx/node/test/BulkClientTest.java index 0512935..5cb472d 100644 --- a/elx-node/src/test/java/org/xbib/elx/node/test/BulkClientTest.java +++ b/elx-node/src/test/java/org/xbib/elx/node/test/BulkClientTest.java @@ -2,12 +2,15 @@ package org.xbib.elx.node.test; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.json.JsonXContent; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.xbib.elx.api.IndexDefinition; import org.xbib.elx.common.ClientBuilder; import org.xbib.elx.common.DefaultIndexDefinition; -import org.xbib.elx.common.Parameters; +import org.xbib.elx.node.NodeAdminClient; +import org.xbib.elx.node.NodeAdminClientProvider; import org.xbib.elx.node.NodeBulkClient; import org.xbib.elx.node.NodeBulkClientProvider; @@ -26,8 +29,6 @@ class BulkClientTest { private static final Long ACTIONS = 100000L; - private static final Long MAX_ACTIONS_PER_REQUEST = 100L; - private final TestExtension.Helper helper; BulkClientTest(TestExtension.Helper helper) { @@ -39,7 +40,6 @@ class BulkClientTest { try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client()) .setBulkClientProvider(NodeBulkClientProvider.class) .put(helper.getNodeSettings()) - .put(Parameters.FLUSH_INTERVAL.getName(), "5s") .build()) { IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); bulkClient.newIndex(indexDefinition); @@ -51,18 +51,16 @@ class BulkClientTest { try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client()) .setBulkClientProvider(NodeBulkClientProvider.class) .put(helper.getNodeSettings()) - .put(Parameters.MAX_ACTIONS_PER_REQUEST.getName(), MAX_ACTIONS_PER_REQUEST) - .put(Parameters.FLUSH_INTERVAL.getName(), "30s") .build()) { IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); bulkClient.newIndex(indexDefinition); bulkClient.index(indexDefinition, "1", true, "{ \"name\" : \"Hello World\"}"); // single doc ingest bulkClient.waitForResponses(30L, TimeUnit.SECONDS); - assertEquals(1, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount()); - if (bulkClient.getBulkController().getLastBulkError() != null) { - logger.error("error", bulkClient.getBulkController().getLastBulkError()); + assertEquals(1, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount()); + if (bulkClient.getBulkProcessor().getLastBulkError() != null) { + logger.error("error", bulkClient.getBulkProcessor().getLastBulkError()); } - assertNull(bulkClient.getBulkController().getLastBulkError()); + assertNull(bulkClient.getBulkProcessor().getLastBulkError()); } } @@ -72,8 +70,6 @@ class BulkClientTest { try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client()) .setBulkClientProvider(NodeBulkClientProvider.class) .put(helper.getNodeSettings()) - .put(Parameters.MAX_ACTIONS_PER_REQUEST.getName(), MAX_ACTIONS_PER_REQUEST) - .put(Parameters.FLUSH_INTERVAL.getName(), "60s") .build()) { IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); bulkClient.newIndex(indexDefinition); @@ -84,11 +80,11 @@ class BulkClientTest { } bulkClient.stopBulk(indexDefinition); bulkClient.waitForResponses(30L, TimeUnit.SECONDS); - assertEquals(numactions, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount()); - if (bulkClient.getBulkController().getLastBulkError() != null) { - logger.error("error", bulkClient.getBulkController().getLastBulkError()); + assertEquals(numactions, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount()); + if (bulkClient.getBulkProcessor().getLastBulkError() != null) { + logger.error("error", bulkClient.getBulkProcessor().getLastBulkError()); } - assertNull(bulkClient.getBulkController().getLastBulkError()); + assertNull(bulkClient.getBulkProcessor().getLastBulkError()); bulkClient.refreshIndex(indexDefinition); assertEquals(numactions, bulkClient.getSearchableDocs(indexDefinition)); } @@ -97,15 +93,11 @@ class BulkClientTest { @Test void testThreadedRandomDocs() throws Exception { int maxthreads = Runtime.getRuntime().availableProcessors(); - long maxActionsPerRequest = MAX_ACTIONS_PER_REQUEST; final long actions = ACTIONS; long timeout = 120L; try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client()) .setBulkClientProvider(NodeBulkClientProvider.class) .put(helper.getNodeSettings()) - .put(Parameters.MAX_CONCURRENT_REQUESTS.getName(), maxthreads) - .put(Parameters.MAX_ACTIONS_PER_REQUEST.getName(), maxActionsPerRequest) - .put(Parameters.FLUSH_INTERVAL.getName(), "60s") // disable flush .build()) { IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); indexDefinition.setStartBulkRefreshSeconds(0); @@ -132,11 +124,11 @@ class BulkClientTest { bulkClient.stopBulk(indexDefinition); bulkClient.refreshIndex(indexDefinition); assertEquals(maxthreads * actions, bulkClient.getSearchableDocs(indexDefinition)); - assertEquals(maxthreads * actions, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount()); - if (bulkClient.getBulkController().getLastBulkError() != null) { - logger.error("error", bulkClient.getBulkController().getLastBulkError()); + assertEquals(maxthreads * actions, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount()); + if (bulkClient.getBulkProcessor().getLastBulkError() != null) { + logger.error("error", bulkClient.getBulkProcessor().getLastBulkError()); } - assertNull(bulkClient.getBulkController().getLastBulkError()); + assertNull(bulkClient.getBulkProcessor().getLastBulkError()); } } } diff --git a/elx-node/src/test/java/org/xbib/elx/node/test/DuplicateIDTest.java b/elx-node/src/test/java/org/xbib/elx/node/test/DuplicateIDTest.java index 7ce77fc..b9f3e02 100644 --- a/elx-node/src/test/java/org/xbib/elx/node/test/DuplicateIDTest.java +++ b/elx-node/src/test/java/org/xbib/elx/node/test/DuplicateIDTest.java @@ -38,7 +38,7 @@ class DuplicateIDTest { try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client()) .setBulkClientProvider(NodeBulkClientProvider.class) .put(helper.getNodeSettings()) - .put(Parameters.MAX_ACTIONS_PER_REQUEST.getName(), MAX_ACTIONS_PER_REQUEST) + .put(Parameters.BULK_MAX_ACTIONS_PER_REQUEST.getName(), MAX_ACTIONS_PER_REQUEST) .build()) { IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); bulkClient.newIndex(indexDefinition); @@ -49,11 +49,11 @@ class DuplicateIDTest { bulkClient.waitForResponses(30L, TimeUnit.SECONDS); bulkClient.refreshIndex(indexDefinition); assertTrue(bulkClient.getSearchableDocs(indexDefinition) < ACTIONS); - assertEquals(numactions, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount()); - if (bulkClient.getBulkController().getLastBulkError() != null) { - logger.error("error", bulkClient.getBulkController().getLastBulkError()); + assertEquals(numactions, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount()); + if (bulkClient.getBulkProcessor().getLastBulkError() != null) { + logger.error("error", bulkClient.getBulkProcessor().getLastBulkError()); } - assertNull(bulkClient.getBulkController().getLastBulkError()); + assertNull(bulkClient.getBulkProcessor().getLastBulkError()); } } } diff --git a/elx-node/src/test/java/org/xbib/elx/node/test/IndexPruneTest.java b/elx-node/src/test/java/org/xbib/elx/node/test/IndexPruneTest.java index 4c83b37..5560139 100644 --- a/elx-node/src/test/java/org/xbib/elx/node/test/IndexPruneTest.java +++ b/elx-node/src/test/java/org/xbib/elx/node/test/IndexPruneTest.java @@ -87,10 +87,10 @@ class IndexPruneTest { assertFalse(list.get(1)); assertTrue(list.get(2)); assertTrue(list.get(3)); - if (bulkClient.getBulkController().getLastBulkError() != null) { - logger.error("error", bulkClient.getBulkController().getLastBulkError()); + if (bulkClient.getBulkProcessor().getLastBulkError() != null) { + logger.error("error", bulkClient.getBulkProcessor().getLastBulkError()); } - assertNull(bulkClient.getBulkController().getLastBulkError()); + assertNull(bulkClient.getBulkProcessor().getLastBulkError()); } } } diff --git a/elx-node/src/test/java/org/xbib/elx/node/test/IndexShiftTest.java b/elx-node/src/test/java/org/xbib/elx/node/test/IndexShiftTest.java index 69aaa1c..741aeac 100644 --- a/elx-node/src/test/java/org/xbib/elx/node/test/IndexShiftTest.java +++ b/elx-node/src/test/java/org/xbib/elx/node/test/IndexShiftTest.java @@ -107,10 +107,10 @@ class IndexShiftTest { assertTrue(aliases.containsKey("d")); assertTrue(aliases.containsKey("e")); assertTrue(aliases.containsKey("f")); - if (bulkClient.getBulkController().getLastBulkError() != null) { - logger.error("error", bulkClient.getBulkController().getLastBulkError()); + if (bulkClient.getBulkProcessor().getLastBulkError() != null) { + logger.error("error", bulkClient.getBulkProcessor().getLastBulkError()); } - assertNull(bulkClient.getBulkController().getLastBulkError()); + assertNull(bulkClient.getBulkProcessor().getLastBulkError()); } } } diff --git a/elx-node/src/test/java/org/xbib/elx/node/test/SearchTest.java b/elx-node/src/test/java/org/xbib/elx/node/test/SearchTest.java index 81f06d7..78e2ba1 100644 --- a/elx-node/src/test/java/org/xbib/elx/node/test/SearchTest.java +++ b/elx-node/src/test/java/org/xbib/elx/node/test/SearchTest.java @@ -44,7 +44,7 @@ class SearchTest { try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client()) .setBulkClientProvider(NodeBulkClientProvider.class) .put(helper.getNodeSettings()) - .put(Parameters.MAX_ACTIONS_PER_REQUEST.getName(), MAX_ACTIONS_PER_REQUEST) + .put(Parameters.BULK_MAX_ACTIONS_PER_REQUEST.getName(), MAX_ACTIONS_PER_REQUEST) .build()) { bulkClient.newIndex(indexDefinition); bulkClient.startBulk(indexDefinition); @@ -56,11 +56,11 @@ class SearchTest { bulkClient.waitForResponses(30L, TimeUnit.SECONDS); bulkClient.refreshIndex(indexDefinition); assertEquals(numactions, bulkClient.getSearchableDocs(indexDefinition)); - assertEquals(numactions, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount()); - if (bulkClient.getBulkController().getLastBulkError() != null) { - logger.error("error", bulkClient.getBulkController().getLastBulkError()); + assertEquals(numactions, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount()); + if (bulkClient.getBulkProcessor().getLastBulkError() != null) { + logger.error("error", bulkClient.getBulkProcessor().getLastBulkError()); } - assertNull(bulkClient.getBulkController().getLastBulkError()); + assertNull(bulkClient.getBulkProcessor().getLastBulkError()); } try (NodeSearchClient searchClient = ClientBuilder.builder(helper.client()) .setSearchClientProvider(NodeSearchClientProvider.class) diff --git a/elx-node/src/test/java/org/xbib/elx/node/test/SmokeTest.java b/elx-node/src/test/java/org/xbib/elx/node/test/SmokeTest.java index 883b8f2..bc431ca 100644 --- a/elx-node/src/test/java/org/xbib/elx/node/test/SmokeTest.java +++ b/elx-node/src/test/java/org/xbib/elx/node/test/SmokeTest.java @@ -67,12 +67,12 @@ class SmokeTest { adminClient.updateReplicaLevel(indexDefinition, 2); int replica = adminClient.getReplicaLevel(indexDefinition); assertEquals(2, replica); - assertEquals(0, bulkClient.getBulkController().getBulkMetric().getFailed().getCount()); - assertEquals(6, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount()); - if (bulkClient.getBulkController().getLastBulkError() != null) { - logger.error("error", bulkClient.getBulkController().getLastBulkError()); + assertEquals(0, bulkClient.getBulkProcessor().getBulkMetric().getFailed().getCount()); + assertEquals(6, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount()); + if (bulkClient.getBulkProcessor().getLastBulkError() != null) { + logger.error("error", bulkClient.getBulkProcessor().getLastBulkError()); } - assertNull(bulkClient.getBulkController().getLastBulkError()); + assertNull(bulkClient.getBulkProcessor().getLastBulkError()); adminClient.deleteIndex(indexDefinition); XContentBuilder builder = JsonXContent.contentBuilder() .startObject() diff --git a/elx-transport/src/main/java/org/xbib/elx/transport/TransportAdminClient.java b/elx-transport/src/main/java/org/xbib/elx/transport/TransportAdminClient.java index 36bdf65..08b3cbc 100644 --- a/elx-transport/src/main/java/org/xbib/elx/transport/TransportAdminClient.java +++ b/elx-transport/src/main/java/org/xbib/elx/transport/TransportAdminClient.java @@ -15,6 +15,7 @@ public class TransportAdminClient extends AbstractAdminClient { private final TransportClientHelper helper; public TransportAdminClient() { + super(); this.helper = new TransportClientHelper(); } diff --git a/elx-transport/src/main/java/org/xbib/elx/transport/TransportBulkClient.java b/elx-transport/src/main/java/org/xbib/elx/transport/TransportBulkClient.java index 4c48f8d..b591fc2 100644 --- a/elx-transport/src/main/java/org/xbib/elx/transport/TransportBulkClient.java +++ b/elx-transport/src/main/java/org/xbib/elx/transport/TransportBulkClient.java @@ -14,6 +14,7 @@ public class TransportBulkClient extends AbstractBulkClient { private final TransportClientHelper helper; public TransportBulkClient() { + super(); this.helper = new TransportClientHelper(); } diff --git a/elx-transport/src/main/java/org/xbib/elx/transport/TransportClientHelper.java b/elx-transport/src/main/java/org/xbib/elx/transport/TransportClientHelper.java index 555d31a..bc74c1d 100644 --- a/elx-transport/src/main/java/org/xbib/elx/transport/TransportClientHelper.java +++ b/elx-transport/src/main/java/org/xbib/elx/transport/TransportClientHelper.java @@ -1,6 +1,5 @@ package org.xbib.elx.transport; -import org.apache.logging.log4j.Level; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.Version; @@ -19,9 +18,6 @@ import org.elasticsearch.common.network.NetworkModule; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.util.concurrent.EsExecutors; -import org.elasticsearch.common.xcontent.ToXContent; -import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.transport.Netty4Plugin; @@ -43,26 +39,14 @@ public class TransportClientHelper { private static final Map clientMap = new HashMap<>(); - protected ElasticsearchClient createClient(Settings settings) throws IOException { - if (settings != null) { - String systemIdentifier = System.getProperty("os.name") - + " " + System.getProperty("java.vm.name") - + " " + System.getProperty("java.vm.vendor") - + " " + System.getProperty("java.vm.version") - + " Elasticsearch " + Version.CURRENT.toString(); - Settings transportClientSettings = getTransportClientSettings(settings); - XContentBuilder effectiveSettingsBuilder = XContentFactory.jsonBuilder().startObject(); - logger.log(Level.INFO, "creating transport client on {} with settings {}", - systemIdentifier, - Strings.toString(transportClientSettings.toXContent(effectiveSettingsBuilder, - ToXContent.EMPTY_PARAMS).endObject())); - return new MyTransportClient(transportClientSettings, Collections.singletonList(Netty4Plugin.class)); - } - return null; + public ElasticsearchClient createClient(Settings settings) { + String clusterName = settings.get("cluster.name", "elasticsearch"); + return clientMap.computeIfAbsent(clusterName, key -> innerCreateClient(settings)); } public void closeClient(Settings settings) { - ElasticsearchClient client = clientMap.remove(settings.get("cluster.name")); + String clusterName = settings.get("cluster.name", "elasticsearch"); + ElasticsearchClient client = clientMap.remove(clusterName); if (client != null) { if (client instanceof Client) { ((Client) client).close(); @@ -74,7 +58,7 @@ public class TransportClientHelper { public void init(TransportClient transportClient, Settings settings) throws IOException { Collection addrs = findAddresses(settings); if (!connect(transportClient, addrs, settings.getAsBoolean("autodiscover", false))) { - throw new NoNodeAvailableException("no cluster nodes available, check settings " + throw new NoNodeAvailableException("no cluster nodes available, check settings = " + Strings.toString(settings)); } } @@ -128,6 +112,18 @@ public class TransportClientHelper { return false; } + private ElasticsearchClient innerCreateClient(Settings settings) { + String systemIdentifier = System.getProperty("os.name") + + " " + System.getProperty("java.vm.name") + + " " + System.getProperty("java.vm.vendor") + + " " + System.getProperty("java.vm.version") + + " Elasticsearch " + Version.CURRENT.toString(); + logger.info("creating transport client on {} with custom settings {}", + systemIdentifier, Strings.toString(settings)); + Settings transportClientSettings = getTransportClientSettings(settings); + return new MyTransportClient(transportClientSettings, Collections.singletonList(Netty4Plugin.class)); + } + private Settings getTransportClientSettings(Settings settings) { return Settings.builder() // "cluster.name" diff --git a/elx-transport/src/main/java/org/xbib/elx/transport/TransportSearchClient.java b/elx-transport/src/main/java/org/xbib/elx/transport/TransportSearchClient.java index 9acf6ef..53bc8a7 100644 --- a/elx-transport/src/main/java/org/xbib/elx/transport/TransportSearchClient.java +++ b/elx-transport/src/main/java/org/xbib/elx/transport/TransportSearchClient.java @@ -14,6 +14,7 @@ public class TransportSearchClient extends AbstractSearchClient { private final TransportClientHelper helper; public TransportSearchClient() { + super(); this.helper = new TransportClientHelper(); } diff --git a/elx-transport/src/test/java/org/xbib/elx/transport/test/BulkClientTest.java b/elx-transport/src/test/java/org/xbib/elx/transport/test/BulkClientTest.java index 98a684f..b4b287a 100644 --- a/elx-transport/src/test/java/org/xbib/elx/transport/test/BulkClientTest.java +++ b/elx-transport/src/test/java/org/xbib/elx/transport/test/BulkClientTest.java @@ -7,7 +7,6 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.xbib.elx.api.IndexDefinition; import org.xbib.elx.common.ClientBuilder; import org.xbib.elx.common.DefaultIndexDefinition; -import org.xbib.elx.common.Parameters; import org.xbib.elx.transport.TransportBulkClient; import org.xbib.elx.transport.TransportBulkClientProvider; @@ -26,8 +25,6 @@ class BulkClientTest { private static final Long ACTIONS = 100000L; - private static final Long MAX_ACTIONS_PER_REQUEST = 100L; - private final TestExtension.Helper helper; BulkClientTest(TestExtension.Helper helper) { @@ -35,38 +32,32 @@ class BulkClientTest { } @Test - void testSingleDoc() throws Exception { - final TransportBulkClient bulkClient = ClientBuilder.builder() + void testNewIndex() throws Exception { + try (TransportBulkClient bulkClient = ClientBuilder.builder() .setBulkClientProvider(TransportBulkClientProvider.class) .put(helper.getTransportSettings()) - .put(Parameters.MAX_ACTIONS_PER_REQUEST.getName(), MAX_ACTIONS_PER_REQUEST) - .put(Parameters.FLUSH_INTERVAL.getName(), "30s") - .build(); - try { + .build()) { IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); bulkClient.newIndex(indexDefinition); - bulkClient.index(indexDefinition, "1", true, "{ \"name\" : \"Hello World\"}"); // single doc ingest - bulkClient.waitForResponses(30L, TimeUnit.SECONDS); - } finally { - assertEquals(1, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount()); - if (bulkClient.getBulkController().getLastBulkError() != null) { - logger.error("error", bulkClient.getBulkController().getLastBulkError()); - } - assertNull(bulkClient.getBulkController().getLastBulkError()); - bulkClient.close(); } } @Test - void testNewIndex() throws Exception { - final TransportBulkClient bulkClient = ClientBuilder.builder() + void testSingleDoc() throws Exception { + try (TransportBulkClient bulkClient = ClientBuilder.builder() .setBulkClientProvider(TransportBulkClientProvider.class) .put(helper.getTransportSettings()) - .put(Parameters.FLUSH_INTERVAL.getName(), "5s") - .build(); - IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); - bulkClient.newIndex(indexDefinition); - bulkClient.close(); + .build()) { + IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); + bulkClient.newIndex(indexDefinition); + bulkClient.index(indexDefinition, "1", true, "{ \"name\" : \"Hello World\"}"); // single doc ingest + bulkClient.waitForResponses(30L, TimeUnit.SECONDS); + assertEquals(1, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount()); + if (bulkClient.getBulkProcessor().getLastBulkError() != null) { + logger.error("error", bulkClient.getBulkProcessor().getLastBulkError()); + } + assertNull(bulkClient.getBulkProcessor().getLastBulkError()); + } } @Test @@ -75,8 +66,6 @@ class BulkClientTest { try (TransportBulkClient bulkClient = ClientBuilder.builder() .setBulkClientProvider(TransportBulkClientProvider.class) .put(helper.getTransportSettings()) - .put(Parameters.MAX_ACTIONS_PER_REQUEST.getName(), MAX_ACTIONS_PER_REQUEST) - .put(Parameters.FLUSH_INTERVAL.getName(), "60s") .build()) { IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); bulkClient.newIndex(indexDefinition); @@ -87,11 +76,11 @@ class BulkClientTest { } bulkClient.flush(); bulkClient.waitForResponses(30L, TimeUnit.SECONDS); - assertEquals(numactions, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount()); - if (bulkClient.getBulkController().getLastBulkError() != null) { - logger.error("error", bulkClient.getBulkController().getLastBulkError()); + assertEquals(numactions, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount()); + if (bulkClient.getBulkProcessor().getLastBulkError() != null) { + logger.error("error", bulkClient.getBulkProcessor().getLastBulkError()); } - assertNull(bulkClient.getBulkController().getLastBulkError()); + assertNull(bulkClient.getBulkProcessor().getLastBulkError()); bulkClient.refreshIndex(indexDefinition); assertEquals(numactions, bulkClient.getSearchableDocs(indexDefinition)); } @@ -105,9 +94,6 @@ class BulkClientTest { try (TransportBulkClient bulkClient = ClientBuilder.builder() .setBulkClientProvider(TransportBulkClientProvider.class) .put(helper.getTransportSettings()) - .put(Parameters.MAX_CONCURRENT_REQUESTS.getName(), maxthreads) - .put(Parameters.MAX_ACTIONS_PER_REQUEST.getName(), MAX_ACTIONS_PER_REQUEST) - .put(Parameters.FLUSH_INTERVAL.getName(), "60s") // disable flush .build()) { IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); bulkClient.newIndex(indexDefinition); @@ -133,11 +119,11 @@ class BulkClientTest { bulkClient.stopBulk(indexDefinition); bulkClient.refreshIndex(indexDefinition); assertEquals(maxthreads * actions, bulkClient.getSearchableDocs(indexDefinition)); - assertEquals(maxthreads * actions, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount()); - if (bulkClient.getBulkController().getLastBulkError() != null) { - logger.error("error", bulkClient.getBulkController().getLastBulkError()); + assertEquals(maxthreads * actions, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount()); + if (bulkClient.getBulkProcessor().getLastBulkError() != null) { + logger.error("error", bulkClient.getBulkProcessor().getLastBulkError()); } - assertNull(bulkClient.getBulkController().getLastBulkError()); + assertNull(bulkClient.getBulkProcessor().getLastBulkError()); } } } diff --git a/elx-transport/src/test/java/org/xbib/elx/transport/test/DuplicateIDTest.java b/elx-transport/src/test/java/org/xbib/elx/transport/test/DuplicateIDTest.java index 4f9e6f5..20c9618 100644 --- a/elx-transport/src/test/java/org/xbib/elx/transport/test/DuplicateIDTest.java +++ b/elx-transport/src/test/java/org/xbib/elx/transport/test/DuplicateIDTest.java @@ -37,7 +37,7 @@ class DuplicateIDTest { long numactions = ACTIONS; try (TransportBulkClient bulkClient = ClientBuilder.builder() .setBulkClientProvider(TransportBulkClientProvider.class) - .put(Parameters.MAX_ACTIONS_PER_REQUEST.getName(), MAX_ACTIONS_PER_REQUEST) + .put(Parameters.BULK_MAX_ACTIONS_PER_REQUEST.getName(), MAX_ACTIONS_PER_REQUEST) .put(helper.getTransportSettings()) .build()) { IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); @@ -49,11 +49,11 @@ class DuplicateIDTest { bulkClient.waitForResponses(30L, TimeUnit.SECONDS); bulkClient.refreshIndex(indexDefinition); assertTrue(bulkClient.getSearchableDocs(indexDefinition) < ACTIONS); - assertEquals(numactions, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount()); - if (bulkClient.getBulkController().getLastBulkError() != null) { - logger.error("error", bulkClient.getBulkController().getLastBulkError()); + assertEquals(numactions, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount()); + if (bulkClient.getBulkProcessor().getLastBulkError() != null) { + logger.error("error", bulkClient.getBulkProcessor().getLastBulkError()); } - assertNull(bulkClient.getBulkController().getLastBulkError()); + assertNull(bulkClient.getBulkProcessor().getLastBulkError()); } } } diff --git a/elx-transport/src/test/java/org/xbib/elx/transport/test/IndexPruneTest.java b/elx-transport/src/test/java/org/xbib/elx/transport/test/IndexPruneTest.java index 0347fb9..cbe5132 100644 --- a/elx-transport/src/test/java/org/xbib/elx/transport/test/IndexPruneTest.java +++ b/elx-transport/src/test/java/org/xbib/elx/transport/test/IndexPruneTest.java @@ -85,10 +85,10 @@ class IndexPruneTest { assertFalse(list.get(1)); assertTrue(list.get(2)); assertTrue(list.get(3)); - if (bulkClient.getBulkController().getLastBulkError() != null) { - logger.error("error", bulkClient.getBulkController().getLastBulkError()); + if (bulkClient.getBulkProcessor().getLastBulkError() != null) { + logger.error("error", bulkClient.getBulkProcessor().getLastBulkError()); } - assertNull(bulkClient.getBulkController().getLastBulkError()); + assertNull(bulkClient.getBulkProcessor().getLastBulkError()); } } } diff --git a/elx-transport/src/test/java/org/xbib/elx/transport/test/IndexShiftTest.java b/elx-transport/src/test/java/org/xbib/elx/transport/test/IndexShiftTest.java index 74f667a..521624b 100644 --- a/elx-transport/src/test/java/org/xbib/elx/transport/test/IndexShiftTest.java +++ b/elx-transport/src/test/java/org/xbib/elx/transport/test/IndexShiftTest.java @@ -37,11 +37,11 @@ class IndexShiftTest { @Test void testIndexShift() throws Exception { - try (final TransportAdminClient adminClient = ClientBuilder.builder() + try (TransportAdminClient adminClient = ClientBuilder.builder() .setAdminClientProvider(TransportAdminClientProvider.class) .put(helper.getTransportSettings()) .build(); - final TransportBulkClient bulkClient = ClientBuilder.builder() + TransportBulkClient bulkClient = ClientBuilder.builder() .setBulkClientProvider(TransportBulkClientProvider.class) .put(helper.getTransportSettings()) .build()) { @@ -104,10 +104,10 @@ class IndexShiftTest { assertTrue(aliases.containsKey("d")); assertTrue(aliases.containsKey("e")); assertTrue(aliases.containsKey("f")); - if (bulkClient.getBulkController().getLastBulkError() != null) { - logger.error("error", bulkClient.getBulkController().getLastBulkError()); + if (bulkClient.getBulkProcessor().getLastBulkError() != null) { + logger.error("error", bulkClient.getBulkProcessor().getLastBulkError()); } - assertNull(bulkClient.getBulkController().getLastBulkError()); + assertNull(bulkClient.getBulkProcessor().getLastBulkError()); } } } diff --git a/elx-transport/src/test/java/org/xbib/elx/transport/test/SearchTest.java b/elx-transport/src/test/java/org/xbib/elx/transport/test/SearchTest.java index 3aafe38..d9762e2 100644 --- a/elx-transport/src/test/java/org/xbib/elx/transport/test/SearchTest.java +++ b/elx-transport/src/test/java/org/xbib/elx/transport/test/SearchTest.java @@ -12,7 +12,6 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.xbib.elx.api.IndexDefinition; import org.xbib.elx.common.ClientBuilder; import org.xbib.elx.common.DefaultIndexDefinition; -import org.xbib.elx.common.Parameters; import org.xbib.elx.transport.TransportBulkClient; import org.xbib.elx.transport.TransportBulkClientProvider; import org.xbib.elx.transport.TransportSearchClient; @@ -29,8 +28,6 @@ class SearchTest { private static final Long ACTIONS = 100000L; - private static final Long MAX_ACTIONS_PER_REQUEST = 100L; - private final TestExtension.Helper helper; SearchTest(TestExtension.Helper helper) { @@ -44,7 +41,6 @@ class SearchTest { try (TransportBulkClient bulkClient = ClientBuilder.builder() .setBulkClientProvider(TransportBulkClientProvider.class) .put(helper.getTransportSettings()) - .put(Parameters.MAX_ACTIONS_PER_REQUEST.getName(), MAX_ACTIONS_PER_REQUEST) .build()) { bulkClient.newIndex(indexDefinition); bulkClient.startBulk(indexDefinition); @@ -55,11 +51,11 @@ class SearchTest { bulkClient.waitForResponses(30L, TimeUnit.SECONDS); bulkClient.refreshIndex(indexDefinition); assertEquals(numactions, bulkClient.getSearchableDocs(indexDefinition)); - assertEquals(numactions, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount()); - if (bulkClient.getBulkController().getLastBulkError() != null) { - logger.error("error", bulkClient.getBulkController().getLastBulkError()); + assertEquals(numactions, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount()); + if (bulkClient.getBulkProcessor().getLastBulkError() != null) { + logger.error("error", bulkClient.getBulkProcessor().getLastBulkError()); } - assertNull(bulkClient.getBulkController().getLastBulkError()); + assertNull(bulkClient.getBulkProcessor().getLastBulkError()); } try (TransportSearchClient searchClient = ClientBuilder.builder() .setSearchClientProvider(TransportSearchClientProvider.class) diff --git a/elx-transport/src/test/java/org/xbib/elx/transport/test/SmokeTest.java b/elx-transport/src/test/java/org/xbib/elx/transport/test/SmokeTest.java index 10c94ba..519b262 100644 --- a/elx-transport/src/test/java/org/xbib/elx/transport/test/SmokeTest.java +++ b/elx-transport/src/test/java/org/xbib/elx/transport/test/SmokeTest.java @@ -67,12 +67,12 @@ class SmokeTest { adminClient.updateReplicaLevel(indexDefinition, 2); int replica = adminClient.getReplicaLevel(indexDefinition); assertEquals(2, replica); - assertEquals(0, bulkClient.getBulkController().getBulkMetric().getFailed().getCount()); - assertEquals(6, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount()); - if (bulkClient.getBulkController().getLastBulkError() != null) { - logger.error("error", bulkClient.getBulkController().getLastBulkError()); + assertEquals(0, bulkClient.getBulkProcessor().getBulkMetric().getFailed().getCount()); + assertEquals(6, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount()); + if (bulkClient.getBulkProcessor().getLastBulkError() != null) { + logger.error("error", bulkClient.getBulkProcessor().getLastBulkError()); } - assertNull(bulkClient.getBulkController().getLastBulkError()); + assertNull(bulkClient.getBulkProcessor().getLastBulkError()); adminClient.deleteIndex(indexDefinition); XContentBuilder builder = JsonXContent.contentBuilder() .startObject() diff --git a/gradle.properties b/gradle.properties index e2be6c6..755a55a 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,9 +1,10 @@ group = org.xbib name = elx -version = 7.10.2.0 +version = 7.10.2.1 gradle.wrapper.version = 6.6.1 xbib-metrics.version = 2.1.0 +xbib-time.version = 2.1.0 xbib-netty-http.version = 4.1.63.0 elasticsearch.version = 7.10.2 # ES 7.10.2 uses Jackson 2.10.4