From 7595fbaeee5202ca5a4af05b492a621f99218697 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=CC=88rg=20Prante?= Date: Thu, 29 Apr 2021 10:54:47 +0200 Subject: [PATCH] making metrics optional --- .../java/org/xbib/elx/api/BasicClient.java | 4 +- .../java/org/xbib/elx/api/BulkProcessor.java | 5 + .../java/org/xbib/elx/api/SearchClient.java | 2 + .../xbib/elx/common/AbstractAdminClient.java | 8 +- .../xbib/elx/common/AbstractBasicClient.java | 35 ++--- .../xbib/elx/common/AbstractBulkClient.java | 1 + .../xbib/elx/common/AbstractSearchClient.java | 146 ++++++++++++------ .../org/xbib/elx/common/ClientBuilder.java | 1 + .../xbib/elx/common/DaemonThreadFactory.java | 26 ++++ .../xbib/elx/common/DefaultBulkListener.java | 100 ++++++------ .../xbib/elx/common/DefaultBulkMetric.java | 8 +- .../xbib/elx/common/DefaultBulkProcessor.java | 16 +- .../xbib/elx/common/DefaultSearchMetric.java | 10 +- .../java/org/xbib/elx/common/Parameters.java | 14 +- .../org/xbib/elx/http/HttpClientHelper.java | 8 +- .../org/xbib/elx/http/test/DumpIDTest.java | 3 +- .../org/xbib/elx/http/test/TestExtension.java | 11 +- .../org/xbib/elx/node/NodeClientHelper.java | 21 ++- .../xbib/elx/node/test/DuplicateIDTest.java | 6 +- .../org/xbib/elx/node/test/SearchTest.java | 4 - .../org/xbib/elx/node/test/TestExtension.java | 7 +- .../elx/transport/TransportClientHelper.java | 29 ++-- .../elx/transport/test/BulkClientTest.java | 12 +- .../xbib/elx/transport/test/DumpIDTest.java | 3 +- .../elx/transport/test/DuplicateIDTest.java | 10 +- .../xbib/elx/transport/test/SearchTest.java | 32 ++-- .../xbib/elx/transport/test/SmokeTest.java | 6 +- .../elx/transport/test/TestExtension.java | 13 +- gradle.properties | 2 +- 29 files changed, 345 insertions(+), 198 deletions(-) create mode 100644 elx-common/src/main/java/org/xbib/elx/common/DaemonThreadFactory.java diff --git a/elx-api/src/main/java/org/xbib/elx/api/BasicClient.java b/elx-api/src/main/java/org/xbib/elx/api/BasicClient.java index d10b49d..cdd05e5 100644 --- a/elx-api/src/main/java/org/xbib/elx/api/BasicClient.java +++ b/elx-api/src/main/java/org/xbib/elx/api/BasicClient.java @@ -3,7 +3,7 @@ package org.xbib.elx.api; import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.common.settings.Settings; import java.io.Closeable; -import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; public interface BasicClient extends Closeable { @@ -46,5 +46,5 @@ public interface BasicClient extends Closeable { boolean isIndexExists(IndexDefinition indexDefinition); - ScheduledThreadPoolExecutor getScheduler(); + ScheduledExecutorService getScheduler(); } diff --git a/elx-api/src/main/java/org/xbib/elx/api/BulkProcessor.java b/elx-api/src/main/java/org/xbib/elx/api/BulkProcessor.java index a1a7876..7e3235d 100644 --- a/elx-api/src/main/java/org/xbib/elx/api/BulkProcessor.java +++ b/elx-api/src/main/java/org/xbib/elx/api/BulkProcessor.java @@ -4,6 +4,7 @@ import org.elasticsearch.action.DocWriteRequest; import java.io.Closeable; import java.io.Flushable; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; public interface BulkProcessor extends Closeable, Flushable { @@ -14,6 +15,10 @@ public interface BulkProcessor extends Closeable, Flushable { boolean waitForBulkResponses(long timeout, TimeUnit unit); + ScheduledExecutorService getScheduler(); + + boolean isBulkMetricEnabled(); + BulkMetric getBulkMetric(); Throwable getLastBulkError(); diff --git a/elx-api/src/main/java/org/xbib/elx/api/SearchClient.java b/elx-api/src/main/java/org/xbib/elx/api/SearchClient.java index 4932bcf..666d56d 100644 --- a/elx-api/src/main/java/org/xbib/elx/api/SearchClient.java +++ b/elx-api/src/main/java/org/xbib/elx/api/SearchClient.java @@ -14,6 +14,8 @@ import java.util.stream.Stream; public interface SearchClient extends BasicClient { + boolean isSearchMetricEnabled(); + SearchMetric getSearchMetric(); Optional get(Consumer getRequestBuilder); diff --git a/elx-common/src/main/java/org/xbib/elx/common/AbstractAdminClient.java b/elx-common/src/main/java/org/xbib/elx/common/AbstractAdminClient.java index 1b165a1..d12815e 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/AbstractAdminClient.java +++ b/elx-common/src/main/java/org/xbib/elx/common/AbstractAdminClient.java @@ -114,12 +114,12 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements if (isIndexDefinitionDisabled(indexDefinition)) { return this; } - if (indexDefinition.getReplicaCount() < 1) { - logger.warn("invalid replica level"); + if (indexDefinition.getReplicaCount() < 0) { + logger.warn("invalid replica level defined for index " + + indexDefinition.getIndex() + ": " + indexDefinition.getReplicaCount()); return this; } - logger.info("update replica level for " + - indexDefinition + " to " + indexDefinition.getReplicaCount()); + logger.info("update replica level for " + indexDefinition + " to " + indexDefinition.getReplicaCount()); updateIndexSetting(indexDefinition.getFullIndexName(), "number_of_replicas", indexDefinition.getReplicaCount(), 30L, TimeUnit.SECONDS); waitForHealthyCluster(); diff --git a/elx-common/src/main/java/org/xbib/elx/common/AbstractBasicClient.java b/elx-common/src/main/java/org/xbib/elx/common/AbstractBasicClient.java index 8ec0b81..2d6cccf 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/AbstractBasicClient.java +++ b/elx-common/src/main/java/org/xbib/elx/common/AbstractBasicClient.java @@ -24,13 +24,12 @@ import org.elasticsearch.client.transport.NoNodeAvailableException; import org.elasticsearch.cluster.health.ClusterHealthStatus; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.index.query.QueryBuilders; import org.xbib.elx.api.BasicClient; import org.xbib.elx.api.IndexDefinition; import java.io.IOException; import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -42,21 +41,19 @@ public abstract class AbstractBasicClient implements BasicClient { protected Settings settings; - private final ScheduledThreadPoolExecutor scheduler; + private final ScheduledExecutorService executorService; private final AtomicBoolean closed; public AbstractBasicClient() { - this.scheduler = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(2, - EsExecutors.daemonThreadFactory("elx-bulk-processor")); - this.scheduler.setExecuteExistingDelayedTasksAfterShutdownPolicy(false); - this.scheduler.setContinueExistingPeriodicTasksAfterShutdownPolicy(false); + this.executorService = Executors.newScheduledThreadPool(2, + new DaemonThreadFactory("elx")); closed = new AtomicBoolean(false); } @Override - public ScheduledThreadPoolExecutor getScheduler() { - return scheduler; + public ScheduledExecutorService getScheduler() { + return executorService; } @Override @@ -78,6 +75,16 @@ public abstract class AbstractBasicClient implements BasicClient { } } + @Override + public void close() throws IOException { + if (closed.compareAndSet(false, true)) { + if (executorService != null) { + executorService.shutdownNow(); + } + closeClient(settings); + } + } + @Override public String getClusterName() { ensureClientIsPresent(); @@ -184,16 +191,6 @@ public abstract class AbstractBasicClient implements BasicClient { return indicesExistsResponse.isExists(); } - @Override - public void close() throws IOException { - if (closed.compareAndSet(false, true)) { - if (scheduler != null) { - scheduler.shutdown(); - } - closeClient(settings); - } - } - protected abstract ElasticsearchClient createClient(Settings settings); protected abstract void closeClient(Settings settings); diff --git a/elx-common/src/main/java/org/xbib/elx/common/AbstractBulkClient.java b/elx-common/src/main/java/org/xbib/elx/common/AbstractBulkClient.java index 5001739..3e0ce7b 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/AbstractBulkClient.java +++ b/elx-common/src/main/java/org/xbib/elx/common/AbstractBulkClient.java @@ -158,6 +158,7 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements String indexName = indexDefinition.getFullIndexName(); int interval = indexDefinition.getStopBulkRefreshSeconds(); try { + logger.info("flushing bulk"); bulkProcessor.flush(); } catch (IOException e) { // can never happen diff --git a/elx-common/src/main/java/org/xbib/elx/common/AbstractSearchClient.java b/elx-common/src/main/java/org/xbib/elx/common/AbstractSearchClient.java index dac8bf2..dbff539 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/AbstractSearchClient.java +++ b/elx-common/src/main/java/org/xbib/elx/common/AbstractSearchClient.java @@ -34,26 +34,24 @@ import java.util.stream.StreamSupport; public abstract class AbstractSearchClient extends AbstractBasicClient implements SearchClient { - private SearchMetric searchMetric; - private final AtomicBoolean closed; + private SearchMetric searchMetric; + public AbstractSearchClient() { super(); this.closed = new AtomicBoolean(true); } - @Override - public SearchMetric getSearchMetric() { - return searchMetric; - } - @Override public void init(Settings settings) { if (closed.compareAndSet(true, false)) { super.init(settings); - this.searchMetric = new DefaultSearchMetric(getScheduler(), settings); - searchMetric.init(settings); + if (settings.getAsBoolean(Parameters.SEARCH_METRIC_ENABLED.getName(), + Parameters.SEARCH_METRIC_ENABLED.getBoolean())) { + this.searchMetric = new DefaultSearchMetric(this, settings); + searchMetric.init(settings); + } } } @@ -67,20 +65,38 @@ public abstract class AbstractSearchClient extends AbstractBasicClient implement } } + @Override + public boolean isSearchMetricEnabled() { + return searchMetric != null; + } + + @Override + public SearchMetric getSearchMetric() { + return searchMetric; + } + @Override public Optional get(Consumer getRequestBuilderConsumer) { GetRequestBuilder getRequestBuilder = new GetRequestBuilder(client, GetAction.INSTANCE); getRequestBuilderConsumer.accept(getRequestBuilder); ActionFuture actionFuture = getRequestBuilder.execute(); - searchMetric.getCurrentQueries().inc(); + if (searchMetric != null) { + searchMetric.getCurrentQueries().inc(); + } GetResponse getResponse = actionFuture.actionGet(); - searchMetric.getCurrentQueries().dec(); - searchMetric.getQueries().inc(); - searchMetric.markTotalQueries(1); + if (searchMetric != null) { + searchMetric.getCurrentQueries().dec(); + searchMetric.getQueries().inc(); + searchMetric.markTotalQueries(1); + } if (getResponse.isExists()) { - searchMetric.getSucceededQueries().inc(); + if (searchMetric != null) { + searchMetric.getSucceededQueries().inc(); + } } else { - searchMetric.getEmptyQueries().inc(); + if (searchMetric != null) { + searchMetric.getEmptyQueries().inc(); + } } return getResponse.isExists() ? Optional.of(getResponse) : Optional.empty(); } @@ -90,16 +106,24 @@ public abstract class AbstractSearchClient extends AbstractBasicClient implement MultiGetRequestBuilder multiGetRequestBuilder = new MultiGetRequestBuilder(client, MultiGetAction.INSTANCE); multiGetRequestBuilderConsumer.accept(multiGetRequestBuilder); ActionFuture actionFuture = multiGetRequestBuilder.execute(); - searchMetric.getCurrentQueries().inc(); + if (searchMetric != null) { + searchMetric.getCurrentQueries().inc(); + } MultiGetResponse multiGetItemResponse = actionFuture.actionGet(); - searchMetric.getCurrentQueries().dec(); - searchMetric.getQueries().inc(); - searchMetric.markTotalQueries(1); + if (searchMetric != null) { + searchMetric.getCurrentQueries().dec(); + searchMetric.getQueries().inc(); + searchMetric.markTotalQueries(1); + } boolean isempty = multiGetItemResponse.getResponses().length == 0; if (isempty) { - searchMetric.getEmptyQueries().inc(); + if (searchMetric != null) { + searchMetric.getEmptyQueries().inc(); + } } else { - searchMetric.getSucceededQueries().inc(); + if (searchMetric != null) { + searchMetric.getSucceededQueries().inc(); + } } return isempty ? Optional.empty() : Optional.of(multiGetItemResponse); } @@ -109,24 +133,34 @@ public abstract class AbstractSearchClient extends AbstractBasicClient implement SearchRequestBuilder searchRequestBuilder = new SearchRequestBuilder(client, SearchAction.INSTANCE); queryBuilder.accept(searchRequestBuilder); ActionFuture actionFuture = searchRequestBuilder.execute(); - searchMetric.getCurrentQueries().inc(); + if (searchMetric != null) { + searchMetric.getCurrentQueries().inc(); + } SearchResponse searchResponse = actionFuture.actionGet(); - searchMetric.getCurrentQueries().dec(); - searchMetric.getQueries().inc(); - searchMetric.markTotalQueries(1); + if (searchMetric != null) { + searchMetric.getCurrentQueries().dec(); + searchMetric.getQueries().inc(); + searchMetric.markTotalQueries(1); + } if (searchResponse.getFailedShards() > 0) { StringBuilder sb = new StringBuilder("Search failed:"); for (ShardSearchFailure failure : searchResponse.getShardFailures()) { sb.append("\n").append(failure.reason()); } - searchMetric.getEmptyQueries().inc(); + if (searchMetric != null) { + searchMetric.getEmptyQueries().inc(); + } throw new ElasticsearchException(sb.toString()); } boolean isempty = searchResponse.getHits().getHits().length == 0; if (isempty) { - searchMetric.getEmptyQueries().inc(); + if (searchMetric != null) { + searchMetric.getEmptyQueries().inc(); + } } else { - searchMetric.getSucceededQueries().inc(); + if (searchMetric != null) { + searchMetric.getSucceededQueries().inc(); + } } return isempty ? Optional.empty() : Optional.of(searchResponse); } @@ -138,19 +172,31 @@ public abstract class AbstractSearchClient extends AbstractBasicClient implement queryBuilder.accept(searchRequestBuilder); searchRequestBuilder.setScroll(scrollTime).setSize(scrollSize); ActionFuture actionFuture = searchRequestBuilder.execute(); - searchMetric.getCurrentQueries().inc(); + if (searchMetric != null) { + searchMetric.getCurrentQueries().inc(); + } SearchResponse initialSearchResponse = actionFuture.actionGet(); - searchMetric.getCurrentQueries().dec(); - searchMetric.getQueries().inc(); - searchMetric.markTotalQueries(1); + if (searchMetric != null) { + searchMetric.getCurrentQueries().dec(); + searchMetric.getQueries().inc(); + searchMetric.markTotalQueries(1); + } if (initialSearchResponse.getFailedShards() > 0) { - searchMetric.getFailedQueries().inc(); + if (searchMetric != null) { + searchMetric.getFailedQueries().inc(); + } } else if (initialSearchResponse.isTimedOut()) { - searchMetric.getTimeoutQueries().inc(); + if (searchMetric != null) { + searchMetric.getTimeoutQueries().inc(); + } } else if (initialSearchResponse.getHits().getTotalHits().value == 0) { - searchMetric.getEmptyQueries().inc(); + if (searchMetric != null) { + searchMetric.getEmptyQueries().inc(); + } } else { - searchMetric.getSucceededQueries().inc(); + if (searchMetric != null) { + searchMetric.getSucceededQueries().inc(); + } } Stream responseStream = Stream.iterate(initialSearchResponse, searchResponse -> { @@ -159,19 +205,31 @@ public abstract class AbstractSearchClient extends AbstractBasicClient implement .setScrollId(searchResponse.getScrollId()) .setScroll(scrollTime); ActionFuture actionFuture1 = searchScrollRequestBuilder.execute(); - searchMetric.getCurrentQueries().inc(); + if (searchMetric != null) { + searchMetric.getCurrentQueries().inc(); + } SearchResponse searchResponse1 = actionFuture1.actionGet(); - searchMetric.getCurrentQueries().dec(); - searchMetric.getQueries().inc(); - searchMetric.markTotalQueries(1); + if (searchMetric != null) { + searchMetric.getCurrentQueries().dec(); + searchMetric.getQueries().inc(); + searchMetric.markTotalQueries(1); + } if (searchResponse1.getFailedShards() > 0) { - searchMetric.getFailedQueries().inc(); + if (searchMetric != null) { + searchMetric.getFailedQueries().inc(); + } } else if (searchResponse1.isTimedOut()) { - searchMetric.getTimeoutQueries().inc(); + if (searchMetric != null) { + searchMetric.getTimeoutQueries().inc(); + } } else if (searchResponse1.getHits().getHits().length == 0) { - searchMetric.getEmptyQueries().inc(); + if (searchMetric != null) { + searchMetric.getEmptyQueries().inc(); + } } else { - searchMetric.getSucceededQueries().inc(); + if (searchMetric != null) { + searchMetric.getSucceededQueries().inc(); + } } return searchResponse1; }); diff --git a/elx-common/src/main/java/org/xbib/elx/common/ClientBuilder.java b/elx-common/src/main/java/org/xbib/elx/common/ClientBuilder.java index 581f04f..8b4ada5 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/ClientBuilder.java +++ b/elx-common/src/main/java/org/xbib/elx/common/ClientBuilder.java @@ -14,6 +14,7 @@ import org.xbib.elx.api.BasicClient; import org.xbib.elx.api.SearchClientProvider; import java.io.IOException; +import java.util.HashMap; import java.util.Map; import java.util.ServiceLoader; diff --git a/elx-common/src/main/java/org/xbib/elx/common/DaemonThreadFactory.java b/elx-common/src/main/java/org/xbib/elx/common/DaemonThreadFactory.java new file mode 100644 index 0000000..5e2eb9a --- /dev/null +++ b/elx-common/src/main/java/org/xbib/elx/common/DaemonThreadFactory.java @@ -0,0 +1,26 @@ +package org.xbib.elx.common; + +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicInteger; + +public class DaemonThreadFactory implements ThreadFactory { + + private final ThreadGroup group; + + private final AtomicInteger threadNumber = new AtomicInteger(1); + + private final String namePrefix; + + public DaemonThreadFactory(String namePrefix) { + this.namePrefix = namePrefix; + SecurityManager s = System.getSecurityManager(); + group = s != null ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); + } + + @Override + public Thread newThread(Runnable r) { + Thread t = new Thread(group, r, namePrefix + "[T#" + threadNumber.getAndIncrement() + "]", 0); + t.setDaemon(true); + return t; + } +} diff --git a/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkListener.java b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkListener.java index 43348ff..68d3170 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkListener.java +++ b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkListener.java @@ -11,7 +11,6 @@ import org.xbib.elx.api.BulkMetric; import org.xbib.elx.api.BulkProcessor; import java.io.IOException; -import java.util.concurrent.ScheduledThreadPoolExecutor; public class DefaultBulkListener implements BulkListener { @@ -19,26 +18,22 @@ public class DefaultBulkListener implements BulkListener { private final BulkProcessor bulkProcessor; - private final BulkMetric bulkMetric; - - private final boolean isBulkLoggingEnabled; - private final boolean failOnError; + private BulkMetric bulkMetric; + private Throwable lastBulkError; public DefaultBulkListener(DefaultBulkProcessor bulkProcessor, - ScheduledThreadPoolExecutor scheduler, Settings settings) { this.bulkProcessor = bulkProcessor; - boolean enableBulkLogging = settings.getAsBoolean(Parameters.BULK_LOGGING_ENABLED.getName(), - Parameters.BULK_LOGGING_ENABLED.getBoolean()); - boolean failOnBulkError = settings.getAsBoolean(Parameters.BULK_FAIL_ON_ERROR.getName(), + this.failOnError = settings.getAsBoolean(Parameters.BULK_FAIL_ON_ERROR.getName(), Parameters.BULK_FAIL_ON_ERROR.getBoolean()); - this.isBulkLoggingEnabled = enableBulkLogging; - this.failOnError = failOnBulkError; - this.bulkMetric = new DefaultBulkMetric(bulkProcessor, scheduler, settings); - bulkMetric.start(); + if (settings.getAsBoolean(Parameters.BULK_METRIC_ENABLED.getName(), + Parameters.BULK_METRIC_ENABLED.getBoolean())) { + this.bulkMetric = new DefaultBulkMetric(bulkProcessor, bulkProcessor.getScheduler(), settings); + bulkMetric.start(); + } } public BulkMetric getBulkMetric() { @@ -47,47 +42,58 @@ public class DefaultBulkListener implements BulkListener { @Override public void beforeBulk(long executionId, BulkRequest request) { - long l = bulkMetric.getCurrentIngest().getCount(); - bulkMetric.getCurrentIngest().inc(); - int n = request.numberOfActions(); - bulkMetric.getSubmitted().inc(n); - bulkMetric.getCurrentIngestNumDocs().inc(n); - bulkMetric.getTotalIngestSizeInBytes().inc(request.estimatedSizeInBytes()); - if (isBulkLoggingEnabled && logger.isDebugEnabled()) { - logger.debug("before bulk [{}] [actions={}] [bytes={}] [requests={}]", - executionId, - request.numberOfActions(), - request.estimatedSizeInBytes(), - l); + if (bulkMetric != null) { + long l = bulkMetric.getCurrentIngest().getCount(); + bulkMetric.getCurrentIngest().inc(); + int n = request.numberOfActions(); + bulkMetric.getSubmitted().inc(n); + bulkMetric.getCurrentIngestNumDocs().inc(n); + bulkMetric.getTotalIngestSizeInBytes().inc(request.estimatedSizeInBytes()); + if (logger.isDebugEnabled()) { + logger.debug("before bulk [{}] [actions={}] [bytes={}] [requests={}]", + executionId, + request.numberOfActions(), + request.estimatedSizeInBytes(), + l); + } } } @Override public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { - bulkMetric.recalculate(request, response); - long l = bulkMetric.getCurrentIngest().getCount(); - bulkMetric.getCurrentIngest().dec(); - bulkMetric.getSucceeded().inc(response.getItems().length); - bulkMetric.markTotalIngest(response.getItems().length); + long l = 0L; + if (bulkMetric != null) { + bulkMetric.recalculate(request, response); + l = bulkMetric.getCurrentIngest().getCount(); + bulkMetric.getCurrentIngest().dec(); + bulkMetric.getSucceeded().inc(response.getItems().length); + bulkMetric.markTotalIngest(response.getItems().length); + } int n = 0; for (BulkItemResponse itemResponse : response.getItems()) { - bulkMetric.getCurrentIngest().dec(itemResponse.getIndex(), itemResponse.getType(), itemResponse.getId()); + if (bulkMetric != null) { + bulkMetric.getCurrentIngest().dec(itemResponse.getIndex(), itemResponse.getType(), itemResponse.getId()); + } if (itemResponse.isFailed()) { n++; - bulkMetric.getSucceeded().dec(1); - bulkMetric.getFailed().inc(1); + if (bulkMetric != null) { + bulkMetric.getSucceeded().dec(1); + bulkMetric.getFailed().inc(1); + } } } - if (isBulkLoggingEnabled && logger.isDebugEnabled()) { - logger.debug("after bulk [{}] [succeeded={}] [failed={}] [{}ms] [requests={}]", - executionId, - bulkMetric.getSucceeded().getCount(), - bulkMetric.getFailed().getCount(), - response.getTook().millis(), - l); + if (bulkMetric != null) { + if (logger.isDebugEnabled()) { + logger.debug("after bulk [{}] [succeeded={}] [failed={}] [{}ms] [requests={}]", + executionId, + bulkMetric.getSucceeded().getCount(), + bulkMetric.getFailed().getCount(), + response.getTook().millis(), + l); + } } if (n > 0) { - if (isBulkLoggingEnabled && logger.isErrorEnabled()) { + if (logger.isErrorEnabled()) { logger.error("bulk [{}] failed with {} failed items, failure message = {}", executionId, n, response.buildFailureMessage()); } @@ -96,13 +102,17 @@ public class DefaultBulkListener implements BulkListener { " n = " + n + " message = " + response.buildFailureMessage()); } } else { - bulkMetric.getCurrentIngestNumDocs().dec(response.getItems().length); + if (bulkMetric != null) { + bulkMetric.getCurrentIngestNumDocs().dec(response.getItems().length); + } } } @Override public void afterBulk(long executionId, BulkRequest request, Throwable failure) { - bulkMetric.getCurrentIngest().dec(); + if (bulkMetric != null) { + bulkMetric.getCurrentIngest().dec(); + } lastBulkError = failure; if (logger.isErrorEnabled()) { logger.error("after bulk [" + executionId + "] error", failure); @@ -117,6 +127,8 @@ public class DefaultBulkListener implements BulkListener { @Override public void close() throws IOException { - bulkMetric.close(); + if (bulkMetric != null) { + bulkMetric.close(); + } } } diff --git a/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkMetric.java b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkMetric.java index 1123476..0143eaf 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkMetric.java +++ b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkMetric.java @@ -16,8 +16,8 @@ import org.xbib.metrics.common.Meter; import java.io.IOException; import java.util.LongSummaryStatistics; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class DefaultBulkMetric implements BulkMetric { @@ -63,7 +63,7 @@ public class DefaultBulkMetric implements BulkMetric { private int x = 0; public DefaultBulkMetric(DefaultBulkProcessor bulkProcessor, - ScheduledThreadPoolExecutor scheduledThreadPoolExecutor, + ScheduledExecutorService scheduledExecutorService, Settings settings) { this.bulkProcessor = bulkProcessor; int ringBufferSize = settings.getAsInt(Parameters.BULK_RING_BUFFER_SIZE.getName(), @@ -73,7 +73,7 @@ public class DefaultBulkMetric implements BulkMetric { TimeValue measureInterval = TimeValue.parseTimeValue(measureIntervalStr, TimeValue.timeValueSeconds(1), ""); this.measureIntervalSeconds = measureInterval.seconds(); - this.totalIngest = new Meter(scheduledThreadPoolExecutor); + this.totalIngest = new Meter(scheduledExecutorService); this.ringBufferSize = ringBufferSize; this.ringBuffer = new LongRingBuffer(ringBufferSize); this.totalIngestSizeInBytes = new CountMetric(); @@ -93,7 +93,7 @@ public class DefaultBulkMetric implements BulkMetric { Parameters.BULK_METRIC_LOG_INTERVAL.getString()); TimeValue metricLoginterval = TimeValue.parseTimeValue(metricLogIntervalStr, TimeValue.timeValueSeconds(10), ""); - this.future = scheduledThreadPoolExecutor.scheduleAtFixedRate(this::log, 0L, metricLoginterval.seconds(), TimeUnit.SECONDS); + this.future = scheduledExecutorService.scheduleAtFixedRate(this::log, 0L, metricLoginterval.seconds(), TimeUnit.SECONDS); } @Override diff --git a/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkProcessor.java b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkProcessor.java index 334d26c..e349779 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkProcessor.java +++ b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkProcessor.java @@ -16,6 +16,7 @@ import org.xbib.elx.api.BulkMetric; import org.xbib.elx.api.BulkProcessor; import java.io.IOException; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; @@ -35,6 +36,8 @@ public class DefaultBulkProcessor implements BulkProcessor { private final AtomicBoolean enabled; + private final BulkClient bulkClient; + private final ElasticsearchClient client; private final DefaultBulkListener bulkListener; @@ -56,6 +59,7 @@ public class DefaultBulkProcessor implements BulkProcessor { private final int permits; public DefaultBulkProcessor(BulkClient bulkClient, Settings settings) { + this.bulkClient = bulkClient; int maxActionsPerRequest = settings.getAsInt(Parameters.BULK_MAX_ACTIONS_PER_REQUEST.getName(), Parameters.BULK_MAX_ACTIONS_PER_REQUEST.getInteger()); String flushIntervalStr = settings.get(Parameters.BULK_FLUSH_INTERVAL.getName(), @@ -69,7 +73,7 @@ public class DefaultBulkProcessor implements BulkProcessor { this.scheduledFuture = bulkClient.getScheduler().scheduleWithFixedDelay(this::flush, flushInterval.millis(), flushInterval.millis(), TimeUnit.MILLISECONDS); } - this.bulkListener = new DefaultBulkListener(this, bulkClient.getScheduler(), settings); + this.bulkListener = new DefaultBulkListener(this, settings); this.bulkActions = maxActionsPerRequest; this.bulkVolume = minVolumePerRequest.getBytes(); this.bulkRequest = new BulkRequest(); @@ -112,11 +116,21 @@ public class DefaultBulkProcessor implements BulkProcessor { return bulkVolume; } + @Override + public ScheduledExecutorService getScheduler() { + return bulkClient.getScheduler(); + } + @Override public BulkMetric getBulkMetric() { return bulkListener.getBulkMetric(); } + @Override + public boolean isBulkMetricEnabled() { + return bulkListener.getBulkMetric() != null; + } + @Override public Throwable getLastBulkError() { return bulkListener.getLastBulkError(); diff --git a/elx-common/src/main/java/org/xbib/elx/common/DefaultSearchMetric.java b/elx-common/src/main/java/org/xbib/elx/common/DefaultSearchMetric.java index 2495f08..7611e1b 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/DefaultSearchMetric.java +++ b/elx-common/src/main/java/org/xbib/elx/common/DefaultSearchMetric.java @@ -4,13 +4,13 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.xbib.elx.api.SearchClient; import org.xbib.elx.api.SearchMetric; import org.xbib.metrics.api.Count; import org.xbib.metrics.api.Metered; import org.xbib.metrics.common.CountMetric; import org.xbib.metrics.common.Meter; import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class DefaultSearchMetric implements SearchMetric { @@ -37,9 +37,9 @@ public class DefaultSearchMetric implements SearchMetric { private Long stopped; - public DefaultSearchMetric(ScheduledThreadPoolExecutor scheduledThreadPoolExecutor, + public DefaultSearchMetric(SearchClient searchClient, Settings settings) { - totalQuery = new Meter(scheduledThreadPoolExecutor); + totalQuery = new Meter(searchClient.getScheduler()); currentQuery = new CountMetric(); queries = new CountMetric(); succeededQueries = new CountMetric(); @@ -50,7 +50,7 @@ public class DefaultSearchMetric implements SearchMetric { Parameters.SEARCH_METRIC_LOG_INTERVAL.getString()); TimeValue metricLoginterval = TimeValue.parseTimeValue(metricLogIntervalStr, TimeValue.timeValueSeconds(10), ""); - this.future = scheduledThreadPoolExecutor.scheduleAtFixedRate(this::log, 0L, metricLoginterval.seconds(), TimeUnit.SECONDS); + this.future = searchClient.getScheduler().scheduleAtFixedRate(this::log, 0L, metricLoginterval.seconds(), TimeUnit.SECONDS); } @Override @@ -128,7 +128,7 @@ public class DefaultSearchMetric implements SearchMetric { logger.info("queries = " + getTotalQueries().getCount() + " succeeded = " + getSucceededQueries().getCount() + " empty = " + getEmptyQueries().getCount() + - " failed = " + getFailedQueries() + + " failed = " + getFailedQueries().getCount() + " timeouts = " + getTimeoutQueries().getCount()); } } diff --git a/elx-common/src/main/java/org/xbib/elx/common/Parameters.java b/elx-common/src/main/java/org/xbib/elx/common/Parameters.java index d33359f..623c566 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/Parameters.java +++ b/elx-common/src/main/java/org/xbib/elx/common/Parameters.java @@ -2,6 +2,10 @@ package org.xbib.elx.common; public enum Parameters { + HOST("host", String.class, "localhost"), + + PORT("port", Integer.class, 9300), + CLUSTER_TARGET_HEALTH("cluster.target_health", String.class, "GREEN"), CLUSTER_TARGET_HEALTH_TIMEOUT("cluster.target_health_timeout", String.class, "30m"), @@ -12,8 +16,6 @@ public enum Parameters { BULK_STOP_REFRESH_SECONDS("bulk.stop_refresh_seconds", Integer.class, 30), - BULK_LOGGING_ENABLED("bulk.logging.enabled", Boolean.class, true), - BULK_FAIL_ON_ERROR("bulk.fail_on_error", Boolean.class, true), BULK_MAX_ACTIONS_PER_REQUEST("bulk.max_actions_per_request", Integer.class, -1), @@ -26,13 +28,17 @@ public enum Parameters { BULK_MEASURE_INTERVAL("bulk.measure_interval", String.class, "1s"), - BULK_METRIC_LOG_INTERVAL("bulk.metric_log_interval", String.class, "10s"), + BULK_METRIC_ENABLED("bulk.metric.enabled", Boolean.class, Boolean.FALSE), + + BULK_METRIC_LOG_INTERVAL("bulk.metric.log_interval", String.class, "10s"), BULK_RING_BUFFER_SIZE("bulk.ring_buffer_size", Integer.class, Runtime.getRuntime().availableProcessors()), BULK_PERMITS("bulk.permits", Integer.class, Runtime.getRuntime().availableProcessors() - 1), - SEARCH_METRIC_LOG_INTERVAL("search.metric_log_interval", String.class, "10s"); + SEARCH_METRIC_ENABLED("search.metric.enabled", Boolean.class, Boolean.FALSE), + + SEARCH_METRIC_LOG_INTERVAL("search.metric.log_interval", String.class, "10s"); private final String name; diff --git a/elx-http/src/main/java/org/xbib/elx/http/HttpClientHelper.java b/elx-http/src/main/java/org/xbib/elx/http/HttpClientHelper.java index 59de54d..6310d31 100644 --- a/elx-http/src/main/java/org/xbib/elx/http/HttpClientHelper.java +++ b/elx-http/src/main/java/org/xbib/elx/http/HttpClientHelper.java @@ -13,6 +13,7 @@ import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.threadpool.ThreadPool; +import org.xbib.elx.common.Parameters; import org.xbib.net.URL; import org.xbib.netty.http.client.Client; import org.xbib.netty.http.common.HttpAddress; @@ -68,7 +69,7 @@ public class HttpClientHelper { if (settings.hasValue("url")) { this.url = settings.get("url"); httpAddress = HttpAddress.http1(this.url); - } else if (settings.hasValue("host")) { + } else if (settings.hasValue(Parameters.HOST.getName())) { // use only first host URL u = findAddresses(settings).stream().findFirst() .orElseGet(() -> URL.http().host("localhost").port(9200).build()); @@ -156,11 +157,10 @@ public class HttpClientHelper { } } - private List findAddresses(Settings settings) { - final int defaultPort = settings.getAsInt("port", 9200); + final int defaultPort = settings.getAsInt(Parameters.PORT.getName(), 9200); List addresses = new ArrayList<>(); - for (String hostname : settings.getAsList("host")) { + for (String hostname : settings.getAsList(Parameters.HOST.getName())) { String[] splitHost = hostname.split(":", 2); if (splitHost.length == 2) { try { diff --git a/elx-http/src/test/java/org/xbib/elx/http/test/DumpIDTest.java b/elx-http/src/test/java/org/xbib/elx/http/test/DumpIDTest.java index 789730d..b207576 100644 --- a/elx-http/src/test/java/org/xbib/elx/http/test/DumpIDTest.java +++ b/elx-http/src/test/java/org/xbib/elx/http/test/DumpIDTest.java @@ -6,6 +6,7 @@ import org.junit.jupiter.api.Test; import org.xbib.elx.api.IndexDefinition; import org.xbib.elx.common.ClientBuilder; import org.xbib.elx.common.DefaultIndexDefinition; +import org.xbib.elx.common.Parameters; import org.xbib.elx.http.HttpSearchClient; import org.xbib.elx.http.HttpSearchClientProvider; @@ -24,7 +25,7 @@ class DumpIDTest { try (HttpSearchClient searchClient = ClientBuilder.builder() .setSearchClientProvider(HttpSearchClientProvider.class) .put("cluster.name", "es2") - .put("host", "atlas:9202") + .put(Parameters.HOST.getName(), "atlas:9202") .put("pool.enabled", false) .build(); BufferedWriter writer = Files.newBufferedWriter(Paths.get("zdb.txt"))) { diff --git a/elx-http/src/test/java/org/xbib/elx/http/test/TestExtension.java b/elx-http/src/test/java/org/xbib/elx/http/test/TestExtension.java index 1a4094e..aef9bd9 100644 --- a/elx-http/src/test/java/org/xbib/elx/http/test/TestExtension.java +++ b/elx-http/src/test/java/org/xbib/elx/http/test/TestExtension.java @@ -20,6 +20,7 @@ import org.junit.jupiter.api.extension.ExtensionContext; import org.junit.jupiter.api.extension.ParameterContext; import org.junit.jupiter.api.extension.ParameterResolutionException; import org.junit.jupiter.api.extension.ParameterResolver; +import org.xbib.elx.common.Parameters; import java.io.IOException; import java.nio.file.FileVisitResult; @@ -155,10 +156,12 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft return Settings.builder() .put("cluster.name", getClusterName()) .put("path.home", getHome()) - .put("host", httpHost) - .put("port", httpPort) - .put("cluster.target_health", "YELLOW") - .put("cluster.target_health_timeout", "1m") + .put(Parameters.HOST.getName(), httpHost) + .put(Parameters.PORT.getName(), httpPort) + .put(Parameters.CLUSTER_TARGET_HEALTH.getName(), "YELLOW") + .put(Parameters.CLUSTER_TARGET_HEALTH_TIMEOUT.getName(), "1m") + .put(Parameters.BULK_METRIC_ENABLED.getName(), Boolean.TRUE) + .put(Parameters.SEARCH_METRIC_ENABLED.getName(), Boolean.TRUE) .build(); } diff --git a/elx-node/src/main/java/org/xbib/elx/node/NodeClientHelper.java b/elx-node/src/main/java/org/xbib/elx/node/NodeClientHelper.java index 58efbf1..6a33728 100644 --- a/elx-node/src/main/java/org/xbib/elx/node/NodeClientHelper.java +++ b/elx-node/src/main/java/org/xbib/elx/node/NodeClientHelper.java @@ -25,9 +25,7 @@ public class NodeClientHelper { private static Object configurationObject; - private static Node node; - - private static final Map clientMap = new HashMap<>(); + private static final Map nodeMap = new HashMap<>(); public ElasticsearchClient createClient(Settings settings, Object object) { if (configurationObject == null) { @@ -36,24 +34,25 @@ public class NodeClientHelper { if (configurationObject instanceof ElasticsearchClient) { return (ElasticsearchClient) configurationObject; } - return clientMap.computeIfAbsent(settings.get("cluster.name"), - key -> innerCreateClient(settings)); + String clusterName = settings.get("cluster.name", "elasticsearch"); + Node node = nodeMap.computeIfAbsent(clusterName, key -> innerCreateClient(settings)); + return node != null ? node.client() : null; } public void closeClient(Settings settings) { - clientMap.remove(settings.get("cluster.name")); - logger.debug("closing node..."); + String clusterName = settings.get("cluster.name", "elasticsearch"); + Node node = nodeMap.remove(clusterName); if (node != null) { try { + logger.debug("closing client..."); node.close(); } catch (IOException e) { logger.log(Level.WARN, e.getMessage(), e); } - node = null; } } - private ElasticsearchClient innerCreateClient(Settings settings) { + private Node innerCreateClient(Settings settings) { String version = System.getProperty("os.name") + " " + System.getProperty("java.vm.name") + " " + System.getProperty("java.vm.vendor") @@ -77,10 +76,10 @@ public class NodeClientHelper { version, effectiveSettings.toDelimitedString(',')); Collection> plugins = Collections.singletonList(Netty4Plugin.class); - node = new BulkNode(new Environment(effectiveSettings, null), plugins); + Node node = new BulkNode(new Environment(effectiveSettings, null), plugins); try { node.start(); - return node.client(); + return node; } catch (NodeValidationException e) { logger.log(Level.ERROR, e.getMessage(), e); } diff --git a/elx-node/src/test/java/org/xbib/elx/node/test/DuplicateIDTest.java b/elx-node/src/test/java/org/xbib/elx/node/test/DuplicateIDTest.java index d5241aa..c9c84d5 100644 --- a/elx-node/src/test/java/org/xbib/elx/node/test/DuplicateIDTest.java +++ b/elx-node/src/test/java/org/xbib/elx/node/test/DuplicateIDTest.java @@ -7,7 +7,6 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.xbib.elx.api.IndexDefinition; import org.xbib.elx.common.ClientBuilder; import org.xbib.elx.common.DefaultIndexDefinition; -import org.xbib.elx.common.Parameters; import org.xbib.elx.node.NodeBulkClient; import org.xbib.elx.node.NodeBulkClientProvider; @@ -22,9 +21,7 @@ class DuplicateIDTest { private static final Logger logger = LogManager.getLogger(DuplicateIDTest.class.getName()); - private static final Long ACTIONS = 100L; - - private static final Long MAX_ACTIONS_PER_REQUEST = 5L; + private static final Long ACTIONS = 10000L; private final TestExtension.Helper helper; @@ -38,7 +35,6 @@ class DuplicateIDTest { try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client()) .setBulkClientProvider(NodeBulkClientProvider.class) .put(helper.getClientSettings()) - .put(Parameters.BULK_MAX_ACTIONS_PER_REQUEST.getName(), MAX_ACTIONS_PER_REQUEST) .build()) { IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); bulkClient.newIndex(indexDefinition); diff --git a/elx-node/src/test/java/org/xbib/elx/node/test/SearchTest.java b/elx-node/src/test/java/org/xbib/elx/node/test/SearchTest.java index 7872329..db0570a 100644 --- a/elx-node/src/test/java/org/xbib/elx/node/test/SearchTest.java +++ b/elx-node/src/test/java/org/xbib/elx/node/test/SearchTest.java @@ -12,7 +12,6 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.xbib.elx.api.IndexDefinition; import org.xbib.elx.common.ClientBuilder; import org.xbib.elx.common.DefaultIndexDefinition; -import org.xbib.elx.common.Parameters; import org.xbib.elx.node.NodeBulkClient; import org.xbib.elx.node.NodeBulkClientProvider; import org.xbib.elx.node.NodeSearchClient; @@ -29,8 +28,6 @@ class SearchTest { private static final Long ACTIONS = 100000L; - private static final Long MAX_ACTIONS_PER_REQUEST = 100L; - private final TestExtension.Helper helper; SearchTest(TestExtension.Helper helper) { @@ -44,7 +41,6 @@ class SearchTest { try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client()) .setBulkClientProvider(NodeBulkClientProvider.class) .put(helper.getClientSettings()) - .put(Parameters.BULK_MAX_ACTIONS_PER_REQUEST.getName(), MAX_ACTIONS_PER_REQUEST) .build()) { bulkClient.newIndex(indexDefinition); bulkClient.startBulk(indexDefinition); diff --git a/elx-node/src/test/java/org/xbib/elx/node/test/TestExtension.java b/elx-node/src/test/java/org/xbib/elx/node/test/TestExtension.java index e557069..d4eb397 100644 --- a/elx-node/src/test/java/org/xbib/elx/node/test/TestExtension.java +++ b/elx-node/src/test/java/org/xbib/elx/node/test/TestExtension.java @@ -18,6 +18,7 @@ import org.junit.jupiter.api.extension.ExtensionContext; import org.junit.jupiter.api.extension.ParameterContext; import org.junit.jupiter.api.extension.ParameterResolutionException; import org.junit.jupiter.api.extension.ParameterResolver; +import org.xbib.elx.common.Parameters; import java.io.IOException; import java.nio.file.FileVisitResult; @@ -154,8 +155,10 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft .put("node.max_local_storage_nodes", 2) .put("cluster.initial_master_nodes", "1") .put("discovery.seed_hosts", "127.0.0.1:9300") - .put("cluster.target_health", "YELLOW") - .put("cluster.target_health_timeout", "1m") + .put(Parameters.CLUSTER_TARGET_HEALTH.getName(), "YELLOW") + .put(Parameters.CLUSTER_TARGET_HEALTH_TIMEOUT.getName(), "1m") + .put(Parameters.BULK_METRIC_ENABLED.getName(), Boolean.TRUE) + .put(Parameters.SEARCH_METRIC_ENABLED.getName(), Boolean.TRUE) .build(); } diff --git a/elx-transport/src/main/java/org/xbib/elx/transport/TransportClientHelper.java b/elx-transport/src/main/java/org/xbib/elx/transport/TransportClientHelper.java index b718254..7c203f7 100644 --- a/elx-transport/src/main/java/org/xbib/elx/transport/TransportClientHelper.java +++ b/elx-transport/src/main/java/org/xbib/elx/transport/TransportClientHelper.java @@ -11,7 +11,6 @@ import org.elasticsearch.client.Client; import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.client.transport.NoNodeAvailableException; import org.elasticsearch.client.transport.TransportClient; -import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.Strings; @@ -21,6 +20,7 @@ import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.transport.Netty4Plugin; +import org.xbib.elx.common.Parameters; import java.io.IOException; import java.net.InetAddress; @@ -38,16 +38,16 @@ public class TransportClientHelper { private static final Logger logger = LogManager.getLogger(TransportClientHelper.class.getName()); - private static final Map clientMap = new HashMap<>(); + private static final Map transportClientMap = new HashMap<>(); public ElasticsearchClient createClient(Settings settings) { String clusterName = settings.get("cluster.name", "elasticsearch"); - return clientMap.computeIfAbsent(clusterName, key -> innerCreateClient(settings)); + return transportClientMap.computeIfAbsent(clusterName, key -> innerCreateClient(settings)); } public void closeClient(Settings settings) { String clusterName = settings.get("cluster.name", "elasticsearch"); - ElasticsearchClient client = clientMap.remove(clusterName); + ElasticsearchClient client = transportClientMap.remove(clusterName); if (client != null) { if (client instanceof Client) { ((Client) client).close(); @@ -65,9 +65,9 @@ public class TransportClientHelper { } private Collection findAddresses(Settings settings) { - final int defaultPort = settings.getAsInt("port", 9300); + final int defaultPort = settings.getAsInt(Parameters.PORT.getName(), 9300); Collection addresses = new ArrayList<>(); - for (String hostname : settings.getAsList("host")) { + for (String hostname : settings.getAsList(Parameters.HOST.getName())) { String[] splitHost = hostname.split(":", 2); if (splitHost.length == 2) { try { @@ -127,17 +127,15 @@ public class TransportClientHelper { + " " + System.getProperty("java.vm.vendor") + " " + System.getProperty("java.vm.version") + " Elasticsearch " + Version.CURRENT.toString(); - logger.info("creating transport client on {} with custom settings {}", - systemIdentifier, Strings.toString(settings)); Settings transportClientSettings = getTransportClientSettings(settings); + logger.info("creating transport client on {} with settings {}", + systemIdentifier, Strings.toString(transportClientSettings)); return new MyTransportClient(transportClientSettings, Collections.singletonList(Netty4Plugin.class)); } private Settings getTransportClientSettings(Settings settings) { return Settings.builder() - // "cluster.name" - .put(ClusterName.CLUSTER_NAME_SETTING.getKey(), - settings.get(ClusterName.CLUSTER_NAME_SETTING.getKey())) + .put(settings.filter(key -> !isPrivateSettings(key))) // "node.processors" .put(EsExecutors.NODE_PROCESSORS_SETTING.getKey(), settings.get(EsExecutors.NODE_PROCESSORS_SETTING.getKey(), @@ -148,6 +146,15 @@ public class TransportClientHelper { .build(); } + private static boolean isPrivateSettings(String key) { + for (Parameters p : Parameters.values()) { + if (key.equals(p.getName())) { + return true; + } + } + return false; + } + static class MyTransportClient extends TransportClient { MyTransportClient(Settings settings, Collection> plugins) { diff --git a/elx-transport/src/test/java/org/xbib/elx/transport/test/BulkClientTest.java b/elx-transport/src/test/java/org/xbib/elx/transport/test/BulkClientTest.java index 9f00e26..3a4b86d 100644 --- a/elx-transport/src/test/java/org/xbib/elx/transport/test/BulkClientTest.java +++ b/elx-transport/src/test/java/org/xbib/elx/transport/test/BulkClientTest.java @@ -52,7 +52,9 @@ class BulkClientTest { bulkClient.newIndex(indexDefinition); bulkClient.index(indexDefinition, "1", true, "{ \"name\" : \"Hello World\"}"); // single doc ingest bulkClient.waitForResponses(30L, TimeUnit.SECONDS); - assertEquals(1, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount()); + if (bulkClient.getBulkProcessor().isBulkMetricEnabled()) { + assertEquals(1, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount()); + } if (bulkClient.getBulkProcessor().getLastBulkError() != null) { logger.error("error", bulkClient.getBulkProcessor().getLastBulkError()); } @@ -76,7 +78,9 @@ class BulkClientTest { } bulkClient.flush(); bulkClient.waitForResponses(30L, TimeUnit.SECONDS); - assertEquals(numactions, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount()); + if (bulkClient.getBulkProcessor().isBulkMetricEnabled()) { + assertEquals(numactions, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount()); + } if (bulkClient.getBulkProcessor().getLastBulkError() != null) { logger.error("error", bulkClient.getBulkProcessor().getLastBulkError()); } @@ -119,7 +123,9 @@ class BulkClientTest { bulkClient.stopBulk(indexDefinition); bulkClient.refreshIndex(indexDefinition); assertEquals(maxthreads * actions, bulkClient.getSearchableDocs(indexDefinition)); - assertEquals(maxthreads * actions, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount()); + if (bulkClient.getBulkProcessor().isBulkMetricEnabled()) { + assertEquals(maxthreads * actions, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount()); + } if (bulkClient.getBulkProcessor().getLastBulkError() != null) { logger.error("error", bulkClient.getBulkProcessor().getLastBulkError()); } diff --git a/elx-transport/src/test/java/org/xbib/elx/transport/test/DumpIDTest.java b/elx-transport/src/test/java/org/xbib/elx/transport/test/DumpIDTest.java index a802191..75db858 100644 --- a/elx-transport/src/test/java/org/xbib/elx/transport/test/DumpIDTest.java +++ b/elx-transport/src/test/java/org/xbib/elx/transport/test/DumpIDTest.java @@ -6,6 +6,7 @@ import org.junit.jupiter.api.Test; import org.xbib.elx.api.IndexDefinition; import org.xbib.elx.common.ClientBuilder; import org.xbib.elx.common.DefaultIndexDefinition; +import org.xbib.elx.common.Parameters; import org.xbib.elx.transport.TransportSearchClient; import org.xbib.elx.transport.TransportSearchClientProvider; @@ -24,7 +25,7 @@ class DumpIDTest { try (TransportSearchClient searchClient = ClientBuilder.builder() .setSearchClientProvider(TransportSearchClientProvider.class) .put("cluster.name", "es2") - .put("host", "atlas:9302") + .put(Parameters.HOST.getName(), "atlas:9302") .build(); BufferedWriter writer = Files.newBufferedWriter(Paths.get("zdb.txt"))) { Stream stream = searchClient.getIds(qb -> qb diff --git a/elx-transport/src/test/java/org/xbib/elx/transport/test/DuplicateIDTest.java b/elx-transport/src/test/java/org/xbib/elx/transport/test/DuplicateIDTest.java index 3fe40a4..baa512e 100644 --- a/elx-transport/src/test/java/org/xbib/elx/transport/test/DuplicateIDTest.java +++ b/elx-transport/src/test/java/org/xbib/elx/transport/test/DuplicateIDTest.java @@ -7,7 +7,6 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.xbib.elx.api.IndexDefinition; import org.xbib.elx.common.ClientBuilder; import org.xbib.elx.common.DefaultIndexDefinition; -import org.xbib.elx.common.Parameters; import org.xbib.elx.transport.TransportBulkClient; import org.xbib.elx.transport.TransportBulkClientProvider; @@ -22,9 +21,7 @@ class DuplicateIDTest { private static final Logger logger = LogManager.getLogger(DuplicateIDTest.class.getName()); - private static final Long ACTIONS = 100L; - - private static final Long MAX_ACTIONS_PER_REQUEST = 5L; + private static final Long ACTIONS = 10000L; private final TestExtension.Helper helper; @@ -37,7 +34,6 @@ class DuplicateIDTest { long numactions = ACTIONS; try (TransportBulkClient bulkClient = ClientBuilder.builder() .setBulkClientProvider(TransportBulkClientProvider.class) - .put(Parameters.BULK_MAX_ACTIONS_PER_REQUEST.getName(), MAX_ACTIONS_PER_REQUEST) .put(helper.getClientSettings()) .build()) { IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); @@ -49,7 +45,9 @@ class DuplicateIDTest { bulkClient.waitForResponses(30L, TimeUnit.SECONDS); bulkClient.refreshIndex(indexDefinition); assertTrue(bulkClient.getSearchableDocs(indexDefinition) < ACTIONS); - assertEquals(numactions, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount()); + if (bulkClient.getBulkProcessor().isBulkMetricEnabled()) { + assertEquals(numactions, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount()); + } if (bulkClient.getBulkProcessor().getLastBulkError() != null) { logger.error("error", bulkClient.getBulkProcessor().getLastBulkError()); } diff --git a/elx-transport/src/test/java/org/xbib/elx/transport/test/SearchTest.java b/elx-transport/src/test/java/org/xbib/elx/transport/test/SearchTest.java index c9dfc75..7bf402c 100644 --- a/elx-transport/src/test/java/org/xbib/elx/transport/test/SearchTest.java +++ b/elx-transport/src/test/java/org/xbib/elx/transport/test/SearchTest.java @@ -51,7 +51,9 @@ class SearchTest { bulkClient.waitForResponses(30L, TimeUnit.SECONDS); bulkClient.refreshIndex(indexDefinition); assertEquals(numactions, bulkClient.getSearchableDocs(indexDefinition)); - assertEquals(numactions, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount()); + if (bulkClient.getBulkProcessor().isBulkMetricEnabled()) { + assertEquals(numactions, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount()); + } if (bulkClient.getBulkProcessor().getLastBulkError() != null) { logger.error("error", bulkClient.getBulkProcessor().getLastBulkError()); } @@ -68,9 +70,11 @@ class SearchTest { TimeValue.timeValueMillis(100), 579); long count = stream.count(); assertEquals(numactions, count); - assertEquals(0L, searchClient.getSearchMetric().getFailedQueries().getCount()); - assertEquals(0L, searchClient.getSearchMetric().getTimeoutQueries().getCount()); - assertEquals(1L, searchClient.getSearchMetric().getEmptyQueries().getCount()); + if (searchClient.isSearchMetricEnabled()) { + assertEquals(0L, searchClient.getSearchMetric().getFailedQueries().getCount()); + assertEquals(0L, searchClient.getSearchMetric().getTimeoutQueries().getCount()); + assertEquals(1L, searchClient.getSearchMetric().getEmptyQueries().getCount()); + } // test stream docs stream = searchClient.search(qb -> qb .setIndices(indexDefinition.getFullIndexName()) @@ -79,9 +83,11 @@ class SearchTest { final AtomicInteger hitcount = new AtomicInteger(); stream.forEach(hit -> hitcount.incrementAndGet()); assertEquals(numactions, hitcount.get()); - assertEquals(0L, searchClient.getSearchMetric().getFailedQueries().getCount()); - assertEquals(0L, searchClient.getSearchMetric().getTimeoutQueries().getCount()); - assertEquals(2L, searchClient.getSearchMetric().getEmptyQueries().getCount()); + if (searchClient.isSearchMetricEnabled()) { + assertEquals(0L, searchClient.getSearchMetric().getFailedQueries().getCount()); + assertEquals(0L, searchClient.getSearchMetric().getTimeoutQueries().getCount()); + assertEquals(2L, searchClient.getSearchMetric().getEmptyQueries().getCount()); + } // test stream doc ids Stream ids = searchClient.getIds(qb -> qb .setIndices(indexDefinition.getFullIndexName()) @@ -89,11 +95,13 @@ class SearchTest { final AtomicInteger idcount = new AtomicInteger(); ids.forEach(id -> idcount.incrementAndGet()); assertEquals(numactions, idcount.get()); - assertEquals(1542L, searchClient.getSearchMetric().getQueries().getCount()); - assertEquals(1539L, searchClient.getSearchMetric().getSucceededQueries().getCount()); - assertEquals(3L, searchClient.getSearchMetric().getEmptyQueries().getCount()); - assertEquals(0L, searchClient.getSearchMetric().getFailedQueries().getCount()); - assertEquals(0L, searchClient.getSearchMetric().getTimeoutQueries().getCount()); + if (searchClient.isSearchMetricEnabled()) { + assertEquals(1542L, searchClient.getSearchMetric().getQueries().getCount()); + assertEquals(1539L, searchClient.getSearchMetric().getSucceededQueries().getCount()); + assertEquals(3L, searchClient.getSearchMetric().getEmptyQueries().getCount()); + assertEquals(0L, searchClient.getSearchMetric().getFailedQueries().getCount()); + assertEquals(0L, searchClient.getSearchMetric().getTimeoutQueries().getCount()); + } } } } diff --git a/elx-transport/src/test/java/org/xbib/elx/transport/test/SmokeTest.java b/elx-transport/src/test/java/org/xbib/elx/transport/test/SmokeTest.java index 1b72852..7a9e2a0 100644 --- a/elx-transport/src/test/java/org/xbib/elx/transport/test/SmokeTest.java +++ b/elx-transport/src/test/java/org/xbib/elx/transport/test/SmokeTest.java @@ -66,8 +66,10 @@ class SmokeTest { assertTrue(bulkClient.waitForResponses(30, TimeUnit.SECONDS)); adminClient.updateReplicaLevel(indexDefinition); assertEquals(1, adminClient.getReplicaLevel(indexDefinition)); - assertEquals(0, bulkClient.getBulkProcessor().getBulkMetric().getFailed().getCount()); - assertEquals(6, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount()); + if (bulkClient.getBulkProcessor().isBulkMetricEnabled()) { + assertEquals(0, bulkClient.getBulkProcessor().getBulkMetric().getFailed().getCount()); + assertEquals(6, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount()); + } if (bulkClient.getBulkProcessor().getLastBulkError() != null) { logger.error("error", bulkClient.getBulkProcessor().getLastBulkError()); } diff --git a/elx-transport/src/test/java/org/xbib/elx/transport/test/TestExtension.java b/elx-transport/src/test/java/org/xbib/elx/transport/test/TestExtension.java index cedd639..b9472be 100644 --- a/elx-transport/src/test/java/org/xbib/elx/transport/test/TestExtension.java +++ b/elx-transport/src/test/java/org/xbib/elx/transport/test/TestExtension.java @@ -18,6 +18,7 @@ import org.junit.jupiter.api.extension.ExtensionContext; import org.junit.jupiter.api.extension.ParameterContext; import org.junit.jupiter.api.extension.ParameterResolutionException; import org.junit.jupiter.api.extension.ParameterResolver; +import org.xbib.elx.common.Parameters; import java.io.IOException; import java.nio.file.FileVisitResult; @@ -156,10 +157,14 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft return Settings.builder() .put("cluster.name", cluster) .put("path.home", getHome()) - .put("host", host) - .put("port", port) - .put("cluster.target_health", "YELLOW") - .put("cluster.target_health_timeout", "1m") + .put("client.transport.nodes_sampler_interval", "1h") + .put("client.transport.ping_timeout", "1h") + .put(Parameters.HOST.getName(), host) + .put(Parameters.PORT.getName(), port) + .put(Parameters.CLUSTER_TARGET_HEALTH.getName(), "YELLOW") + .put(Parameters.CLUSTER_TARGET_HEALTH_TIMEOUT.getName(), "1m") + //.put(Parameters.BULK_METRIC_ENABLED.getName(), Boolean.TRUE) + //.put(Parameters.SEARCH_METRIC_ENABLED.getName(), Boolean.TRUE) .build(); } diff --git a/gradle.properties b/gradle.properties index 5634a72..7127a07 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,6 +1,6 @@ group = org.xbib name = elx -version = 7.10.2.4 +version = 7.10.2.5 gradle.wrapper.version = 6.6.1 xbib-metrics.version = 2.1.0