diff --git a/elx-api/src/main/java/org/xbib/elx/api/BasicClient.java b/elx-api/src/main/java/org/xbib/elx/api/BasicClient.java index 9999eea..cdd05e5 100644 --- a/elx-api/src/main/java/org/xbib/elx/api/BasicClient.java +++ b/elx-api/src/main/java/org/xbib/elx/api/BasicClient.java @@ -3,15 +3,11 @@ package org.xbib.elx.api; import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.common.settings.Settings; import java.io.Closeable; -import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; public interface BasicClient extends Closeable { - /** - * Initiative the client - * @param settings settings - */ void init(Settings settings); void putClusterSetting(String key, Object value, long timeout, TimeUnit timeUnit); @@ -29,7 +25,6 @@ public interface BasicClient extends Closeable { */ ElasticsearchClient getClient(); - /** * Get cluster name. * @return the cluster name @@ -45,14 +40,11 @@ public interface BasicClient extends Closeable { */ String getHealthColor(long maxWaitTime, TimeUnit timeUnit); - /** - * Wait for cluster being healthy. - */ void waitForHealthyCluster(); long getSearchableDocs(IndexDefinition indexDefinition); boolean isIndexExists(IndexDefinition indexDefinition); - ScheduledThreadPoolExecutor getScheduler(); + ScheduledExecutorService getScheduler(); } diff --git a/elx-api/src/main/java/org/xbib/elx/api/BulkProcessor.java b/elx-api/src/main/java/org/xbib/elx/api/BulkProcessor.java index b177b41..74df713 100644 --- a/elx-api/src/main/java/org/xbib/elx/api/BulkProcessor.java +++ b/elx-api/src/main/java/org/xbib/elx/api/BulkProcessor.java @@ -4,6 +4,7 @@ import org.elasticsearch.action.ActionRequest; import java.io.Closeable; import java.io.Flushable; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; public interface BulkProcessor extends Closeable, Flushable { @@ -14,6 +15,10 @@ public interface BulkProcessor extends Closeable, Flushable { boolean waitForBulkResponses(long timeout, TimeUnit unit); + ScheduledExecutorService getScheduler(); + + boolean isBulkMetricEnabled(); + BulkMetric getBulkMetric(); Throwable getLastBulkError(); diff --git a/elx-api/src/main/java/org/xbib/elx/api/IndexDefinition.java b/elx-api/src/main/java/org/xbib/elx/api/IndexDefinition.java index c6f669c..df0ab51 100644 --- a/elx-api/src/main/java/org/xbib/elx/api/IndexDefinition.java +++ b/elx-api/src/main/java/org/xbib/elx/api/IndexDefinition.java @@ -1,6 +1,8 @@ package org.xbib.elx.api; import java.time.format.DateTimeFormatter; +import java.util.Map; +import java.util.Set; import java.util.regex.Pattern; public interface IndexDefinition { @@ -23,7 +25,9 @@ public interface IndexDefinition { void setMappings(String mappings); - String getMappings(); + Map getMappings(); + + Set getMappingFields(); void setDateTimeFormatter(DateTimeFormatter formatter); @@ -61,7 +65,7 @@ public interface IndexDefinition { int getShardCount(); - void setReplicaCount(int replicaLevel); + void setReplicaCount(int replicaCount); int getReplicaCount(); diff --git a/elx-api/src/main/java/org/xbib/elx/api/SearchClient.java b/elx-api/src/main/java/org/xbib/elx/api/SearchClient.java index 4932bcf..666d56d 100644 --- a/elx-api/src/main/java/org/xbib/elx/api/SearchClient.java +++ b/elx-api/src/main/java/org/xbib/elx/api/SearchClient.java @@ -14,6 +14,8 @@ import java.util.stream.Stream; public interface SearchClient extends BasicClient { + boolean isSearchMetricEnabled(); + SearchMetric getSearchMetric(); Optional get(Consumer getRequestBuilder); diff --git a/elx-common/src/main/java/org/xbib/elx/common/AbstractAdminClient.java b/elx-common/src/main/java/org/xbib/elx/common/AbstractAdminClient.java index 05711e6..97fd478 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/AbstractAdminClient.java +++ b/elx-common/src/main/java/org/xbib/elx/common/AbstractAdminClient.java @@ -119,16 +119,16 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements @Override public AdminClient updateReplicaLevel(IndexDefinition indexDefinition) { if (isIndexDefinitionDisabled(indexDefinition)) { - return null; - } - if (indexDefinition.getReplicaCount() < 1) { - logger.warn("invalid replica level"); return this; } - logger.info("update replica level for " + - indexDefinition + " to " + indexDefinition.getReplicaCount()); - updateIndexSetting(indexDefinition.getFullIndexName(), "number_of_replicas", indexDefinition.getReplicaCount(), - 30L, TimeUnit.SECONDS); + if (indexDefinition.getReplicaCount() < 0) { + logger.warn("invalid replica level defined for index " + + indexDefinition.getIndex() + ": " + indexDefinition.getReplicaCount()); + return this; + } + logger.info("update replica level for " + indexDefinition + " to " + indexDefinition.getReplicaCount()); + updateIndexSetting(indexDefinition.getFullIndexName(), "number_of_replicas", + indexDefinition.getReplicaCount(), 30L, TimeUnit.SECONDS); waitForHealthyCluster(); return this; } @@ -404,8 +404,7 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements logger.info("force merge of " + indexDefinition); ForceMergeRequest forceMergeRequest = new ForceMergeRequest(); forceMergeRequest.indices(indexDefinition.getFullIndexName()); - ForceMergeResponse forceMergeResponse = - client.execute(ForceMergeAction.INSTANCE, forceMergeRequest).actionGet(); + ForceMergeResponse forceMergeResponse = client.execute(ForceMergeAction.INSTANCE, forceMergeRequest).actionGet(); if (forceMergeResponse.getFailedShards() > 0) { throw new IllegalStateException("failed shards after force merge: " + forceMergeResponse.getFailedShards()); } @@ -504,9 +503,17 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements path = path + fieldName; } if (map.containsKey("index")) { - String mode = (String) map.get("index"); - if ("no".equals(mode)) { - return; + Object mode = map.get("index"); + if (mode instanceof String) { + if ("no".equals(mode)) { + return; + } + } + if (mode instanceof Boolean) { + Boolean b = (Boolean) mode; + if (!b) { + return; + } } } for (Map.Entry entry : map.entrySet()) { diff --git a/elx-common/src/main/java/org/xbib/elx/common/AbstractBasicClient.java b/elx-common/src/main/java/org/xbib/elx/common/AbstractBasicClient.java index 0135338..a2694d0 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/AbstractBasicClient.java +++ b/elx-common/src/main/java/org/xbib/elx/common/AbstractBasicClient.java @@ -36,7 +36,7 @@ import org.xbib.elx.api.BasicClient; import org.xbib.elx.api.IndexDefinition; import java.io.IOException; import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -48,21 +48,19 @@ public abstract class AbstractBasicClient implements BasicClient { protected Settings settings; - private final ScheduledThreadPoolExecutor scheduler; + private final ScheduledExecutorService executorService; private final AtomicBoolean closed; public AbstractBasicClient() { - this.scheduler = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(2, - EsExecutors.daemonThreadFactory("elx-bulk-processor")); - this.scheduler.setExecuteExistingDelayedTasksAfterShutdownPolicy(false); - this.scheduler.setContinueExistingPeriodicTasksAfterShutdownPolicy(false); + this.executorService = Executors.newScheduledThreadPool(2, + new DaemonThreadFactory("elx")); closed = new AtomicBoolean(false); } @Override - public ScheduledThreadPoolExecutor getScheduler() { - return scheduler; + public ScheduledExecutorService getScheduler() { + return executorService; } @Override @@ -84,6 +82,17 @@ public abstract class AbstractBasicClient implements BasicClient { } } + @Override + public void close() throws IOException { + ensureClientIsPresent(); + if (closed.compareAndSet(false, true)) { + closeClient(settings); + if (executorService != null) { + executorService.shutdown(); + } + } + } + @Override public String getClusterName() { ensureClientIsPresent(); @@ -189,18 +198,6 @@ public abstract class AbstractBasicClient implements BasicClient { return indicesExistsResponse.isExists(); } - - @Override - public void close() throws IOException { - ensureClientIsPresent(); - if (closed.compareAndSet(false, true)) { - closeClient(settings); - if (scheduler != null) { - scheduler.shutdown(); - } - } - } - protected abstract ElasticsearchClient createClient(Settings settings); protected abstract void closeClient(Settings settings) throws IOException; diff --git a/elx-common/src/main/java/org/xbib/elx/common/AbstractBulkClient.java b/elx-common/src/main/java/org/xbib/elx/common/AbstractBulkClient.java index c630ce4..3798ee3 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/AbstractBulkClient.java +++ b/elx-common/src/main/java/org/xbib/elx/common/AbstractBulkClient.java @@ -65,7 +65,7 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements if (closed.compareAndSet(false, true)) { ensureClientIsPresent(); if (bulkProcessor != null) { - logger.info("closing bulk procesor"); + logger.info("closing bulk processor"); bulkProcessor.close(); } closeClient(settings); @@ -100,7 +100,7 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements createIndexRequestBuilder.setSettings(JsonXContent.contentBuilder() .map(settings.getAsStructuredMap()).string()); } catch (IOException e) { - logger.warn(e.getMessage(), e); + logger.log(Level.WARN, e.getMessage(), e); } } else { Settings settings = Settings.builder() @@ -115,15 +115,11 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements } } if (indexDefinition.getMappings() != null) { - try { - Map mappings = JsonXContent.jsonXContent.createParser(indexDefinition.getMappings()).mapOrdered(); - createIndexRequestBuilder.addMapping(type, mappings); - } catch (IOException e) { - logger.log(Level.WARN, e.getMessage(), e); - } + createIndexRequestBuilder.addMapping(type, indexDefinition.getMappings()); } else { try { - XContentBuilder builder = JsonXContent.contentBuilder().startObject().startObject(type).endObject().endObject(); + XContentBuilder builder = JsonXContent.contentBuilder() + .startObject().startObject(type).endObject().endObject(); createIndexRequestBuilder.addMapping(type, builder); } catch (IOException e) { logger.log(Level.WARN, e.getMessage(), e); @@ -144,26 +140,24 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements if (isIndexDefinitionDisabled(indexDefinition)) { return; } - if (bulkProcessor != null) { - ensureClientIsPresent(); - Long bulkQueueSize = getThreadPoolQueueSize("bulk"); - if (bulkQueueSize != null && bulkQueueSize <= 64) { - logger.info("found bulk queue size " + bulkQueueSize + ", expanding to " + bulkQueueSize * 4); - bulkQueueSize = bulkQueueSize * 4; - } else { - logger.warn("undefined or small bulk queue size found: " + bulkQueueSize + " assuming 256"); - bulkQueueSize = 256L; - } - putClusterSetting("threadpool.bulk.queue_size", bulkQueueSize, 30L, TimeUnit.SECONDS); - String indexName = indexDefinition.getFullIndexName(); - int interval = indexDefinition.getStartBulkRefreshSeconds(); - if (interval != 0) { - logger.info("starting bulk on " + indexName + " with new refresh interval " + interval); - updateIndexSetting(indexName, "refresh_interval", - interval >= 0 ? interval + "s" : interval, 30L, TimeUnit.SECONDS); - } else { - logger.warn("ignoring starting bulk on " + indexName + " with refresh interval " + interval); - } + ensureClientIsPresent(); + Long bulkQueueSize = getThreadPoolQueueSize("bulk"); + if (bulkQueueSize != null && bulkQueueSize <= 64) { + logger.info("found bulk queue size " + bulkQueueSize + ", expanding to " + bulkQueueSize * 4); + bulkQueueSize = bulkQueueSize * 4; + } else { + logger.warn("undefined or small bulk queue size found: " + bulkQueueSize + " assuming 256"); + bulkQueueSize = 256L; + } + putClusterSetting("threadpool.bulk.queue_size", bulkQueueSize, 30L, TimeUnit.SECONDS); + String indexName = indexDefinition.getFullIndexName(); + int interval = indexDefinition.getStartBulkRefreshSeconds(); + if (interval != 0) { + logger.info("starting bulk on " + indexName + " with new refresh interval " + interval); + updateIndexSetting(indexName, "refresh_interval", + interval >= 0 ? interval + "s" : interval, 30L, TimeUnit.SECONDS); + } else { + logger.warn("ignoring starting bulk on " + indexName + " with refresh interval " + interval); } } @@ -177,6 +171,7 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements String indexName = indexDefinition.getFullIndexName(); int interval = indexDefinition.getStopBulkRefreshSeconds(); try { + logger.info("flushing bulk"); bulkProcessor.flush(); } catch (IOException e) { // can never happen @@ -267,7 +262,7 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements ensureClientIsPresent(); return bulkProcessor.waitForBulkResponses(timeout, timeUnit); } - return true; + return false; } @Override diff --git a/elx-common/src/main/java/org/xbib/elx/common/AbstractSearchClient.java b/elx-common/src/main/java/org/xbib/elx/common/AbstractSearchClient.java index 3d118c9..6c790f1 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/AbstractSearchClient.java +++ b/elx-common/src/main/java/org/xbib/elx/common/AbstractSearchClient.java @@ -34,26 +34,24 @@ import java.util.stream.StreamSupport; public abstract class AbstractSearchClient extends AbstractBasicClient implements SearchClient { - private SearchMetric searchMetric; - private final AtomicBoolean closed; + private SearchMetric searchMetric; + public AbstractSearchClient() { super(); this.closed = new AtomicBoolean(true); } - @Override - public SearchMetric getSearchMetric() { - return searchMetric; - } - @Override public void init(Settings settings) { if (closed.compareAndSet(true, false)) { super.init(settings); - this.searchMetric = new DefaultSearchMetric(getScheduler(), settings); - searchMetric.init(settings); + if (settings.getAsBoolean(Parameters.SEARCH_METRIC_ENABLED.getName(), + Parameters.SEARCH_METRIC_ENABLED.getBoolean())) { + this.searchMetric = new DefaultSearchMetric(this, settings); + searchMetric.init(settings); + } } } @@ -67,20 +65,38 @@ public abstract class AbstractSearchClient extends AbstractBasicClient implement } } + @Override + public boolean isSearchMetricEnabled() { + return searchMetric != null; + } + + @Override + public SearchMetric getSearchMetric() { + return searchMetric; + } + @Override public Optional get(Consumer getRequestBuilderConsumer) { GetRequestBuilder getRequestBuilder = new GetRequestBuilder(client, GetAction.INSTANCE); getRequestBuilderConsumer.accept(getRequestBuilder); ActionFuture actionFuture = getRequestBuilder.execute(); - searchMetric.getCurrentQueries().inc(); + if (searchMetric != null) { + searchMetric.getCurrentQueries().inc(); + } GetResponse getResponse = actionFuture.actionGet(); - searchMetric.getCurrentQueries().dec(); - searchMetric.getQueries().inc(); - searchMetric.markTotalQueries(1); + if (searchMetric != null) { + searchMetric.getCurrentQueries().dec(); + searchMetric.getQueries().inc(); + searchMetric.markTotalQueries(1); + } if (getResponse.isExists()) { - searchMetric.getSucceededQueries().inc(); + if (searchMetric != null) { + searchMetric.getSucceededQueries().inc(); + } } else { - searchMetric.getEmptyQueries().inc(); + if (searchMetric != null) { + searchMetric.getEmptyQueries().inc(); + } } return getResponse.isExists() ? Optional.of(getResponse) : Optional.empty(); } @@ -90,16 +106,24 @@ public abstract class AbstractSearchClient extends AbstractBasicClient implement MultiGetRequestBuilder multiGetRequestBuilder = new MultiGetRequestBuilder(client, MultiGetAction.INSTANCE); multiGetRequestBuilderConsumer.accept(multiGetRequestBuilder); ActionFuture actionFuture = multiGetRequestBuilder.execute(); - searchMetric.getCurrentQueries().inc(); + if (searchMetric != null) { + searchMetric.getCurrentQueries().inc(); + } MultiGetResponse multiGetItemResponse = actionFuture.actionGet(); - searchMetric.getCurrentQueries().dec(); - searchMetric.getQueries().inc(); - searchMetric.markTotalQueries(1); + if (searchMetric != null) { + searchMetric.getCurrentQueries().dec(); + searchMetric.getQueries().inc(); + searchMetric.markTotalQueries(1); + } boolean isempty = multiGetItemResponse.getResponses().length == 0; if (isempty) { - searchMetric.getEmptyQueries().inc(); + if (searchMetric != null) { + searchMetric.getEmptyQueries().inc(); + } } else { - searchMetric.getSucceededQueries().inc(); + if (searchMetric != null) { + searchMetric.getSucceededQueries().inc(); + } } return isempty ? Optional.empty() : Optional.of(multiGetItemResponse); } @@ -109,24 +133,34 @@ public abstract class AbstractSearchClient extends AbstractBasicClient implement SearchRequestBuilder searchRequestBuilder = new SearchRequestBuilder(client, SearchAction.INSTANCE); queryBuilder.accept(searchRequestBuilder); ActionFuture actionFuture = searchRequestBuilder.execute(); - searchMetric.getCurrentQueries().inc(); + if (searchMetric != null) { + searchMetric.getCurrentQueries().inc(); + } SearchResponse searchResponse = actionFuture.actionGet(); - searchMetric.getCurrentQueries().dec(); - searchMetric.getQueries().inc(); - searchMetric.markTotalQueries(1); + if (searchMetric != null) { + searchMetric.getCurrentQueries().dec(); + searchMetric.getQueries().inc(); + searchMetric.markTotalQueries(1); + } if (searchResponse.getFailedShards() > 0) { StringBuilder sb = new StringBuilder("Search failed:"); for (ShardSearchFailure failure : searchResponse.getShardFailures()) { sb.append("\n").append(failure.reason()); } - searchMetric.getEmptyQueries().inc(); + if (searchMetric != null) { + searchMetric.getEmptyQueries().inc(); + } throw new ElasticsearchException(sb.toString()); } boolean isempty = searchResponse.getHits().getHits().length == 0; if (isempty) { - searchMetric.getEmptyQueries().inc(); + if (searchMetric != null) { + searchMetric.getEmptyQueries().inc(); + } } else { - searchMetric.getSucceededQueries().inc(); + if (searchMetric != null) { + searchMetric.getSucceededQueries().inc(); + } } return isempty ? Optional.empty() : Optional.of(searchResponse); } @@ -138,19 +172,31 @@ public abstract class AbstractSearchClient extends AbstractBasicClient implement queryBuilder.accept(searchRequestBuilder); searchRequestBuilder.setScroll(scrollTime).setSize(scrollSize); ActionFuture actionFuture = searchRequestBuilder.execute(); - searchMetric.getCurrentQueries().inc(); + if (searchMetric != null) { + searchMetric.getCurrentQueries().inc(); + } SearchResponse initialSearchResponse = actionFuture.actionGet(); - searchMetric.getCurrentQueries().dec(); - searchMetric.getQueries().inc(); - searchMetric.markTotalQueries(1); + if (searchMetric != null) { + searchMetric.getCurrentQueries().dec(); + searchMetric.getQueries().inc(); + searchMetric.markTotalQueries(1); + } if (initialSearchResponse.getFailedShards() > 0) { - searchMetric.getFailedQueries().inc(); + if (searchMetric != null) { + searchMetric.getFailedQueries().inc(); + } } else if (initialSearchResponse.isTimedOut()) { - searchMetric.getTimeoutQueries().inc(); + if (searchMetric != null) { + searchMetric.getTimeoutQueries().inc(); + } } else if (initialSearchResponse.getHits().getTotalHits() == 0) { - searchMetric.getEmptyQueries().inc(); + if (searchMetric != null) { + searchMetric.getEmptyQueries().inc(); + } } else { - searchMetric.getSucceededQueries().inc(); + if (searchMetric != null) { + searchMetric.getSucceededQueries().inc(); + } } Stream responseStream = Stream.iterate(initialSearchResponse, searchResponse -> { @@ -159,19 +205,23 @@ public abstract class AbstractSearchClient extends AbstractBasicClient implement .setScrollId(searchResponse.getScrollId()) .setScroll(scrollTime); ActionFuture actionFuture1 = searchScrollRequestBuilder.execute(); - searchMetric.getCurrentQueries().inc(); + if (searchMetric != null) { + searchMetric.getCurrentQueries().inc(); + } SearchResponse searchResponse1 = actionFuture1.actionGet(); - searchMetric.getCurrentQueries().dec(); - searchMetric.getQueries().inc(); - searchMetric.markTotalQueries(1); - if (searchResponse1.getFailedShards() > 0) { - searchMetric.getFailedQueries().inc(); - } else if (searchResponse1.isTimedOut()) { - searchMetric.getTimeoutQueries().inc(); - } else if (searchResponse1.getHits().getHits().length == 0) { - searchMetric.getEmptyQueries().inc(); - } else { - searchMetric.getSucceededQueries().inc(); + if (searchMetric != null) { + searchMetric.getCurrentQueries().dec(); + searchMetric.getQueries().inc(); + searchMetric.markTotalQueries(1); + if (searchResponse1.getFailedShards() > 0) { + searchMetric.getFailedQueries().inc(); + } else if (searchResponse1.isTimedOut()) { + searchMetric.getTimeoutQueries().inc(); + } else if (searchResponse1.getHits().getHits().length == 0) { + searchMetric.getEmptyQueries().inc(); + } else { + searchMetric.getSucceededQueries().inc(); + } } return searchResponse1; }); diff --git a/elx-common/src/main/java/org/xbib/elx/common/DaemonThreadFactory.java b/elx-common/src/main/java/org/xbib/elx/common/DaemonThreadFactory.java new file mode 100644 index 0000000..5e2eb9a --- /dev/null +++ b/elx-common/src/main/java/org/xbib/elx/common/DaemonThreadFactory.java @@ -0,0 +1,26 @@ +package org.xbib.elx.common; + +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicInteger; + +public class DaemonThreadFactory implements ThreadFactory { + + private final ThreadGroup group; + + private final AtomicInteger threadNumber = new AtomicInteger(1); + + private final String namePrefix; + + public DaemonThreadFactory(String namePrefix) { + this.namePrefix = namePrefix; + SecurityManager s = System.getSecurityManager(); + group = s != null ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); + } + + @Override + public Thread newThread(Runnable r) { + Thread t = new Thread(group, r, namePrefix + "[T#" + threadNumber.getAndIncrement() + "]", 0); + t.setDaemon(true); + return t; + } +} diff --git a/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkListener.java b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkListener.java index 43348ff..493a195 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkListener.java +++ b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkListener.java @@ -11,7 +11,7 @@ import org.xbib.elx.api.BulkMetric; import org.xbib.elx.api.BulkProcessor; import java.io.IOException; -import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ScheduledExecutorService; public class DefaultBulkListener implements BulkListener { @@ -19,26 +19,23 @@ public class DefaultBulkListener implements BulkListener { private final BulkProcessor bulkProcessor; - private final BulkMetric bulkMetric; - - private final boolean isBulkLoggingEnabled; - private final boolean failOnError; + private BulkMetric bulkMetric; + private Throwable lastBulkError; public DefaultBulkListener(DefaultBulkProcessor bulkProcessor, - ScheduledThreadPoolExecutor scheduler, Settings settings) { this.bulkProcessor = bulkProcessor; - boolean enableBulkLogging = settings.getAsBoolean(Parameters.BULK_LOGGING_ENABLED.getName(), - Parameters.BULK_LOGGING_ENABLED.getBoolean()); boolean failOnBulkError = settings.getAsBoolean(Parameters.BULK_FAIL_ON_ERROR.getName(), Parameters.BULK_FAIL_ON_ERROR.getBoolean()); - this.isBulkLoggingEnabled = enableBulkLogging; this.failOnError = failOnBulkError; - this.bulkMetric = new DefaultBulkMetric(bulkProcessor, scheduler, settings); - bulkMetric.start(); + if (settings.getAsBoolean(Parameters.BULK_METRIC_ENABLED.getName(), + Parameters.BULK_METRIC_ENABLED.getBoolean())) { + this.bulkMetric = new DefaultBulkMetric(bulkProcessor, bulkProcessor.getScheduler(), settings); + bulkMetric.start(); + } } public BulkMetric getBulkMetric() { @@ -47,47 +44,58 @@ public class DefaultBulkListener implements BulkListener { @Override public void beforeBulk(long executionId, BulkRequest request) { - long l = bulkMetric.getCurrentIngest().getCount(); - bulkMetric.getCurrentIngest().inc(); - int n = request.numberOfActions(); - bulkMetric.getSubmitted().inc(n); - bulkMetric.getCurrentIngestNumDocs().inc(n); - bulkMetric.getTotalIngestSizeInBytes().inc(request.estimatedSizeInBytes()); - if (isBulkLoggingEnabled && logger.isDebugEnabled()) { - logger.debug("before bulk [{}] [actions={}] [bytes={}] [requests={}]", - executionId, - request.numberOfActions(), - request.estimatedSizeInBytes(), - l); + if (bulkMetric != null) { + long l = bulkMetric.getCurrentIngest().getCount(); + bulkMetric.getCurrentIngest().inc(); + int n = request.numberOfActions(); + bulkMetric.getSubmitted().inc(n); + bulkMetric.getCurrentIngestNumDocs().inc(n); + bulkMetric.getTotalIngestSizeInBytes().inc(request.estimatedSizeInBytes()); + if (logger.isDebugEnabled()) { + logger.debug("before bulk [{}] [actions={}] [bytes={}] [requests={}]", + executionId, + request.numberOfActions(), + request.estimatedSizeInBytes(), + l); + } } } @Override public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { - bulkMetric.recalculate(request, response); - long l = bulkMetric.getCurrentIngest().getCount(); - bulkMetric.getCurrentIngest().dec(); - bulkMetric.getSucceeded().inc(response.getItems().length); - bulkMetric.markTotalIngest(response.getItems().length); + long l = 0L; + if (bulkMetric != null) { + bulkMetric.recalculate(request, response); + l = bulkMetric.getCurrentIngest().getCount(); + bulkMetric.getCurrentIngest().dec(); + bulkMetric.getSucceeded().inc(response.getItems().length); + bulkMetric.markTotalIngest(response.getItems().length); + } int n = 0; for (BulkItemResponse itemResponse : response.getItems()) { - bulkMetric.getCurrentIngest().dec(itemResponse.getIndex(), itemResponse.getType(), itemResponse.getId()); + if (bulkMetric != null) { + bulkMetric.getCurrentIngest().dec(itemResponse.getIndex(), itemResponse.getType(), itemResponse.getId()); + } if (itemResponse.isFailed()) { n++; - bulkMetric.getSucceeded().dec(1); - bulkMetric.getFailed().inc(1); + if (bulkMetric != null) { + bulkMetric.getSucceeded().dec(1); + bulkMetric.getFailed().inc(1); + } } } - if (isBulkLoggingEnabled && logger.isDebugEnabled()) { - logger.debug("after bulk [{}] [succeeded={}] [failed={}] [{}ms] [requests={}]", - executionId, - bulkMetric.getSucceeded().getCount(), - bulkMetric.getFailed().getCount(), - response.getTook().millis(), - l); + if (bulkMetric != null) { + if (logger.isDebugEnabled()) { + logger.debug("after bulk [{}] [succeeded={}] [failed={}] [{}ms] [requests={}]", + executionId, + bulkMetric.getSucceeded().getCount(), + bulkMetric.getFailed().getCount(), + response.getTook().millis(), + l); + } } if (n > 0) { - if (isBulkLoggingEnabled && logger.isErrorEnabled()) { + if (logger.isErrorEnabled()) { logger.error("bulk [{}] failed with {} failed items, failure message = {}", executionId, n, response.buildFailureMessage()); } @@ -96,13 +104,17 @@ public class DefaultBulkListener implements BulkListener { " n = " + n + " message = " + response.buildFailureMessage()); } } else { - bulkMetric.getCurrentIngestNumDocs().dec(response.getItems().length); + if (bulkMetric != null) { + bulkMetric.getCurrentIngestNumDocs().dec(response.getItems().length); + } } } @Override public void afterBulk(long executionId, BulkRequest request, Throwable failure) { - bulkMetric.getCurrentIngest().dec(); + if (bulkMetric != null) { + bulkMetric.getCurrentIngest().dec(); + } lastBulkError = failure; if (logger.isErrorEnabled()) { logger.error("after bulk [" + executionId + "] error", failure); @@ -117,6 +129,8 @@ public class DefaultBulkListener implements BulkListener { @Override public void close() throws IOException { - bulkMetric.close(); + if (bulkMetric != null) { + bulkMetric.close(); + } } } diff --git a/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkMetric.java b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkMetric.java index da25255..4b16895 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkMetric.java +++ b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkMetric.java @@ -16,8 +16,8 @@ import org.xbib.metrics.common.Meter; import java.io.IOException; import java.util.LongSummaryStatistics; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class DefaultBulkMetric implements BulkMetric { @@ -63,7 +63,7 @@ public class DefaultBulkMetric implements BulkMetric { private int x = 0; public DefaultBulkMetric(DefaultBulkProcessor bulkProcessor, - ScheduledThreadPoolExecutor scheduledThreadPoolExecutor, + ScheduledExecutorService scheduledExecutorService, Settings settings) { this.bulkProcessor = bulkProcessor; int ringBufferSize = settings.getAsInt(Parameters.BULK_RING_BUFFER_SIZE.getName(), @@ -73,7 +73,7 @@ public class DefaultBulkMetric implements BulkMetric { TimeValue measureInterval = TimeValue.parseTimeValue(measureIntervalStr, TimeValue.timeValueSeconds(1), ""); this.measureIntervalSeconds = measureInterval.seconds(); - this.totalIngest = new Meter(scheduledThreadPoolExecutor); + this.totalIngest = new Meter(scheduledExecutorService); this.ringBufferSize = ringBufferSize; this.ringBuffer = new LongRingBuffer(ringBufferSize); this.totalIngestSizeInBytes = new CountMetric(); @@ -93,7 +93,7 @@ public class DefaultBulkMetric implements BulkMetric { Parameters.BULK_METRIC_LOG_INTERVAL.getString()); TimeValue metricLoginterval = TimeValue.parseTimeValue(metricLogIntervalStr, TimeValue.timeValueSeconds(10), ""); - this.future = scheduledThreadPoolExecutor.scheduleAtFixedRate(this::log, 0L, metricLoginterval.seconds(), TimeUnit.SECONDS); + this.future = scheduledExecutorService.scheduleAtFixedRate(this::log, 0L, metricLoginterval.seconds(), TimeUnit.SECONDS); } @Override diff --git a/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkProcessor.java b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkProcessor.java index 4cb2db0..e02a316 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkProcessor.java +++ b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkProcessor.java @@ -16,6 +16,7 @@ import org.xbib.elx.api.BulkMetric; import org.xbib.elx.api.BulkProcessor; import java.io.IOException; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; @@ -35,11 +36,13 @@ public class DefaultBulkProcessor implements BulkProcessor { private final AtomicBoolean enabled; + private final BulkClient bulkClient; + private final ElasticsearchClient client; private final DefaultBulkListener bulkListener; - private ScheduledFuture scheduledFuture; + private ScheduledFuture flushIntervalFuture; private BulkRequest bulkRequest; @@ -56,22 +59,28 @@ public class DefaultBulkProcessor implements BulkProcessor { private final int permits; public DefaultBulkProcessor(BulkClient bulkClient, Settings settings) { + this.bulkClient = bulkClient; int maxActionsPerRequest = settings.getAsInt(Parameters.BULK_MAX_ACTIONS_PER_REQUEST.getName(), Parameters.BULK_MAX_ACTIONS_PER_REQUEST.getInteger()); String flushIntervalStr = settings.get(Parameters.BULK_FLUSH_INTERVAL.getName(), Parameters.BULK_FLUSH_INTERVAL.getString()); TimeValue flushInterval = TimeValue.parseTimeValue(flushIntervalStr, TimeValue.timeValueSeconds(30), ""); - ByteSizeValue minVolumePerRequest = settings.getAsBytesSize(Parameters.BULK_MIN_VOLUME_PER_REQUEST.getName(), - ByteSizeValue.parseBytesSizeValue(Parameters.BULK_MIN_VOLUME_PER_REQUEST.getString(), "1k")); this.client = bulkClient.getClient(); if (flushInterval.millis() > 0L) { - this.scheduledFuture = bulkClient.getScheduler().scheduleWithFixedDelay(this::flush, flushInterval.millis(), + this.flushIntervalFuture = bulkClient.getScheduler().scheduleWithFixedDelay(this::flush, flushInterval.millis(), flushInterval.millis(), TimeUnit.MILLISECONDS); } - this.bulkListener = new DefaultBulkListener(this, bulkClient.getScheduler(), settings); + this.bulkListener = new DefaultBulkListener(this, settings); this.bulkActions = maxActionsPerRequest; + ByteSizeValue minVolumePerRequest = settings.getAsBytesSize(Parameters.BULK_MIN_VOLUME_PER_REQUEST.getName(), + ByteSizeValue.parseBytesSizeValue(Parameters.BULK_MIN_VOLUME_PER_REQUEST.getString(), "1k")); this.bulkVolume = minVolumePerRequest.getBytes(); + if (!isBulkMetricEnabled()) { + ByteSizeValue maxVolumePerRequest = settings.getAsBytesSize(Parameters.BULK_MAX_VOLUME_PER_REQUEST.getName(), + ByteSizeValue.parseBytesSizeValue(Parameters.BULK_MAX_VOLUME_PER_REQUEST.getString(), "1m")); + this.bulkVolume = maxVolumePerRequest.getBytes(); + } this.bulkRequest = new BulkRequest(); this.closed = new AtomicBoolean(false); this.enabled = new AtomicBoolean(false); @@ -112,11 +121,21 @@ public class DefaultBulkProcessor implements BulkProcessor { return bulkVolume; } + @Override + public ScheduledExecutorService getScheduler() { + return bulkClient.getScheduler(); + } + @Override public BulkMetric getBulkMetric() { return bulkListener.getBulkMetric(); } + @Override + public boolean isBulkMetricEnabled() { + return bulkListener.getBulkMetric() != null; + } + @Override public Throwable getLastBulkError() { return bulkListener.getLastBulkError(); @@ -163,8 +182,8 @@ public class DefaultBulkProcessor implements BulkProcessor { public synchronized void close() throws IOException { if (closed.compareAndSet(false, true)) { try { - if (scheduledFuture != null) { - scheduledFuture.cancel(true); + if (flushIntervalFuture != null) { + flushIntervalFuture.cancel(true); } // like flush but without ensuring open if (bulkRequest.numberOfActions() > 0) { diff --git a/elx-common/src/main/java/org/xbib/elx/common/DefaultIndexDefinition.java b/elx-common/src/main/java/org/xbib/elx/common/DefaultIndexDefinition.java index cb073d8..6555ec2 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/DefaultIndexDefinition.java +++ b/elx-common/src/main/java/org/xbib/elx/common/DefaultIndexDefinition.java @@ -18,6 +18,7 @@ import java.time.ZoneId; import java.time.format.DateTimeFormatter; import java.util.Locale; import java.util.Map; +import java.util.Set; import java.util.regex.Pattern; public class DefaultIndexDefinition implements IndexDefinition { @@ -90,26 +91,26 @@ public class DefaultIndexDefinition implements IndexDefinition { if (settings.get("settings") != null && settings.get("mapping") != null) { setSettings(findSettingsFrom(settings.get("settings"))); setMappings(findMappingsFrom(settings.get("mapping"))); - boolean shift = settings.getAsBoolean("shift", false); - setShift(shift); - if (shift) { - String dateTimeFormat = settings.get(Parameters.DATE_TIME_FORMAT.getName(), - Parameters.DATE_TIME_FORMAT.getString()); - DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern(dateTimeFormat, Locale.getDefault()) - .withZone(ZoneId.systemDefault()); - setDateTimeFormatter(dateTimeFormatter); - String dateTimePatternStr = settings.get("dateTimePattern", "^(.*?)(\\d+)$"); - Pattern dateTimePattern = Pattern.compile(dateTimePatternStr); - setDateTimePattern(dateTimePattern); - String fullName = indexName + dateTimeFormatter.format(LocalDateTime.now()); - fullIndexName = adminClient.resolveAlias(fullName).stream().findFirst().orElse(fullName); - setFullIndexName(fullIndexName); - boolean prune = settings.getAsBoolean("prune", false); - setPrune(prune); - if (prune) { - setMinToKeep(settings.getAsInt("retention.mintokeep", 2)); - setDelta(settings.getAsInt("retention.delta", 2)); - } + } + boolean shift = settings.getAsBoolean("shift", false); + setShift(shift); + if (shift) { + String dateTimeFormat = settings.get(Parameters.DATE_TIME_FORMAT.getName(), + Parameters.DATE_TIME_FORMAT.getString()); + DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern(dateTimeFormat, Locale.getDefault()) + .withZone(ZoneId.systemDefault()); + setDateTimeFormatter(dateTimeFormatter); + String dateTimePatternStr = settings.get("dateTimePattern", "^(.*?)(\\d+)$"); + Pattern dateTimePattern = Pattern.compile(dateTimePatternStr); + setDateTimePattern(dateTimePattern); + String fullName = indexName + dateTimeFormatter.format(LocalDateTime.now()); + fullIndexName = adminClient.resolveAlias(fullName).stream().findFirst().orElse(fullName); + setFullIndexName(fullIndexName); + boolean prune = settings.getAsBoolean("prune", false); + setPrune(prune); + if (prune) { + setMinToKeep(settings.getAsInt("retention.mintokeep", 2)); + setDelta(settings.getAsInt("retention.delta", 2)); } } } @@ -134,7 +135,6 @@ public class DefaultIndexDefinition implements IndexDefinition { return type; } - @Override public void setFullIndexName(String fullIndexName) { this.fullIndexName = fullIndexName; @@ -161,8 +161,23 @@ public class DefaultIndexDefinition implements IndexDefinition { } @Override - public String getMappings() { - return mappings; + public Map getMappings() { + if (mappings == null) { + return null; + } + try { + return JsonXContent.jsonXContent.createParser(mappings).mapOrdered(); + } catch (IOException e) { + return null; + } + } + + @Override + public Set getMappingFields() { + if (mappings == null) { + return null; + } + return Settings.builder().loadFromSource(mappings).build().getGroups("properties").keySet(); } @Override diff --git a/elx-common/src/main/java/org/xbib/elx/common/DefaultSearchMetric.java b/elx-common/src/main/java/org/xbib/elx/common/DefaultSearchMetric.java index 48b4d99..7611e1b 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/DefaultSearchMetric.java +++ b/elx-common/src/main/java/org/xbib/elx/common/DefaultSearchMetric.java @@ -4,13 +4,13 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.xbib.elx.api.SearchClient; import org.xbib.elx.api.SearchMetric; import org.xbib.metrics.api.Count; import org.xbib.metrics.api.Metered; import org.xbib.metrics.common.CountMetric; import org.xbib.metrics.common.Meter; import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class DefaultSearchMetric implements SearchMetric { @@ -37,9 +37,9 @@ public class DefaultSearchMetric implements SearchMetric { private Long stopped; - public DefaultSearchMetric(ScheduledThreadPoolExecutor scheduledThreadPoolExecutor, + public DefaultSearchMetric(SearchClient searchClient, Settings settings) { - totalQuery = new Meter(scheduledThreadPoolExecutor); + totalQuery = new Meter(searchClient.getScheduler()); currentQuery = new CountMetric(); queries = new CountMetric(); succeededQueries = new CountMetric(); @@ -50,7 +50,7 @@ public class DefaultSearchMetric implements SearchMetric { Parameters.SEARCH_METRIC_LOG_INTERVAL.getString()); TimeValue metricLoginterval = TimeValue.parseTimeValue(metricLogIntervalStr, TimeValue.timeValueSeconds(10), ""); - this.future = scheduledThreadPoolExecutor.scheduleAtFixedRate(this::log, 0L, metricLoginterval.seconds(), TimeUnit.SECONDS); + this.future = searchClient.getScheduler().scheduleAtFixedRate(this::log, 0L, metricLoginterval.seconds(), TimeUnit.SECONDS); } @Override @@ -126,10 +126,10 @@ public class DefaultSearchMetric implements SearchMetric { private void log() { if (logger.isInfoEnabled()) { logger.info("queries = " + getTotalQueries().getCount() + - " succeeded = " + getSucceededQueries().getCount() + - " empty = " + getEmptyQueries().getCount() + - " failed = " + getFailedQueries() + - " timeouts = " + getTimeoutQueries().getCount()); + " succeeded = " + getSucceededQueries().getCount() + + " empty = " + getEmptyQueries().getCount() + + " failed = " + getFailedQueries().getCount() + + " timeouts = " + getTimeoutQueries().getCount()); } } } diff --git a/elx-common/src/main/java/org/xbib/elx/common/Parameters.java b/elx-common/src/main/java/org/xbib/elx/common/Parameters.java index 30a3d9e..445a726 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/Parameters.java +++ b/elx-common/src/main/java/org/xbib/elx/common/Parameters.java @@ -2,20 +2,20 @@ package org.xbib.elx.common; public enum Parameters { + HOST("host", String.class, "localhost"), + + PORT("port", Integer.class, 9300), + CLUSTER_TARGET_HEALTH("cluster.target_health", String.class, "GREEN"), CLUSTER_TARGET_HEALTH_TIMEOUT("cluster.target_health_timeout", String.class, "30m"), DATE_TIME_FORMAT("dateTimeFormat", String.class, "yyyyMMdd"), - BULK_MAX_WAIT_RESPONSE("bulk.max_wait_response", String.class, "30s"), - BULK_START_REFRESH_SECONDS("bulk.start_refresh_seconds", Integer.class, -1), BULK_STOP_REFRESH_SECONDS("bulk.stop_refresh_seconds", Integer.class, 30), - BULK_LOGGING_ENABLED("bulk.logging.enabled", Boolean.class, true), - BULK_FAIL_ON_ERROR("bulk.fail_on_error", Boolean.class, true), BULK_MAX_ACTIONS_PER_REQUEST("bulk.max_actions_per_request", Integer.class, -1), @@ -26,15 +26,19 @@ public enum Parameters { BULK_FLUSH_INTERVAL("bulk.flush_interval", String.class, "30s"), - BULK_MEASURE_INTERVAL("bulk.measure_interval", String.class, "1s"), + BULK_METRIC_ENABLED("bulk.metric.enabled", Boolean.class, Boolean.TRUE), - BULK_METRIC_LOG_INTERVAL("bulk.metric_log_interval", String.class, "10s"), + BULK_METRIC_LOG_INTERVAL("bulk.metric.log_interval", String.class, "10s"), + + BULK_MEASURE_INTERVAL("bulk.measure_interval", String.class, "1s"), BULK_RING_BUFFER_SIZE("bulk.ring_buffer_size", Integer.class, Runtime.getRuntime().availableProcessors()), BULK_PERMITS("bulk.permits", Integer.class, Runtime.getRuntime().availableProcessors() - 1), - SEARCH_METRIC_LOG_INTERVAL("search.metric_log_interval", String.class, "10s"); + SEARCH_METRIC_ENABLED("search.metric.enabled", Boolean.class, Boolean.FALSE), + + SEARCH_METRIC_LOG_INTERVAL("search.metric.log_interval", String.class, "10s"); private final String name; diff --git a/elx-node/src/main/java/org/xbib/elx/node/NodeAdminClient.java b/elx-node/src/main/java/org/xbib/elx/node/NodeAdminClient.java index 7b73a19..772924a 100644 --- a/elx-node/src/main/java/org/xbib/elx/node/NodeAdminClient.java +++ b/elx-node/src/main/java/org/xbib/elx/node/NodeAdminClient.java @@ -3,7 +3,6 @@ package org.xbib.elx.node; import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.common.settings.Settings; import org.xbib.elx.common.AbstractAdminClient; -import java.io.IOException; public class NodeAdminClient extends AbstractAdminClient { @@ -20,7 +19,7 @@ public class NodeAdminClient extends AbstractAdminClient { } @Override - public void closeClient(Settings settings) throws IOException { + public void closeClient(Settings settings) { helper.closeClient(settings); } } diff --git a/elx-node/src/main/java/org/xbib/elx/node/NodeClientHelper.java b/elx-node/src/main/java/org/xbib/elx/node/NodeClientHelper.java index 5f1147c..d52f11b 100644 --- a/elx-node/src/main/java/org/xbib/elx/node/NodeClientHelper.java +++ b/elx-node/src/main/java/org/xbib/elx/node/NodeClientHelper.java @@ -9,7 +9,6 @@ import org.elasticsearch.env.Environment; import org.elasticsearch.node.Node; import org.elasticsearch.plugins.Plugin; import org.xbib.elx.common.Parameters; -import java.io.IOException; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -21,9 +20,7 @@ public class NodeClientHelper { private static Object configurationObject; - private static Node node; - - private static final Map clientMap = new HashMap<>(); + private static final Map nodeMap = new HashMap<>(); public ElasticsearchClient createClient(Settings settings, Object object) { if (configurationObject == null) { @@ -32,20 +29,21 @@ public class NodeClientHelper { if (configurationObject instanceof ElasticsearchClient) { return (ElasticsearchClient) configurationObject; } - return clientMap.computeIfAbsent(settings.get("cluster.name"), - key -> innerCreateClient(settings)); + String clusterName = settings.get("cluster.name", "elasticsearch"); + Node node = nodeMap.computeIfAbsent(clusterName, key -> innerCreateClient(settings)); + return node != null ? node.client() : null; } - public void closeClient(Settings settings) throws IOException { - ElasticsearchClient client = clientMap.remove(settings.get("cluster.name")); - if (client != null) { + public void closeClient(Settings settings) { + String clusterName = settings.get("cluster.name", "elasticsearch"); + Node node = nodeMap.remove(settings.get("cluster.name")); + if (node != null) { logger.debug("closing node..."); node.close(); - node = null; } } - private ElasticsearchClient innerCreateClient(Settings settings) { + private Node innerCreateClient(Settings settings) { String version = System.getProperty("os.name") + " " + System.getProperty("java.vm.name") + " " + System.getProperty("java.vm.vendor") @@ -61,9 +59,9 @@ public class NodeClientHelper { logger.info("creating node client on {} with effective settings {}", version, effectiveSettings.toDelimitedString(',')); Collection> plugins = Collections.emptyList(); - node = new BulkNode(new Environment(effectiveSettings), plugins); + Node node = new BulkNode(new Environment(effectiveSettings), plugins); node.start(); - return node.client(); + return node; } private static Settings filterSettings(Settings settings) { diff --git a/elx-node/src/main/java/org/xbib/elx/node/NodeSearchClient.java b/elx-node/src/main/java/org/xbib/elx/node/NodeSearchClient.java index cfcf2fe..3269e22 100644 --- a/elx-node/src/main/java/org/xbib/elx/node/NodeSearchClient.java +++ b/elx-node/src/main/java/org/xbib/elx/node/NodeSearchClient.java @@ -3,7 +3,6 @@ package org.xbib.elx.node; import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.common.settings.Settings; import org.xbib.elx.common.AbstractSearchClient; -import java.io.IOException; public class NodeSearchClient extends AbstractSearchClient { @@ -20,7 +19,7 @@ public class NodeSearchClient extends AbstractSearchClient { } @Override - public void closeClient(Settings settings) throws IOException { + public void closeClient(Settings settings) { helper.closeClient(settings); } } diff --git a/elx-node/src/test/java/org/xbib/elx/node/test/BulkClientTest.java b/elx-node/src/test/java/org/xbib/elx/node/test/BulkClientTest.java index 16b3893..adb28a7 100644 --- a/elx-node/src/test/java/org/xbib/elx/node/test/BulkClientTest.java +++ b/elx-node/src/test/java/org/xbib/elx/node/test/BulkClientTest.java @@ -52,7 +52,9 @@ class BulkClientTest { bulkClient.newIndex(indexDefinition); bulkClient.index(indexDefinition, "1", true, "{ \"name\" : \"Hello World\"}"); // single doc ingest bulkClient.waitForResponses(30L, TimeUnit.SECONDS); - assertEquals(1, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount()); + if (bulkClient.getBulkProcessor().isBulkMetricEnabled()) { + assertEquals(1, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount()); + } if (bulkClient.getBulkProcessor().getLastBulkError() != null) { logger.error("error", bulkClient.getBulkProcessor().getLastBulkError()); } @@ -76,7 +78,9 @@ class BulkClientTest { } bulkClient.stopBulk(indexDefinition); bulkClient.waitForResponses(30L, TimeUnit.SECONDS); - assertEquals(numactions, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount()); + if (bulkClient.getBulkProcessor().isBulkMetricEnabled()) { + assertEquals(numactions, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount()); + } if (bulkClient.getBulkProcessor().getLastBulkError() != null) { logger.error("error", bulkClient.getBulkProcessor().getLastBulkError()); } @@ -119,7 +123,9 @@ class BulkClientTest { bulkClient.stopBulk(indexDefinition); bulkClient.refreshIndex(indexDefinition); assertEquals(maxthreads * actions, bulkClient.getSearchableDocs(indexDefinition)); - assertEquals(maxthreads * actions, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount()); + if (bulkClient.getBulkProcessor().isBulkMetricEnabled()) { + assertEquals(maxthreads * actions, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount()); + } if (bulkClient.getBulkProcessor().getLastBulkError() != null) { logger.error("error", bulkClient.getBulkProcessor().getLastBulkError()); } diff --git a/elx-node/src/test/java/org/xbib/elx/node/test/DuplicateIDTest.java b/elx-node/src/test/java/org/xbib/elx/node/test/DuplicateIDTest.java index d5241aa..b2c61f2 100644 --- a/elx-node/src/test/java/org/xbib/elx/node/test/DuplicateIDTest.java +++ b/elx-node/src/test/java/org/xbib/elx/node/test/DuplicateIDTest.java @@ -7,7 +7,6 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.xbib.elx.api.IndexDefinition; import org.xbib.elx.common.ClientBuilder; import org.xbib.elx.common.DefaultIndexDefinition; -import org.xbib.elx.common.Parameters; import org.xbib.elx.node.NodeBulkClient; import org.xbib.elx.node.NodeBulkClientProvider; @@ -22,9 +21,7 @@ class DuplicateIDTest { private static final Logger logger = LogManager.getLogger(DuplicateIDTest.class.getName()); - private static final Long ACTIONS = 100L; - - private static final Long MAX_ACTIONS_PER_REQUEST = 5L; + private static final Long ACTIONS = 10000L; private final TestExtension.Helper helper; @@ -38,7 +35,6 @@ class DuplicateIDTest { try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client()) .setBulkClientProvider(NodeBulkClientProvider.class) .put(helper.getClientSettings()) - .put(Parameters.BULK_MAX_ACTIONS_PER_REQUEST.getName(), MAX_ACTIONS_PER_REQUEST) .build()) { IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); bulkClient.newIndex(indexDefinition); @@ -49,7 +45,9 @@ class DuplicateIDTest { bulkClient.waitForResponses(30L, TimeUnit.SECONDS); bulkClient.refreshIndex(indexDefinition); assertTrue(bulkClient.getSearchableDocs(indexDefinition) < ACTIONS); - assertEquals(numactions, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount()); + if (bulkClient.getBulkProcessor().isBulkMetricEnabled()) { + assertEquals(numactions, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount()); + } if (bulkClient.getBulkProcessor().getLastBulkError() != null) { logger.error("error", bulkClient.getBulkProcessor().getLastBulkError()); } diff --git a/elx-node/src/test/java/org/xbib/elx/node/test/IndexPruneTest.java b/elx-node/src/test/java/org/xbib/elx/node/test/IndexPruneTest.java index 0610c79..07ee30f 100644 --- a/elx-node/src/test/java/org/xbib/elx/node/test/IndexPruneTest.java +++ b/elx-node/src/test/java/org/xbib/elx/node/test/IndexPruneTest.java @@ -62,9 +62,9 @@ class IndexPruneTest { bulkClient.newIndex(indexDefinition); indexDefinition.setShift(true); adminClient.shiftIndex(indexDefinition, Collections.emptyList(), null); + indexDefinition.setEnabled(true); indexDefinition.setDelta(2); indexDefinition.setMinToKeep(2); - indexDefinition.setEnabled(true); indexDefinition.setPrune(true); IndexPruneResult indexPruneResult = adminClient.pruneIndex(indexDefinition); logger.info("prune result = " + indexPruneResult); diff --git a/elx-node/src/test/java/org/xbib/elx/node/test/IndexShiftTest.java b/elx-node/src/test/java/org/xbib/elx/node/test/IndexShiftTest.java index 5538f2e..9546d7c 100644 --- a/elx-node/src/test/java/org/xbib/elx/node/test/IndexShiftTest.java +++ b/elx-node/src/test/java/org/xbib/elx/node/test/IndexShiftTest.java @@ -42,7 +42,7 @@ class IndexShiftTest { .build(); NodeBulkClient bulkClient = ClientBuilder.builder(helper.client()) .setBulkClientProvider(NodeBulkClientProvider.class) - .put(helper.getClientSettings()) + .put(helper.getClientSettings()) .build()) { IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); indexDefinition.setFullIndexName("test_shift"); diff --git a/elx-node/src/test/java/org/xbib/elx/node/test/SearchTest.java b/elx-node/src/test/java/org/xbib/elx/node/test/SearchTest.java index 0e2abf5..e4e8507 100644 --- a/elx-node/src/test/java/org/xbib/elx/node/test/SearchTest.java +++ b/elx-node/src/test/java/org/xbib/elx/node/test/SearchTest.java @@ -52,7 +52,9 @@ class SearchTest { assertTrue(bulkClient.waitForResponses(30L, TimeUnit.SECONDS)); bulkClient.refreshIndex(indexDefinition); assertEquals(numactions, bulkClient.getSearchableDocs(indexDefinition)); - assertEquals(numactions, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount()); + if (bulkClient.getBulkProcessor().isBulkMetricEnabled()) { + assertEquals(numactions, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount()); + } if (bulkClient.getBulkProcessor().getLastBulkError() != null) { logger.error("error", bulkClient.getBulkProcessor().getLastBulkError()); } @@ -69,9 +71,11 @@ class SearchTest { TimeValue.timeValueMillis(100), 579); long count = stream.count(); assertEquals(numactions, count); - assertEquals(0L, searchClient.getSearchMetric().getFailedQueries().getCount()); - assertEquals(0L, searchClient.getSearchMetric().getTimeoutQueries().getCount()); - assertEquals(1L, searchClient.getSearchMetric().getEmptyQueries().getCount()); + if (searchClient.isSearchMetricEnabled()) { + assertEquals(0L, searchClient.getSearchMetric().getFailedQueries().getCount()); + assertEquals(0L, searchClient.getSearchMetric().getTimeoutQueries().getCount()); + assertEquals(1L, searchClient.getSearchMetric().getEmptyQueries().getCount()); + } // test stream docs stream = searchClient.search(qb -> qb .setIndices(indexDefinition.getFullIndexName()) @@ -80,9 +84,11 @@ class SearchTest { final AtomicInteger hitcount = new AtomicInteger(); stream.forEach(hit -> hitcount.incrementAndGet()); assertEquals(numactions, hitcount.get()); - assertEquals(0L, searchClient.getSearchMetric().getFailedQueries().getCount()); - assertEquals(0L, searchClient.getSearchMetric().getTimeoutQueries().getCount()); - assertEquals(2L, searchClient.getSearchMetric().getEmptyQueries().getCount()); + if (searchClient.isSearchMetricEnabled()) { + assertEquals(0L, searchClient.getSearchMetric().getFailedQueries().getCount()); + assertEquals(0L, searchClient.getSearchMetric().getTimeoutQueries().getCount()); + assertEquals(2L, searchClient.getSearchMetric().getEmptyQueries().getCount()); + } // test stream doc ids Stream ids = searchClient.getIds(qb -> qb .setIndices(indexDefinition.getFullIndexName()) @@ -90,11 +96,13 @@ class SearchTest { final AtomicInteger idcount = new AtomicInteger(); ids.forEach(id -> idcount.incrementAndGet()); assertEquals(numactions, idcount.get()); - assertEquals(1542L, searchClient.getSearchMetric().getQueries().getCount()); - assertEquals(1539L, searchClient.getSearchMetric().getSucceededQueries().getCount()); - assertEquals(3L, searchClient.getSearchMetric().getEmptyQueries().getCount()); - assertEquals(0L, searchClient.getSearchMetric().getFailedQueries().getCount()); - assertEquals(0L, searchClient.getSearchMetric().getTimeoutQueries().getCount()); + if (searchClient.isSearchMetricEnabled()) { + assertEquals(1542L, searchClient.getSearchMetric().getQueries().getCount()); + assertEquals(1539L, searchClient.getSearchMetric().getSucceededQueries().getCount()); + assertEquals(3L, searchClient.getSearchMetric().getEmptyQueries().getCount()); + assertEquals(0L, searchClient.getSearchMetric().getFailedQueries().getCount()); + assertEquals(0L, searchClient.getSearchMetric().getTimeoutQueries().getCount()); + } } } } diff --git a/elx-node/src/test/java/org/xbib/elx/node/test/SmokeTest.java b/elx-node/src/test/java/org/xbib/elx/node/test/SmokeTest.java index e6e5d7d..9f4f745 100644 --- a/elx-node/src/test/java/org/xbib/elx/node/test/SmokeTest.java +++ b/elx-node/src/test/java/org/xbib/elx/node/test/SmokeTest.java @@ -17,9 +17,9 @@ import org.xbib.elx.node.NodeBulkClientProvider; import java.util.concurrent.TimeUnit; -import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; @ExtendWith(TestExtension.class) class SmokeTest { @@ -51,22 +51,24 @@ class SmokeTest { bulkClient.newIndex(indexDefinition); bulkClient.index(indexDefinition, "1", true, "{ \"name\" : \"Hello World\"}"); // single doc ingest bulkClient.flush(); - bulkClient.waitForResponses(30, TimeUnit.SECONDS); + assertTrue(bulkClient.waitForResponses(30, TimeUnit.SECONDS)); adminClient.checkMapping(indexDefinition); bulkClient.update(indexDefinition, "1", "{ \"name\" : \"Another name\"}"); bulkClient.delete(indexDefinition, "1"); - bulkClient.waitForResponses(30, TimeUnit.SECONDS); + assertTrue(bulkClient.waitForResponses(30, TimeUnit.SECONDS)); bulkClient.index(indexDefinition, "1", true, "{ \"name\" : \"Hello World\"}"); bulkClient.delete(indexDefinition, "1"); - bulkClient.waitForResponses(30, TimeUnit.SECONDS); + assertTrue(bulkClient.waitForResponses(30, TimeUnit.SECONDS)); adminClient.deleteIndex(indexDefinition); bulkClient.newIndex(indexDefinition); bulkClient.index(indexDefinition, "1", true, "{ \"name\" : \"Hello World\"}"); - bulkClient.waitForResponses(30, TimeUnit.SECONDS); + assertTrue(bulkClient.waitForResponses(30, TimeUnit.SECONDS)); adminClient.updateReplicaLevel(indexDefinition); assertEquals(1, adminClient.getReplicaLevel(indexDefinition)); - assertEquals(0, bulkClient.getBulkProcessor().getBulkMetric().getFailed().getCount()); - assertEquals(6, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount()); + if (bulkClient.getBulkProcessor().isBulkMetricEnabled()) { + assertEquals(0, bulkClient.getBulkProcessor().getBulkMetric().getFailed().getCount()); + assertEquals(6, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount()); + } if (bulkClient.getBulkProcessor().getLastBulkError() != null) { logger.error("error", bulkClient.getBulkProcessor().getLastBulkError()); } diff --git a/elx-node/src/test/java/org/xbib/elx/node/test/TestExtension.java b/elx-node/src/test/java/org/xbib/elx/node/test/TestExtension.java index a5d9322..63108c6 100644 --- a/elx-node/src/test/java/org/xbib/elx/node/test/TestExtension.java +++ b/elx-node/src/test/java/org/xbib/elx/node/test/TestExtension.java @@ -15,6 +15,7 @@ import org.junit.jupiter.api.extension.ExtensionContext; import org.junit.jupiter.api.extension.ParameterContext; import org.junit.jupiter.api.extension.ParameterResolutionException; import org.junit.jupiter.api.extension.ParameterResolver; +import org.xbib.elx.common.Parameters; import java.io.IOException; import java.nio.file.FileVisitResult; @@ -31,6 +32,10 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft private static final Logger logger = LogManager.getLogger("test"); + private static final Random random = new Random(); + + private static final char[] numbersAndLetters = ("0123456789abcdefghijklmnopqrstuvwxyz").toCharArray(); + private static final String key = "es-instance-"; private static final AtomicInteger count = new AtomicInteger(0); @@ -52,7 +57,7 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft } @Override - public void beforeEach(ExtensionContext extensionContext) throws Exception { + public void beforeEach(ExtensionContext extensionContext) { Helper helper = extensionContext.getParent().isPresent() ? extensionContext.getParent().get().getStore(ns).getOrComputeIfAbsent(key + count.get(), key -> create(), Helper.class) : null; Objects.requireNonNull(helper); @@ -135,8 +140,10 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft .put("node.client", true) .put("node.master", false) .put("node.data", false) - .put("cluster.target_health", "YELLOW") - .put("cluster.target_health_timeout", "1m") + .put(Parameters.CLUSTER_TARGET_HEALTH.getName(), "YELLOW") + .put(Parameters.CLUSTER_TARGET_HEALTH_TIMEOUT.getName(), "1m") + .put(Parameters.BULK_METRIC_ENABLED.getName(), Boolean.TRUE) + .put(Parameters.SEARCH_METRIC_ENABLED.getName(), Boolean.TRUE) .build(); } @@ -183,14 +190,10 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft void closeNodes() { if (node != null) { - logger.info("closing all nodes"); + logger.info("closing node"); node.client().close(); node.close(); } } - - private static final Random random = new Random(); - - private static final char[] numbersAndLetters = ("0123456789abcdefghijklmnopqrstuvwxyz").toCharArray(); } } diff --git a/elx-transport/src/main/java/org/xbib/elx/transport/TransportClientHelper.java b/elx-transport/src/main/java/org/xbib/elx/transport/TransportClientHelper.java index 9ce152d..0cad6bd 100644 --- a/elx-transport/src/main/java/org/xbib/elx/transport/TransportClientHelper.java +++ b/elx-transport/src/main/java/org/xbib/elx/transport/TransportClientHelper.java @@ -16,6 +16,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.InetSocketTransportAddress; import org.elasticsearch.common.transport.TransportAddress; import org.jboss.netty.channel.DefaultChannelFuture; +import org.xbib.elx.common.Parameters; import java.io.IOException; import java.net.InetAddress; import java.util.ArrayList; @@ -23,21 +24,22 @@ import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.function.Predicate; public class TransportClientHelper { private static final Logger logger = LogManager.getLogger(TransportClientHelper.class.getName()); - private static final Map clientMap = new HashMap<>(); + private static final Map transportClientMap = new HashMap<>(); public ElasticsearchClient createClient(Settings settings) { String clusterName = settings.get("cluster.name", "elasticsearch"); - return clientMap.computeIfAbsent(clusterName, key -> innerCreateClient(settings)); + return transportClientMap.computeIfAbsent(clusterName, key -> innerCreateClient(settings)); } public void closeClient(Settings settings) { String clusterName = settings.get("cluster.name", "elasticsearch"); - ElasticsearchClient client = clientMap.remove(clusterName); + ElasticsearchClient client = transportClientMap.remove(clusterName); if (client != null) { if (client instanceof Client) { ((Client) client).close(); @@ -55,31 +57,29 @@ public class TransportClientHelper { } private Collection findAddresses(Settings settings) { - final int defaultPort = settings.getAsInt("port", 9300); + final int defaultPort = settings.getAsInt(Parameters.PORT.getName(), 9300); Collection addresses = new ArrayList<>(); - for (String hostname : settings.getAsArray("host")) { + for (String hostname : settings.getAsArray(Parameters.HOST.getName())) { String[] splitHost = hostname.split(":", 2); - if (splitHost.length == 2) { - try { - String host = splitHost[0]; - InetAddress inetAddress = NetworkUtils.resolveInetAddress(host, null); - int port = Integer.parseInt(splitHost[1]); - TransportAddress address = new InetSocketTransportAddress(inetAddress, port); + try { + if (splitHost.length == 2) { + InetAddress inetAddress = + NetworkUtils.resolveInetAddress(splitHost[0], null); + TransportAddress address = + new InetSocketTransportAddress(inetAddress, Integer.parseInt(splitHost[1])); addresses.add(address); - } catch (IOException e) { - logger.warn(e.getMessage(), e); - } - } else if (splitHost.length == 1) { - try { - String host = splitHost[0]; - InetAddress inetAddress = NetworkUtils.resolveInetAddress(host, null); - TransportAddress address = new InetSocketTransportAddress(inetAddress, defaultPort); + } else if (splitHost.length == 1) { + InetAddress inetAddress = + NetworkUtils.resolveInetAddress(splitHost[0], null); + TransportAddress address = + new InetSocketTransportAddress(inetAddress, defaultPort); addresses.add(address); - } catch (IOException e) { - logger.warn(e.getMessage(), e); + } else { + throw new IllegalArgumentException("invalid hostname specification: " + hostname); } - } else { - throw new IllegalArgumentException("invalid hostname specification: " + hostname); + } + catch (IOException e) { + logger.warn(e.getMessage(), e); } } return addresses; @@ -114,24 +114,39 @@ public class TransportClientHelper { + " " + System.getProperty("java.vm.vendor") + " " + System.getProperty("java.vm.version") + " Elasticsearch " + Version.CURRENT.toString(); - logger.info("creating transport client on {} with custom settings {}", - systemIdentifier, settings.getAsMap()); + Settings transportClientSettings = getTransportClientSettings(settings); + logger.info("creating transport client on {} with settings {}", + systemIdentifier, transportClientSettings.getAsMap()); // we need to disable dead lock check because we may have mixed node/transport clients DefaultChannelFuture.setUseDeadLockChecker(false); return TransportClient.builder() - .settings(getTransportClientSettings(settings)) + .settings(transportClientSettings) .build(); } private Settings getTransportClientSettings(Settings settings) { return Settings.builder() - .put("cluster.name", settings.get("cluster.name", "elasticsearch")) + .put(filter(settings, key -> !isPrivateSettings(key))) .put("path.home", settings.get("path.home", ".")) - .put("processors", settings.getAsInt("processors", Runtime.getRuntime().availableProcessors())) // for thread pool size / worker count - .put("client.transport.sniff", settings.getAsBoolean("client.transport.sniff", false)) // always disable sniff - .put("client.transport.nodes_sampler_interval", settings.get("client.transport.nodes_sampler_interval", "10000s")) // ridculous long ping, default is 5 seconds - .put("client.transport.ping_timeout", settings.get("client.transport.ping_timeout", "10000s")) // ridiculous ping for unresponsive nodes, defauult is 5 seconds - .put("client.transport.ignore_cluster_name", settings.getAsBoolean("client.transport.ignore_cluster_name", true)) // connect to any cluster .build(); } + + private static Settings filter(Settings settings, Predicate predicate) { + Settings.Builder builder = Settings.settingsBuilder(); + for (Map.Entry me : settings.getAsMap().entrySet()) { + if (predicate.test(me.getKey())) { + builder.put(me.getKey(), me.getValue()); + } + } + return builder.build(); + } + + private static boolean isPrivateSettings(String key) { + for (Parameters p : Parameters.values()) { + if (key.equals(p.getName())) { + return true; + } + } + return false; + } } diff --git a/elx-transport/src/test/java/org/xbib/elx/transport/test/BulkClientTest.java b/elx-transport/src/test/java/org/xbib/elx/transport/test/BulkClientTest.java index c122e0d..624cc70 100644 --- a/elx-transport/src/test/java/org/xbib/elx/transport/test/BulkClientTest.java +++ b/elx-transport/src/test/java/org/xbib/elx/transport/test/BulkClientTest.java @@ -52,7 +52,9 @@ class BulkClientTest { bulkClient.newIndex(indexDefinition); bulkClient.index(indexDefinition, "1", true, "{ \"name\" : \"Hello World\"}"); // single doc ingest bulkClient.waitForResponses(30L, TimeUnit.SECONDS); - assertEquals(1, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount()); + if (bulkClient.getBulkProcessor().isBulkMetricEnabled()) { + assertEquals(1, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount()); + } if (bulkClient.getBulkProcessor().getLastBulkError() != null) { logger.error("error", bulkClient.getBulkProcessor().getLastBulkError()); } @@ -69,12 +71,15 @@ class BulkClientTest { .build()) { IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); bulkClient.newIndex(indexDefinition); + bulkClient.startBulk(indexDefinition); for (int i = 0; i < ACTIONS; i++) { bulkClient.index(indexDefinition, null, false, "{ \"name\" : \"" + helper.randomString(32) + "\"}"); } - bulkClient.waitForResponses(30L, TimeUnit.SECONDS); - assertEquals(numactions, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount()); + bulkClient.stopBulk(indexDefinition); + if (bulkClient.getBulkProcessor().isBulkMetricEnabled()) { + assertEquals(numactions, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount()); + } if (bulkClient.getBulkProcessor().getLastBulkError() != null) { logger.error("error", bulkClient.getBulkProcessor().getLastBulkError()); } @@ -118,7 +123,9 @@ class BulkClientTest { logger.error("latch timeout!"); } bulkClient.stopBulk(indexDefinition); - assertEquals(maxthreads * actions, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount()); + if (bulkClient.getBulkProcessor().isBulkMetricEnabled()) { + assertEquals(maxthreads * actions, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount()); + } bulkClient.refreshIndex(indexDefinition); assertEquals(maxthreads * actions, bulkClient.getSearchableDocs(indexDefinition)); if (bulkClient.getBulkProcessor().getLastBulkError() != null) { diff --git a/elx-transport/src/test/java/org/xbib/elx/transport/test/DuplicateIDTest.java b/elx-transport/src/test/java/org/xbib/elx/transport/test/DuplicateIDTest.java index 4643a9a..672134b 100644 --- a/elx-transport/src/test/java/org/xbib/elx/transport/test/DuplicateIDTest.java +++ b/elx-transport/src/test/java/org/xbib/elx/transport/test/DuplicateIDTest.java @@ -19,7 +19,7 @@ class DuplicateIDTest { private static final Logger logger = LogManager.getLogger(DuplicateIDTest.class.getName()); - private static final Long ACTIONS = 100L; + private static final Long ACTIONS = 10000L; private final TestExtension.Helper helper; @@ -43,7 +43,9 @@ class DuplicateIDTest { bulkClient.waitForResponses(30L, TimeUnit.SECONDS); bulkClient.refreshIndex(indexDefinition); assertTrue(bulkClient.getSearchableDocs(indexDefinition) < ACTIONS); - assertEquals(numactions, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount()); + if (bulkClient.getBulkProcessor().isBulkMetricEnabled()) { + assertEquals(numactions, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount()); + } if (bulkClient.getBulkProcessor().getLastBulkError() != null) { logger.error("error", bulkClient.getBulkProcessor().getLastBulkError()); } diff --git a/elx-transport/src/test/java/org/xbib/elx/transport/test/IndexPruneTest.java b/elx-transport/src/test/java/org/xbib/elx/transport/test/IndexPruneTest.java index 4423f8c..05c5da9 100644 --- a/elx-transport/src/test/java/org/xbib/elx/transport/test/IndexPruneTest.java +++ b/elx-transport/src/test/java/org/xbib/elx/transport/test/IndexPruneTest.java @@ -41,9 +41,9 @@ class IndexPruneTest { .put(helper.getClientSettings()) .build(); TransportBulkClient bulkClient = ClientBuilder.builder() - .setBulkClientProvider(TransportBulkClientProvider.class) - .put(helper.getClientSettings()) - .build()) { + .setBulkClientProvider(TransportBulkClientProvider.class) + .put(helper.getClientSettings()) + .build()) { IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); indexDefinition.setIndex("test_prune"); indexDefinition.setFullIndexName("test_prune1"); @@ -62,10 +62,10 @@ class IndexPruneTest { bulkClient.newIndex(indexDefinition); indexDefinition.setShift(true); adminClient.shiftIndex(indexDefinition, Collections.emptyList(), null); - indexDefinition.setEnabled(true); indexDefinition.setDelta(2); indexDefinition.setMinToKeep(2); indexDefinition.setPrune(true); + indexDefinition.setEnabled(true); IndexPruneResult indexPruneResult = adminClient.pruneIndex(indexDefinition); logger.info("prune result = " + indexPruneResult); assertTrue(indexPruneResult.getDeletedIndices().contains("test_prune1")); diff --git a/elx-transport/src/test/java/org/xbib/elx/transport/test/IndexShiftTest.java b/elx-transport/src/test/java/org/xbib/elx/transport/test/IndexShiftTest.java index e4b39e9..cb9723e 100644 --- a/elx-transport/src/test/java/org/xbib/elx/transport/test/IndexShiftTest.java +++ b/elx-transport/src/test/java/org/xbib/elx/transport/test/IndexShiftTest.java @@ -43,10 +43,10 @@ class IndexShiftTest { .put(helper.getClientSettings()) .build(); TransportBulkClient bulkClient = ClientBuilder.builder() - .setBulkClientProvider(TransportBulkClientProvider.class) - .put(helper.getClientSettings()) - .build()) { - IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); + .setBulkClientProvider(TransportBulkClientProvider.class) + .put(helper.getClientSettings()) + .build()) { + IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); indexDefinition.setFullIndexName("test_shift"); bulkClient.newIndex(indexDefinition); for (int i = 0; i < 1; i++) { diff --git a/elx-transport/src/test/java/org/xbib/elx/transport/test/SearchTest.java b/elx-transport/src/test/java/org/xbib/elx/transport/test/SearchTest.java index ea60953..7d7a926 100644 --- a/elx-transport/src/test/java/org/xbib/elx/transport/test/SearchTest.java +++ b/elx-transport/src/test/java/org/xbib/elx/transport/test/SearchTest.java @@ -54,7 +54,9 @@ class SearchTest { assertTrue(bulkClient.waitForResponses(30L, TimeUnit.SECONDS)); bulkClient.refreshIndex(indexDefinition); assertEquals(numactions, bulkClient.getSearchableDocs(indexDefinition)); - assertEquals(numactions, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount()); + if (bulkClient.getBulkProcessor().isBulkMetricEnabled()) { + assertEquals(numactions, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount()); + } if (bulkClient.getBulkProcessor().getLastBulkError() != null) { logger.error("error", bulkClient.getBulkProcessor().getLastBulkError()); } @@ -71,9 +73,11 @@ class SearchTest { TimeValue.timeValueMillis(100), 579); long count = stream.count(); assertEquals(numactions, count); - assertEquals(0L, searchClient.getSearchMetric().getFailedQueries().getCount()); - assertEquals(0L, searchClient.getSearchMetric().getTimeoutQueries().getCount()); - assertEquals(1L, searchClient.getSearchMetric().getEmptyQueries().getCount()); + if (searchClient.isSearchMetricEnabled()) { + assertEquals(0L, searchClient.getSearchMetric().getFailedQueries().getCount()); + assertEquals(0L, searchClient.getSearchMetric().getTimeoutQueries().getCount()); + assertEquals(1L, searchClient.getSearchMetric().getEmptyQueries().getCount()); + } // test stream docs stream = searchClient.search(qb -> qb .setIndices(indexDefinition.getFullIndexName()) @@ -82,9 +86,11 @@ class SearchTest { final AtomicInteger hitcount = new AtomicInteger(); stream.forEach(hit -> hitcount.incrementAndGet()); assertEquals(numactions, hitcount.get()); - assertEquals(0L, searchClient.getSearchMetric().getFailedQueries().getCount()); - assertEquals(0L, searchClient.getSearchMetric().getTimeoutQueries().getCount()); - assertEquals(2L, searchClient.getSearchMetric().getEmptyQueries().getCount()); + if (searchClient.isSearchMetricEnabled()) { + assertEquals(0L, searchClient.getSearchMetric().getFailedQueries().getCount()); + assertEquals(0L, searchClient.getSearchMetric().getTimeoutQueries().getCount()); + assertEquals(2L, searchClient.getSearchMetric().getEmptyQueries().getCount()); + } // test stream doc ids Stream ids = searchClient.getIds(qb -> qb .setIndices(indexDefinition.getFullIndexName()) @@ -92,11 +98,13 @@ class SearchTest { final AtomicInteger idcount = new AtomicInteger(); ids.forEach(id -> idcount.incrementAndGet()); assertEquals(numactions, idcount.get()); - assertEquals(1542L, searchClient.getSearchMetric().getQueries().getCount()); - assertEquals(1539L, searchClient.getSearchMetric().getSucceededQueries().getCount()); - assertEquals(3L, searchClient.getSearchMetric().getEmptyQueries().getCount()); - assertEquals(0L, searchClient.getSearchMetric().getFailedQueries().getCount()); - assertEquals(0L, searchClient.getSearchMetric().getTimeoutQueries().getCount()); + if (searchClient.isSearchMetricEnabled()) { + assertEquals(1542L, searchClient.getSearchMetric().getQueries().getCount()); + assertEquals(1539L, searchClient.getSearchMetric().getSucceededQueries().getCount()); + assertEquals(3L, searchClient.getSearchMetric().getEmptyQueries().getCount()); + assertEquals(0L, searchClient.getSearchMetric().getFailedQueries().getCount()); + assertEquals(0L, searchClient.getSearchMetric().getTimeoutQueries().getCount()); + } } } } diff --git a/elx-transport/src/test/java/org/xbib/elx/transport/test/SmokeTest.java b/elx-transport/src/test/java/org/xbib/elx/transport/test/SmokeTest.java index fb7a9f5..8c7f14c 100644 --- a/elx-transport/src/test/java/org/xbib/elx/transport/test/SmokeTest.java +++ b/elx-transport/src/test/java/org/xbib/elx/transport/test/SmokeTest.java @@ -49,22 +49,24 @@ class SmokeTest { assertEquals(helper.getClusterName(), adminClient.getClusterName()); bulkClient.newIndex(indexDefinition); bulkClient.index(indexDefinition, "1", true, "{ \"name\" : \"Hello World\"}"); // single doc ingest - bulkClient.waitForResponses(30, TimeUnit.SECONDS); + assertTrue(bulkClient.waitForResponses(30, TimeUnit.SECONDS)); adminClient.checkMapping(indexDefinition); bulkClient.update(indexDefinition, "1", "{ \"name\" : \"Another name\"}"); bulkClient.delete(indexDefinition, "1"); - bulkClient.waitForResponses(30, TimeUnit.SECONDS); + assertTrue(bulkClient.waitForResponses(30, TimeUnit.SECONDS)); bulkClient.index(indexDefinition, "1", true, "{ \"name\" : \"Hello World\"}"); bulkClient.delete(indexDefinition, "1"); - bulkClient.waitForResponses(30, TimeUnit.SECONDS); + assertTrue(bulkClient.waitForResponses(30, TimeUnit.SECONDS)); adminClient.deleteIndex(indexDefinition); bulkClient.newIndex(indexDefinition); bulkClient.index(indexDefinition, "1", true, "{ \"name\" : \"Hello World\"}"); - bulkClient.waitForResponses(30, TimeUnit.SECONDS); + assertTrue(bulkClient.waitForResponses(30, TimeUnit.SECONDS)); adminClient.updateReplicaLevel(indexDefinition); assertEquals(1, adminClient.getReplicaLevel(indexDefinition)); - assertEquals(0, bulkClient.getBulkProcessor().getBulkMetric().getFailed().getCount()); - assertEquals(6, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount()); + if (bulkClient.getBulkProcessor().isBulkMetricEnabled()) { + assertEquals(0, bulkClient.getBulkProcessor().getBulkMetric().getFailed().getCount()); + assertEquals(6, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount()); + } if (bulkClient.getBulkProcessor().getLastBulkError() != null) { logger.error("error", bulkClient.getBulkProcessor().getLastBulkError()); } diff --git a/elx-transport/src/test/java/org/xbib/elx/transport/test/TestExtension.java b/elx-transport/src/test/java/org/xbib/elx/transport/test/TestExtension.java index 5791ffd..21c73d7 100644 --- a/elx-transport/src/test/java/org/xbib/elx/transport/test/TestExtension.java +++ b/elx-transport/src/test/java/org/xbib/elx/transport/test/TestExtension.java @@ -14,6 +14,7 @@ import org.junit.jupiter.api.extension.ExtensionContext; import org.junit.jupiter.api.extension.ParameterContext; import org.junit.jupiter.api.extension.ParameterResolutionException; import org.junit.jupiter.api.extension.ParameterResolver; +import org.xbib.elx.common.Parameters; import java.io.IOException; import java.nio.file.FileVisitResult; @@ -31,6 +32,10 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft private static final Logger logger = LogManager.getLogger("test"); + private static final char[] numbersAndLetters = ("0123456789abcdefghijklmnopqrstuvwxyz").toCharArray(); + + private static final Random random = new SecureRandom(); + private static final String key = "es-instance-"; private static final AtomicInteger count = new AtomicInteger(0); @@ -52,7 +57,7 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft } @Override - public void beforeEach(ExtensionContext extensionContext) throws Exception { + public void beforeEach(ExtensionContext extensionContext) { Helper helper = extensionContext.getParent().isPresent() ? extensionContext.getParent().get().getStore(ns).getOrComputeIfAbsent(key + count.get(), key -> create(), Helper.class) : null; Objects.requireNonNull(helper); @@ -129,10 +134,14 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft return Settings.builder() .put("cluster.name", cluster) .put("path.home", getHome()) - .put("host", host) - .put("port", port) - .put("cluster.target_health", "YELLOW") - .put("cluster.target_health_timeout", "1m") + .put("client.transport.nodes_sampler_interval", "1h") + .put("client.transport.ping_timeout", "1h") + .put(Parameters.HOST.getName(), host) + .put(Parameters.PORT.getName(), port) + .put(Parameters.CLUSTER_TARGET_HEALTH.getName(), "YELLOW") + .put(Parameters.CLUSTER_TARGET_HEALTH_TIMEOUT.getName(), "1m") + .put(Parameters.BULK_METRIC_ENABLED.getName(), Boolean.TRUE) + .put(Parameters.SEARCH_METRIC_ENABLED.getName(), Boolean.TRUE) .build(); } @@ -179,9 +188,5 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft node.close(); } } - - private static final char[] numbersAndLetters = ("0123456789abcdefghijklmnopqrstuvwxyz").toCharArray(); - - private static final Random random = new SecureRandom(); } } diff --git a/gradle.properties b/gradle.properties index 178e078..61427f6 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,6 +1,6 @@ group = org.xbib name = elx -version = 2.2.1.46 +version = 2.2.1.47 gradle.wrapper.version = 6.6.1 xbib-metrics.version = 2.1.0