making metrics optional
This commit is contained in:
parent b7d3184200
commit 7595fbaeee
29 changed files with 345 additions and 198 deletions
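This commit makes the bulk and search metrics opt-in: DefaultBulkMetric and DefaultSearchMetric are only created when the new settings bulk.metric.enabled and search.metric.enabled (both false by default) are switched on, and callers are expected to guard metric access with isBulkMetricEnabled() / isSearchMetricEnabled(). A minimal sketch of the new caller-side pattern, modeled on the updated node tests below (the index setup and output are illustrative, not part of the commit):

// Sketch only: mirrors the test changes in this commit, not a verbatim excerpt.
static void indexWithMetrics() throws IOException {
    try (NodeBulkClient bulkClient = ClientBuilder.builder()
            .setBulkClientProvider(NodeBulkClientProvider.class)
            .put(Parameters.BULK_METRIC_ENABLED.getName(), Boolean.TRUE) // metrics are now off by default
            .build()) {
        // getBulkMetric() may be null when the metric is disabled, so guard every access
        if (bulkClient.getBulkProcessor().isBulkMetricEnabled()) {
            long succeeded = bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount();
            System.out.println("succeeded bulk actions = " + succeeded);
        }
    }
}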
@@ -3,7 +3,7 @@ package org.xbib.elx.api;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.common.settings.Settings;
import java.io.Closeable;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public interface BasicClient extends Closeable {

@@ -46,5 +46,5 @@ public interface BasicClient extends Closeable {
boolean isIndexExists(IndexDefinition indexDefinition);
ScheduledThreadPoolExecutor getScheduler();
ScheduledExecutorService getScheduler();
}

@@ -4,6 +4,7 @@ import org.elasticsearch.action.DocWriteRequest;
import java.io.Closeable;
import java.io.Flushable;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public interface BulkProcessor extends Closeable, Flushable {

@@ -14,6 +15,10 @@ public interface BulkProcessor extends Closeable, Flushable {
boolean waitForBulkResponses(long timeout, TimeUnit unit);
ScheduledExecutorService getScheduler();
boolean isBulkMetricEnabled();
BulkMetric getBulkMetric();
Throwable getLastBulkError();

@@ -14,6 +14,8 @@ import java.util.stream.Stream;
public interface SearchClient extends BasicClient {
boolean isSearchMetricEnabled();
SearchMetric getSearchMetric();
Optional<GetResponse> get(Consumer<GetRequestBuilder> getRequestBuilder);

@@ -114,12 +114,12 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements
if (isIndexDefinitionDisabled(indexDefinition)) {
return this;
}
if (indexDefinition.getReplicaCount() < 1) {
logger.warn("invalid replica level");
if (indexDefinition.getReplicaCount() < 0) {
logger.warn("invalid replica level defined for index "
+ indexDefinition.getIndex() + ": " + indexDefinition.getReplicaCount());
return this;
}
logger.info("update replica level for " +
indexDefinition + " to " + indexDefinition.getReplicaCount());
logger.info("update replica level for " + indexDefinition + " to " + indexDefinition.getReplicaCount());
updateIndexSetting(indexDefinition.getFullIndexName(), "number_of_replicas",
indexDefinition.getReplicaCount(), 30L, TimeUnit.SECONDS);
waitForHealthyCluster();

@@ -24,13 +24,12 @@ import org.elasticsearch.client.transport.NoNodeAvailableException;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.index.query.QueryBuilders;
import org.xbib.elx.api.BasicClient;
import org.xbib.elx.api.IndexDefinition;
import java.io.IOException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

@@ -42,21 +41,19 @@ public abstract class AbstractBasicClient implements BasicClient {
protected Settings settings;
private final ScheduledThreadPoolExecutor scheduler;
private final ScheduledExecutorService executorService;
private final AtomicBoolean closed;
public AbstractBasicClient() {
this.scheduler = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(2,
EsExecutors.daemonThreadFactory("elx-bulk-processor"));
this.scheduler.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
this.scheduler.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
this.executorService = Executors.newScheduledThreadPool(2,
new DaemonThreadFactory("elx"));
closed = new AtomicBoolean(false);
}
@Override
public ScheduledThreadPoolExecutor getScheduler() {
return scheduler;
public ScheduledExecutorService getScheduler() {
return executorService;
}
@Override

@@ -78,6 +75,16 @@ public abstract class AbstractBasicClient implements BasicClient {
}
}
@Override
public void close() throws IOException {
if (closed.compareAndSet(false, true)) {
if (executorService != null) {
executorService.shutdownNow();
}
closeClient(settings);
}
}
@Override
public String getClusterName() {
ensureClientIsPresent();

@@ -184,16 +191,6 @@ public abstract class AbstractBasicClient implements BasicClient {
return indicesExistsResponse.isExists();
}
@Override
public void close() throws IOException {
if (closed.compareAndSet(false, true)) {
if (scheduler != null) {
scheduler.shutdown();
}
closeClient(settings);
}
}
protected abstract ElasticsearchClient createClient(Settings settings);
protected abstract void closeClient(Settings settings);

@@ -158,6 +158,7 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements
String indexName = indexDefinition.getFullIndexName();
int interval = indexDefinition.getStopBulkRefreshSeconds();
try {
logger.info("flushing bulk");
bulkProcessor.flush();
} catch (IOException e) {
// can never happen

@@ -34,26 +34,24 @@ import java.util.stream.StreamSupport;
public abstract class AbstractSearchClient extends AbstractBasicClient implements SearchClient {
private SearchMetric searchMetric;
private final AtomicBoolean closed;
private SearchMetric searchMetric;
public AbstractSearchClient() {
super();
this.closed = new AtomicBoolean(true);
}
@Override
public SearchMetric getSearchMetric() {
return searchMetric;
}
@Override
public void init(Settings settings) {
if (closed.compareAndSet(true, false)) {
super.init(settings);
this.searchMetric = new DefaultSearchMetric(getScheduler(), settings);
searchMetric.init(settings);
if (settings.getAsBoolean(Parameters.SEARCH_METRIC_ENABLED.getName(),
Parameters.SEARCH_METRIC_ENABLED.getBoolean())) {
this.searchMetric = new DefaultSearchMetric(this, settings);
searchMetric.init(settings);
}
}
}

@@ -67,20 +65,38 @@ public abstract class AbstractSearchClient extends AbstractBasicClient implement
}
}
@Override
public boolean isSearchMetricEnabled() {
return searchMetric != null;
}
@Override
public SearchMetric getSearchMetric() {
return searchMetric;
}
@Override
public Optional<GetResponse> get(Consumer<GetRequestBuilder> getRequestBuilderConsumer) {
GetRequestBuilder getRequestBuilder = new GetRequestBuilder(client, GetAction.INSTANCE);
getRequestBuilderConsumer.accept(getRequestBuilder);
ActionFuture<GetResponse> actionFuture = getRequestBuilder.execute();
searchMetric.getCurrentQueries().inc();
if (searchMetric != null) {
searchMetric.getCurrentQueries().inc();
}
GetResponse getResponse = actionFuture.actionGet();
searchMetric.getCurrentQueries().dec();
searchMetric.getQueries().inc();
searchMetric.markTotalQueries(1);
if (searchMetric != null) {
searchMetric.getCurrentQueries().dec();
searchMetric.getQueries().inc();
searchMetric.markTotalQueries(1);
}
if (getResponse.isExists()) {
searchMetric.getSucceededQueries().inc();
if (searchMetric != null) {
searchMetric.getSucceededQueries().inc();
}
} else {
searchMetric.getEmptyQueries().inc();
if (searchMetric != null) {
searchMetric.getEmptyQueries().inc();
}
}
return getResponse.isExists() ? Optional.of(getResponse) : Optional.empty();
}

@@ -90,16 +106,24 @@ public abstract class AbstractSearchClient extends AbstractBasicClient implement
MultiGetRequestBuilder multiGetRequestBuilder = new MultiGetRequestBuilder(client, MultiGetAction.INSTANCE);
multiGetRequestBuilderConsumer.accept(multiGetRequestBuilder);
ActionFuture<MultiGetResponse> actionFuture = multiGetRequestBuilder.execute();
searchMetric.getCurrentQueries().inc();
if (searchMetric != null) {
searchMetric.getCurrentQueries().inc();
}
MultiGetResponse multiGetItemResponse = actionFuture.actionGet();
searchMetric.getCurrentQueries().dec();
searchMetric.getQueries().inc();
searchMetric.markTotalQueries(1);
if (searchMetric != null) {
searchMetric.getCurrentQueries().dec();
searchMetric.getQueries().inc();
searchMetric.markTotalQueries(1);
}
boolean isempty = multiGetItemResponse.getResponses().length == 0;
if (isempty) {
searchMetric.getEmptyQueries().inc();
if (searchMetric != null) {
searchMetric.getEmptyQueries().inc();
}
} else {
searchMetric.getSucceededQueries().inc();
if (searchMetric != null) {
searchMetric.getSucceededQueries().inc();
}
}
return isempty ? Optional.empty() : Optional.of(multiGetItemResponse);
}

@@ -109,24 +133,34 @@ public abstract class AbstractSearchClient extends AbstractBasicClient implement
SearchRequestBuilder searchRequestBuilder = new SearchRequestBuilder(client, SearchAction.INSTANCE);
queryBuilder.accept(searchRequestBuilder);
ActionFuture<SearchResponse> actionFuture = searchRequestBuilder.execute();
searchMetric.getCurrentQueries().inc();
if (searchMetric != null) {
searchMetric.getCurrentQueries().inc();
}
SearchResponse searchResponse = actionFuture.actionGet();
searchMetric.getCurrentQueries().dec();
searchMetric.getQueries().inc();
searchMetric.markTotalQueries(1);
if (searchMetric != null) {
searchMetric.getCurrentQueries().dec();
searchMetric.getQueries().inc();
searchMetric.markTotalQueries(1);
}
if (searchResponse.getFailedShards() > 0) {
StringBuilder sb = new StringBuilder("Search failed:");
for (ShardSearchFailure failure : searchResponse.getShardFailures()) {
sb.append("\n").append(failure.reason());
}
searchMetric.getEmptyQueries().inc();
if (searchMetric != null) {
searchMetric.getEmptyQueries().inc();
}
throw new ElasticsearchException(sb.toString());
}
boolean isempty = searchResponse.getHits().getHits().length == 0;
if (isempty) {
searchMetric.getEmptyQueries().inc();
if (searchMetric != null) {
searchMetric.getEmptyQueries().inc();
}
} else {
searchMetric.getSucceededQueries().inc();
if (searchMetric != null) {
searchMetric.getSucceededQueries().inc();
}
}
return isempty ? Optional.empty() : Optional.of(searchResponse);
}

@@ -138,19 +172,31 @@ public abstract class AbstractSearchClient extends AbstractBasicClient implement
queryBuilder.accept(searchRequestBuilder);
searchRequestBuilder.setScroll(scrollTime).setSize(scrollSize);
ActionFuture<SearchResponse> actionFuture = searchRequestBuilder.execute();
searchMetric.getCurrentQueries().inc();
if (searchMetric != null) {
searchMetric.getCurrentQueries().inc();
}
SearchResponse initialSearchResponse = actionFuture.actionGet();
searchMetric.getCurrentQueries().dec();
searchMetric.getQueries().inc();
searchMetric.markTotalQueries(1);
if (searchMetric != null) {
searchMetric.getCurrentQueries().dec();
searchMetric.getQueries().inc();
searchMetric.markTotalQueries(1);
}
if (initialSearchResponse.getFailedShards() > 0) {
searchMetric.getFailedQueries().inc();
if (searchMetric != null) {
searchMetric.getFailedQueries().inc();
}
} else if (initialSearchResponse.isTimedOut()) {
searchMetric.getTimeoutQueries().inc();
if (searchMetric != null) {
searchMetric.getTimeoutQueries().inc();
}
} else if (initialSearchResponse.getHits().getTotalHits().value == 0) {
searchMetric.getEmptyQueries().inc();
if (searchMetric != null) {
searchMetric.getEmptyQueries().inc();
}
} else {
searchMetric.getSucceededQueries().inc();
if (searchMetric != null) {
searchMetric.getSucceededQueries().inc();
}
}
Stream<SearchResponse> responseStream = Stream.iterate(initialSearchResponse,
searchResponse -> {

@@ -159,19 +205,31 @@ public abstract class AbstractSearchClient extends AbstractBasicClient implement
.setScrollId(searchResponse.getScrollId())
.setScroll(scrollTime);
ActionFuture<SearchResponse> actionFuture1 = searchScrollRequestBuilder.execute();
searchMetric.getCurrentQueries().inc();
if (searchMetric != null) {
searchMetric.getCurrentQueries().inc();
}
SearchResponse searchResponse1 = actionFuture1.actionGet();
searchMetric.getCurrentQueries().dec();
searchMetric.getQueries().inc();
searchMetric.markTotalQueries(1);
if (searchMetric != null) {
searchMetric.getCurrentQueries().dec();
searchMetric.getQueries().inc();
searchMetric.markTotalQueries(1);
}
if (searchResponse1.getFailedShards() > 0) {
searchMetric.getFailedQueries().inc();
if (searchMetric != null) {
searchMetric.getFailedQueries().inc();
}
} else if (searchResponse1.isTimedOut()) {
searchMetric.getTimeoutQueries().inc();
if (searchMetric != null) {
searchMetric.getTimeoutQueries().inc();
}
} else if (searchResponse1.getHits().getHits().length == 0) {
searchMetric.getEmptyQueries().inc();
if (searchMetric != null) {
searchMetric.getEmptyQueries().inc();
}
} else {
searchMetric.getSucceededQueries().inc();
if (searchMetric != null) {
searchMetric.getSucceededQueries().inc();
}
}
return searchResponse1;
});

@@ -14,6 +14,7 @@ import org.xbib.elx.api.BasicClient;
import org.xbib.elx.api.SearchClientProvider;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.ServiceLoader;

@@ -0,0 +1,26 @@
package org.xbib.elx.common;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
public class DaemonThreadFactory implements ThreadFactory {
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
public DaemonThreadFactory(String namePrefix) {
this.namePrefix = namePrefix;
SecurityManager s = System.getSecurityManager();
group = s != null ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
}
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r, namePrefix + "[T#" + threadNumber.getAndIncrement() + "]", 0);
t.setDaemon(true);
return t;
}
}
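For context, the sketch below shows how AbstractBasicClient in this commit wires the new factory into its shared scheduler (taken from the hunk above; the surrounding constructor is elided):

// threads come out daemonized and named "elx[T#1]", "elx[T#2]", ...
this.executorService = Executors.newScheduledThreadPool(2, new DaemonThreadFactory("elx"));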
@@ -11,7 +11,6 @@ import org.xbib.elx.api.BulkMetric;
import org.xbib.elx.api.BulkProcessor;
import java.io.IOException;
import java.util.concurrent.ScheduledThreadPoolExecutor;
public class DefaultBulkListener implements BulkListener {

@@ -19,26 +18,22 @@ public class DefaultBulkListener implements BulkListener {
private final BulkProcessor bulkProcessor;
private final BulkMetric bulkMetric;
private final boolean isBulkLoggingEnabled;
private final boolean failOnError;
private BulkMetric bulkMetric;
private Throwable lastBulkError;
public DefaultBulkListener(DefaultBulkProcessor bulkProcessor,
ScheduledThreadPoolExecutor scheduler,
Settings settings) {
this.bulkProcessor = bulkProcessor;
boolean enableBulkLogging = settings.getAsBoolean(Parameters.BULK_LOGGING_ENABLED.getName(),
Parameters.BULK_LOGGING_ENABLED.getBoolean());
boolean failOnBulkError = settings.getAsBoolean(Parameters.BULK_FAIL_ON_ERROR.getName(),
this.failOnError = settings.getAsBoolean(Parameters.BULK_FAIL_ON_ERROR.getName(),
Parameters.BULK_FAIL_ON_ERROR.getBoolean());
this.isBulkLoggingEnabled = enableBulkLogging;
this.failOnError = failOnBulkError;
this.bulkMetric = new DefaultBulkMetric(bulkProcessor, scheduler, settings);
bulkMetric.start();
if (settings.getAsBoolean(Parameters.BULK_METRIC_ENABLED.getName(),
Parameters.BULK_METRIC_ENABLED.getBoolean())) {
this.bulkMetric = new DefaultBulkMetric(bulkProcessor, bulkProcessor.getScheduler(), settings);
bulkMetric.start();
}
}
public BulkMetric getBulkMetric() {

@@ -47,47 +42,58 @@ public class DefaultBulkListener implements BulkListener {
@Override
public void beforeBulk(long executionId, BulkRequest request) {
long l = bulkMetric.getCurrentIngest().getCount();
bulkMetric.getCurrentIngest().inc();
int n = request.numberOfActions();
bulkMetric.getSubmitted().inc(n);
bulkMetric.getCurrentIngestNumDocs().inc(n);
bulkMetric.getTotalIngestSizeInBytes().inc(request.estimatedSizeInBytes());
if (isBulkLoggingEnabled && logger.isDebugEnabled()) {
logger.debug("before bulk [{}] [actions={}] [bytes={}] [requests={}]",
executionId,
request.numberOfActions(),
request.estimatedSizeInBytes(),
l);
if (bulkMetric != null) {
long l = bulkMetric.getCurrentIngest().getCount();
bulkMetric.getCurrentIngest().inc();
int n = request.numberOfActions();
bulkMetric.getSubmitted().inc(n);
bulkMetric.getCurrentIngestNumDocs().inc(n);
bulkMetric.getTotalIngestSizeInBytes().inc(request.estimatedSizeInBytes());
if (logger.isDebugEnabled()) {
logger.debug("before bulk [{}] [actions={}] [bytes={}] [requests={}]",
executionId,
request.numberOfActions(),
request.estimatedSizeInBytes(),
l);
}
}
}
@Override
public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
bulkMetric.recalculate(request, response);
long l = bulkMetric.getCurrentIngest().getCount();
bulkMetric.getCurrentIngest().dec();
bulkMetric.getSucceeded().inc(response.getItems().length);
bulkMetric.markTotalIngest(response.getItems().length);
long l = 0L;
if (bulkMetric != null) {
bulkMetric.recalculate(request, response);
l = bulkMetric.getCurrentIngest().getCount();
bulkMetric.getCurrentIngest().dec();
bulkMetric.getSucceeded().inc(response.getItems().length);
bulkMetric.markTotalIngest(response.getItems().length);
}
int n = 0;
for (BulkItemResponse itemResponse : response.getItems()) {
bulkMetric.getCurrentIngest().dec(itemResponse.getIndex(), itemResponse.getType(), itemResponse.getId());
if (bulkMetric != null) {
bulkMetric.getCurrentIngest().dec(itemResponse.getIndex(), itemResponse.getType(), itemResponse.getId());
}
if (itemResponse.isFailed()) {
n++;
bulkMetric.getSucceeded().dec(1);
bulkMetric.getFailed().inc(1);
if (bulkMetric != null) {
bulkMetric.getSucceeded().dec(1);
bulkMetric.getFailed().inc(1);
}
}
}
if (isBulkLoggingEnabled && logger.isDebugEnabled()) {
logger.debug("after bulk [{}] [succeeded={}] [failed={}] [{}ms] [requests={}]",
executionId,
bulkMetric.getSucceeded().getCount(),
bulkMetric.getFailed().getCount(),
response.getTook().millis(),
l);
if (bulkMetric != null) {
if (logger.isDebugEnabled()) {
logger.debug("after bulk [{}] [succeeded={}] [failed={}] [{}ms] [requests={}]",
executionId,
bulkMetric.getSucceeded().getCount(),
bulkMetric.getFailed().getCount(),
response.getTook().millis(),
l);
}
}
if (n > 0) {
if (isBulkLoggingEnabled && logger.isErrorEnabled()) {
if (logger.isErrorEnabled()) {
logger.error("bulk [{}] failed with {} failed items, failure message = {}",
executionId, n, response.buildFailureMessage());
}

@@ -96,13 +102,17 @@ public class DefaultBulkListener implements BulkListener {
" n = " + n + " message = " + response.buildFailureMessage());
}
} else {
bulkMetric.getCurrentIngestNumDocs().dec(response.getItems().length);
if (bulkMetric != null) {
bulkMetric.getCurrentIngestNumDocs().dec(response.getItems().length);
}
}
}
@Override
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
bulkMetric.getCurrentIngest().dec();
if (bulkMetric != null) {
bulkMetric.getCurrentIngest().dec();
}
lastBulkError = failure;
if (logger.isErrorEnabled()) {
logger.error("after bulk [" + executionId + "] error", failure);

@@ -117,6 +127,8 @@ public class DefaultBulkListener implements BulkListener {
@Override
public void close() throws IOException {
bulkMetric.close();
if (bulkMetric != null) {
bulkMetric.close();
}
}
}

@@ -16,8 +16,8 @@ import org.xbib.metrics.common.Meter;
import java.io.IOException;
import java.util.LongSummaryStatistics;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class DefaultBulkMetric implements BulkMetric {

@@ -63,7 +63,7 @@ public class DefaultBulkMetric implements BulkMetric {
private int x = 0;
public DefaultBulkMetric(DefaultBulkProcessor bulkProcessor,
ScheduledThreadPoolExecutor scheduledThreadPoolExecutor,
ScheduledExecutorService scheduledExecutorService,
Settings settings) {
this.bulkProcessor = bulkProcessor;
int ringBufferSize = settings.getAsInt(Parameters.BULK_RING_BUFFER_SIZE.getName(),

@@ -73,7 +73,7 @@ public class DefaultBulkMetric implements BulkMetric {
TimeValue measureInterval = TimeValue.parseTimeValue(measureIntervalStr,
TimeValue.timeValueSeconds(1), "");
this.measureIntervalSeconds = measureInterval.seconds();
this.totalIngest = new Meter(scheduledThreadPoolExecutor);
this.totalIngest = new Meter(scheduledExecutorService);
this.ringBufferSize = ringBufferSize;
this.ringBuffer = new LongRingBuffer(ringBufferSize);
this.totalIngestSizeInBytes = new CountMetric();

@@ -93,7 +93,7 @@ public class DefaultBulkMetric implements BulkMetric {
Parameters.BULK_METRIC_LOG_INTERVAL.getString());
TimeValue metricLoginterval = TimeValue.parseTimeValue(metricLogIntervalStr,
TimeValue.timeValueSeconds(10), "");
this.future = scheduledThreadPoolExecutor.scheduleAtFixedRate(this::log, 0L, metricLoginterval.seconds(), TimeUnit.SECONDS);
this.future = scheduledExecutorService.scheduleAtFixedRate(this::log, 0L, metricLoginterval.seconds(), TimeUnit.SECONDS);
}
@Override

@@ -16,6 +16,7 @@ import org.xbib.elx.api.BulkMetric;
import org.xbib.elx.api.BulkProcessor;
import java.io.IOException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

@@ -35,6 +36,8 @@ public class DefaultBulkProcessor implements BulkProcessor {
private final AtomicBoolean enabled;
private final BulkClient bulkClient;
private final ElasticsearchClient client;
private final DefaultBulkListener bulkListener;

@@ -56,6 +59,7 @@ public class DefaultBulkProcessor implements BulkProcessor {
private final int permits;
public DefaultBulkProcessor(BulkClient bulkClient, Settings settings) {
this.bulkClient = bulkClient;
int maxActionsPerRequest = settings.getAsInt(Parameters.BULK_MAX_ACTIONS_PER_REQUEST.getName(),
Parameters.BULK_MAX_ACTIONS_PER_REQUEST.getInteger());
String flushIntervalStr = settings.get(Parameters.BULK_FLUSH_INTERVAL.getName(),

@@ -69,7 +73,7 @@ public class DefaultBulkProcessor implements BulkProcessor {
this.scheduledFuture = bulkClient.getScheduler().scheduleWithFixedDelay(this::flush, flushInterval.millis(),
flushInterval.millis(), TimeUnit.MILLISECONDS);
}
this.bulkListener = new DefaultBulkListener(this, bulkClient.getScheduler(), settings);
this.bulkListener = new DefaultBulkListener(this, settings);
this.bulkActions = maxActionsPerRequest;
this.bulkVolume = minVolumePerRequest.getBytes();
this.bulkRequest = new BulkRequest();

@@ -112,11 +116,21 @@ public class DefaultBulkProcessor implements BulkProcessor {
return bulkVolume;
}
@Override
public ScheduledExecutorService getScheduler() {
return bulkClient.getScheduler();
}
@Override
public BulkMetric getBulkMetric() {
return bulkListener.getBulkMetric();
}
@Override
public boolean isBulkMetricEnabled() {
return bulkListener.getBulkMetric() != null;
}
@Override
public Throwable getLastBulkError() {
return bulkListener.getLastBulkError();

@@ -4,13 +4,13 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.xbib.elx.api.SearchClient;
import org.xbib.elx.api.SearchMetric;
import org.xbib.metrics.api.Count;
import org.xbib.metrics.api.Metered;
import org.xbib.metrics.common.CountMetric;
import org.xbib.metrics.common.Meter;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class DefaultSearchMetric implements SearchMetric {

@@ -37,9 +37,9 @@ public class DefaultSearchMetric implements SearchMetric {
private Long stopped;
public DefaultSearchMetric(ScheduledThreadPoolExecutor scheduledThreadPoolExecutor,
public DefaultSearchMetric(SearchClient searchClient,
Settings settings) {
totalQuery = new Meter(scheduledThreadPoolExecutor);
totalQuery = new Meter(searchClient.getScheduler());
currentQuery = new CountMetric();
queries = new CountMetric();
succeededQueries = new CountMetric();

@@ -50,7 +50,7 @@ public class DefaultSearchMetric implements SearchMetric {
Parameters.SEARCH_METRIC_LOG_INTERVAL.getString());
TimeValue metricLoginterval = TimeValue.parseTimeValue(metricLogIntervalStr,
TimeValue.timeValueSeconds(10), "");
this.future = scheduledThreadPoolExecutor.scheduleAtFixedRate(this::log, 0L, metricLoginterval.seconds(), TimeUnit.SECONDS);
this.future = searchClient.getScheduler().scheduleAtFixedRate(this::log, 0L, metricLoginterval.seconds(), TimeUnit.SECONDS);
}
@Override

@@ -128,7 +128,7 @@ public class DefaultSearchMetric implements SearchMetric {
logger.info("queries = " + getTotalQueries().getCount() +
" succeeded = " + getSucceededQueries().getCount() +
" empty = " + getEmptyQueries().getCount() +
" failed = " + getFailedQueries() +
" failed = " + getFailedQueries().getCount() +
" timeouts = " + getTimeoutQueries().getCount());
}
}

@@ -2,6 +2,10 @@ package org.xbib.elx.common;
public enum Parameters {
HOST("host", String.class, "localhost"),
PORT("port", Integer.class, 9300),
CLUSTER_TARGET_HEALTH("cluster.target_health", String.class, "GREEN"),
CLUSTER_TARGET_HEALTH_TIMEOUT("cluster.target_health_timeout", String.class, "30m"),

@@ -12,8 +16,6 @@ public enum Parameters {
BULK_STOP_REFRESH_SECONDS("bulk.stop_refresh_seconds", Integer.class, 30),
BULK_LOGGING_ENABLED("bulk.logging.enabled", Boolean.class, true),
BULK_FAIL_ON_ERROR("bulk.fail_on_error", Boolean.class, true),
BULK_MAX_ACTIONS_PER_REQUEST("bulk.max_actions_per_request", Integer.class, -1),

@@ -26,13 +28,17 @@ public enum Parameters {
BULK_MEASURE_INTERVAL("bulk.measure_interval", String.class, "1s"),
BULK_METRIC_LOG_INTERVAL("bulk.metric_log_interval", String.class, "10s"),
BULK_METRIC_ENABLED("bulk.metric.enabled", Boolean.class, Boolean.FALSE),
BULK_METRIC_LOG_INTERVAL("bulk.metric.log_interval", String.class, "10s"),
BULK_RING_BUFFER_SIZE("bulk.ring_buffer_size", Integer.class, Runtime.getRuntime().availableProcessors()),
BULK_PERMITS("bulk.permits", Integer.class, Runtime.getRuntime().availableProcessors() - 1),
SEARCH_METRIC_LOG_INTERVAL("search.metric_log_interval", String.class, "10s");
SEARCH_METRIC_ENABLED("search.metric.enabled", Boolean.class, Boolean.FALSE),
SEARCH_METRIC_LOG_INTERVAL("search.metric.log_interval", String.class, "10s");
private final String name;

@@ -13,6 +13,7 @@ import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.threadpool.ThreadPool;
import org.xbib.elx.common.Parameters;
import org.xbib.net.URL;
import org.xbib.netty.http.client.Client;
import org.xbib.netty.http.common.HttpAddress;

@@ -68,7 +69,7 @@ public class HttpClientHelper {
if (settings.hasValue("url")) {
this.url = settings.get("url");
httpAddress = HttpAddress.http1(this.url);
} else if (settings.hasValue("host")) {
} else if (settings.hasValue(Parameters.HOST.getName())) {
// use only first host
URL u = findAddresses(settings).stream().findFirst()
.orElseGet(() -> URL.http().host("localhost").port(9200).build());

@@ -156,11 +157,10 @@ public class HttpClientHelper {
}
}
private List<URL> findAddresses(Settings settings) {
final int defaultPort = settings.getAsInt("port", 9200);
final int defaultPort = settings.getAsInt(Parameters.PORT.getName(), 9200);
List<URL> addresses = new ArrayList<>();
for (String hostname : settings.getAsList("host")) {
for (String hostname : settings.getAsList(Parameters.HOST.getName())) {
String[] splitHost = hostname.split(":", 2);
if (splitHost.length == 2) {
try {

@@ -6,6 +6,7 @@ import org.junit.jupiter.api.Test;
import org.xbib.elx.api.IndexDefinition;
import org.xbib.elx.common.ClientBuilder;
import org.xbib.elx.common.DefaultIndexDefinition;
import org.xbib.elx.common.Parameters;
import org.xbib.elx.http.HttpSearchClient;
import org.xbib.elx.http.HttpSearchClientProvider;

@@ -24,7 +25,7 @@ class DumpIDTest {
try (HttpSearchClient searchClient = ClientBuilder.builder()
.setSearchClientProvider(HttpSearchClientProvider.class)
.put("cluster.name", "es2")
.put("host", "atlas:9202")
.put(Parameters.HOST.getName(), "atlas:9202")
.put("pool.enabled", false)
.build();
BufferedWriter writer = Files.newBufferedWriter(Paths.get("zdb.txt"))) {

@@ -20,6 +20,7 @@ import org.junit.jupiter.api.extension.ExtensionContext;
import org.junit.jupiter.api.extension.ParameterContext;
import org.junit.jupiter.api.extension.ParameterResolutionException;
import org.junit.jupiter.api.extension.ParameterResolver;
import org.xbib.elx.common.Parameters;
import java.io.IOException;
import java.nio.file.FileVisitResult;

@@ -155,10 +156,12 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft
return Settings.builder()
.put("cluster.name", getClusterName())
.put("path.home", getHome())
.put("host", httpHost)
.put("port", httpPort)
.put("cluster.target_health", "YELLOW")
.put("cluster.target_health_timeout", "1m")
.put(Parameters.HOST.getName(), httpHost)
.put(Parameters.PORT.getName(), httpPort)
.put(Parameters.CLUSTER_TARGET_HEALTH.getName(), "YELLOW")
.put(Parameters.CLUSTER_TARGET_HEALTH_TIMEOUT.getName(), "1m")
.put(Parameters.BULK_METRIC_ENABLED.getName(), Boolean.TRUE)
.put(Parameters.SEARCH_METRIC_ENABLED.getName(), Boolean.TRUE)
.build();
}

@@ -25,9 +25,7 @@ public class NodeClientHelper {
private static Object configurationObject;
private static Node node;
private static final Map<String, ElasticsearchClient> clientMap = new HashMap<>();
private static final Map<String, Node> nodeMap = new HashMap<>();
public ElasticsearchClient createClient(Settings settings, Object object) {
if (configurationObject == null) {

@@ -36,24 +34,25 @@ public class NodeClientHelper {
if (configurationObject instanceof ElasticsearchClient) {
return (ElasticsearchClient) configurationObject;
}
return clientMap.computeIfAbsent(settings.get("cluster.name"),
key -> innerCreateClient(settings));
String clusterName = settings.get("cluster.name", "elasticsearch");
Node node = nodeMap.computeIfAbsent(clusterName, key -> innerCreateClient(settings));
return node != null ? node.client() : null;
}
public void closeClient(Settings settings) {
clientMap.remove(settings.get("cluster.name"));
logger.debug("closing node...");
String clusterName = settings.get("cluster.name", "elasticsearch");
Node node = nodeMap.remove(clusterName);
if (node != null) {
try {
logger.debug("closing client...");
node.close();
} catch (IOException e) {
logger.log(Level.WARN, e.getMessage(), e);
}
node = null;
}
}
private ElasticsearchClient innerCreateClient(Settings settings) {
private Node innerCreateClient(Settings settings) {
String version = System.getProperty("os.name")
+ " " + System.getProperty("java.vm.name")
+ " " + System.getProperty("java.vm.vendor")

@@ -77,10 +76,10 @@ public class NodeClientHelper {
version, effectiveSettings.toDelimitedString(','));
Collection<Class<? extends Plugin>> plugins =
Collections.singletonList(Netty4Plugin.class);
node = new BulkNode(new Environment(effectiveSettings, null), plugins);
Node node = new BulkNode(new Environment(effectiveSettings, null), plugins);
try {
node.start();
return node.client();
return node;
} catch (NodeValidationException e) {
logger.log(Level.ERROR, e.getMessage(), e);
}

@@ -7,7 +7,6 @@ import org.junit.jupiter.api.extension.ExtendWith;
import org.xbib.elx.api.IndexDefinition;
import org.xbib.elx.common.ClientBuilder;
import org.xbib.elx.common.DefaultIndexDefinition;
import org.xbib.elx.common.Parameters;
import org.xbib.elx.node.NodeBulkClient;
import org.xbib.elx.node.NodeBulkClientProvider;

@@ -22,9 +21,7 @@ class DuplicateIDTest {
private static final Logger logger = LogManager.getLogger(DuplicateIDTest.class.getName());
private static final Long ACTIONS = 100L;
private static final Long MAX_ACTIONS_PER_REQUEST = 5L;
private static final Long ACTIONS = 10000L;
private final TestExtension.Helper helper;

@@ -38,7 +35,6 @@ class DuplicateIDTest {
try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client())
.setBulkClientProvider(NodeBulkClientProvider.class)
.put(helper.getClientSettings())
.put(Parameters.BULK_MAX_ACTIONS_PER_REQUEST.getName(), MAX_ACTIONS_PER_REQUEST)
.build()) {
IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc");
bulkClient.newIndex(indexDefinition);

@@ -12,7 +12,6 @@ import org.junit.jupiter.api.extension.ExtendWith;
import org.xbib.elx.api.IndexDefinition;
import org.xbib.elx.common.ClientBuilder;
import org.xbib.elx.common.DefaultIndexDefinition;
import org.xbib.elx.common.Parameters;
import org.xbib.elx.node.NodeBulkClient;
import org.xbib.elx.node.NodeBulkClientProvider;
import org.xbib.elx.node.NodeSearchClient;

@@ -29,8 +28,6 @@ class SearchTest {
private static final Long ACTIONS = 100000L;
private static final Long MAX_ACTIONS_PER_REQUEST = 100L;
private final TestExtension.Helper helper;
SearchTest(TestExtension.Helper helper) {

@@ -44,7 +41,6 @@ class SearchTest {
try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client())
.setBulkClientProvider(NodeBulkClientProvider.class)
.put(helper.getClientSettings())
.put(Parameters.BULK_MAX_ACTIONS_PER_REQUEST.getName(), MAX_ACTIONS_PER_REQUEST)
.build()) {
bulkClient.newIndex(indexDefinition);
bulkClient.startBulk(indexDefinition);

@@ -18,6 +18,7 @@ import org.junit.jupiter.api.extension.ExtensionContext;
import org.junit.jupiter.api.extension.ParameterContext;
import org.junit.jupiter.api.extension.ParameterResolutionException;
import org.junit.jupiter.api.extension.ParameterResolver;
import org.xbib.elx.common.Parameters;
import java.io.IOException;
import java.nio.file.FileVisitResult;

@@ -154,8 +155,10 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft
.put("node.max_local_storage_nodes", 2)
.put("cluster.initial_master_nodes", "1")
.put("discovery.seed_hosts", "127.0.0.1:9300")
.put("cluster.target_health", "YELLOW")
.put("cluster.target_health_timeout", "1m")
.put(Parameters.CLUSTER_TARGET_HEALTH.getName(), "YELLOW")
.put(Parameters.CLUSTER_TARGET_HEALTH_TIMEOUT.getName(), "1m")
.put(Parameters.BULK_METRIC_ENABLED.getName(), Boolean.TRUE)
.put(Parameters.SEARCH_METRIC_ENABLED.getName(), Boolean.TRUE)
.build();
}

@@ -11,7 +11,6 @@ import org.elasticsearch.client.Client;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.client.transport.NoNodeAvailableException;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.Strings;

@@ -21,6 +20,7 @@ import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.transport.Netty4Plugin;
import org.xbib.elx.common.Parameters;
import java.io.IOException;
import java.net.InetAddress;

@@ -38,16 +38,16 @@ public class TransportClientHelper {
private static final Logger logger = LogManager.getLogger(TransportClientHelper.class.getName());
private static final Map<String, ElasticsearchClient> clientMap = new HashMap<>();
private static final Map<String, ElasticsearchClient> transportClientMap = new HashMap<>();
public ElasticsearchClient createClient(Settings settings) {
String clusterName = settings.get("cluster.name", "elasticsearch");
return clientMap.computeIfAbsent(clusterName, key -> innerCreateClient(settings));
return transportClientMap.computeIfAbsent(clusterName, key -> innerCreateClient(settings));
}
public void closeClient(Settings settings) {
String clusterName = settings.get("cluster.name", "elasticsearch");
ElasticsearchClient client = clientMap.remove(clusterName);
ElasticsearchClient client = transportClientMap.remove(clusterName);
if (client != null) {
if (client instanceof Client) {
((Client) client).close();

@@ -65,9 +65,9 @@ public class TransportClientHelper {
}
private Collection<TransportAddress> findAddresses(Settings settings) {
final int defaultPort = settings.getAsInt("port", 9300);
final int defaultPort = settings.getAsInt(Parameters.PORT.getName(), 9300);
Collection<TransportAddress> addresses = new ArrayList<>();
for (String hostname : settings.getAsList("host")) {
for (String hostname : settings.getAsList(Parameters.HOST.getName())) {
String[] splitHost = hostname.split(":", 2);
if (splitHost.length == 2) {
try {

@@ -127,17 +127,15 @@ public class TransportClientHelper {
+ " " + System.getProperty("java.vm.vendor")
+ " " + System.getProperty("java.vm.version")
+ " Elasticsearch " + Version.CURRENT.toString();
logger.info("creating transport client on {} with custom settings {}",
systemIdentifier, Strings.toString(settings));
Settings transportClientSettings = getTransportClientSettings(settings);
logger.info("creating transport client on {} with settings {}",
systemIdentifier, Strings.toString(transportClientSettings));
return new MyTransportClient(transportClientSettings, Collections.singletonList(Netty4Plugin.class));
}
private Settings getTransportClientSettings(Settings settings) {
return Settings.builder()
// "cluster.name"
.put(ClusterName.CLUSTER_NAME_SETTING.getKey(),
settings.get(ClusterName.CLUSTER_NAME_SETTING.getKey()))
.put(settings.filter(key -> !isPrivateSettings(key)))
// "node.processors"
.put(EsExecutors.NODE_PROCESSORS_SETTING.getKey(),
settings.get(EsExecutors.NODE_PROCESSORS_SETTING.getKey(),

@@ -148,6 +146,15 @@ public class TransportClientHelper {
.build();
}
private static boolean isPrivateSettings(String key) {
for (Parameters p : Parameters.values()) {
if (key.equals(p.getName())) {
return true;
}
}
return false;
}
static class MyTransportClient extends TransportClient {
MyTransportClient(Settings settings, Collection<Class<? extends Plugin>> plugins) {

@@ -52,7 +52,9 @@ class BulkClientTest {
bulkClient.newIndex(indexDefinition);
bulkClient.index(indexDefinition, "1", true, "{ \"name\" : \"Hello World\"}"); // single doc ingest
bulkClient.waitForResponses(30L, TimeUnit.SECONDS);
assertEquals(1, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount());
if (bulkClient.getBulkProcessor().isBulkMetricEnabled()) {
assertEquals(1, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount());
}
if (bulkClient.getBulkProcessor().getLastBulkError() != null) {
logger.error("error", bulkClient.getBulkProcessor().getLastBulkError());
}

@@ -76,7 +78,9 @@ class BulkClientTest {
}
bulkClient.flush();
bulkClient.waitForResponses(30L, TimeUnit.SECONDS);
assertEquals(numactions, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount());
if (bulkClient.getBulkProcessor().isBulkMetricEnabled()) {
assertEquals(numactions, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount());
}
if (bulkClient.getBulkProcessor().getLastBulkError() != null) {
logger.error("error", bulkClient.getBulkProcessor().getLastBulkError());
}

@@ -119,7 +123,9 @@ class BulkClientTest {
bulkClient.stopBulk(indexDefinition);
bulkClient.refreshIndex(indexDefinition);
assertEquals(maxthreads * actions, bulkClient.getSearchableDocs(indexDefinition));
assertEquals(maxthreads * actions, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount());
if (bulkClient.getBulkProcessor().isBulkMetricEnabled()) {
assertEquals(maxthreads * actions, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount());
}
if (bulkClient.getBulkProcessor().getLastBulkError() != null) {
logger.error("error", bulkClient.getBulkProcessor().getLastBulkError());
}

@@ -6,6 +6,7 @@ import org.junit.jupiter.api.Test;
import org.xbib.elx.api.IndexDefinition;
import org.xbib.elx.common.ClientBuilder;
import org.xbib.elx.common.DefaultIndexDefinition;
import org.xbib.elx.common.Parameters;
import org.xbib.elx.transport.TransportSearchClient;
import org.xbib.elx.transport.TransportSearchClientProvider;

@@ -24,7 +25,7 @@ class DumpIDTest {
try (TransportSearchClient searchClient = ClientBuilder.builder()
.setSearchClientProvider(TransportSearchClientProvider.class)
.put("cluster.name", "es2")
.put("host", "atlas:9302")
.put(Parameters.HOST.getName(), "atlas:9302")
.build();
BufferedWriter writer = Files.newBufferedWriter(Paths.get("zdb.txt"))) {
Stream<String> stream = searchClient.getIds(qb -> qb

@@ -7,7 +7,6 @@ import org.junit.jupiter.api.extension.ExtendWith;
import org.xbib.elx.api.IndexDefinition;
import org.xbib.elx.common.ClientBuilder;
import org.xbib.elx.common.DefaultIndexDefinition;
import org.xbib.elx.common.Parameters;
import org.xbib.elx.transport.TransportBulkClient;
import org.xbib.elx.transport.TransportBulkClientProvider;

@@ -22,9 +21,7 @@ class DuplicateIDTest {
private static final Logger logger = LogManager.getLogger(DuplicateIDTest.class.getName());
private static final Long ACTIONS = 100L;
private static final Long MAX_ACTIONS_PER_REQUEST = 5L;
private static final Long ACTIONS = 10000L;
private final TestExtension.Helper helper;

@@ -37,7 +34,6 @@ class DuplicateIDTest {
long numactions = ACTIONS;
try (TransportBulkClient bulkClient = ClientBuilder.builder()
.setBulkClientProvider(TransportBulkClientProvider.class)
.put(Parameters.BULK_MAX_ACTIONS_PER_REQUEST.getName(), MAX_ACTIONS_PER_REQUEST)
.put(helper.getClientSettings())
.build()) {
IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc");

@@ -49,7 +45,9 @@ class DuplicateIDTest {
bulkClient.waitForResponses(30L, TimeUnit.SECONDS);
bulkClient.refreshIndex(indexDefinition);
assertTrue(bulkClient.getSearchableDocs(indexDefinition) < ACTIONS);
assertEquals(numactions, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount());
if (bulkClient.getBulkProcessor().isBulkMetricEnabled()) {
assertEquals(numactions, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount());
}
if (bulkClient.getBulkProcessor().getLastBulkError() != null) {
logger.error("error", bulkClient.getBulkProcessor().getLastBulkError());
}

@@ -51,7 +51,9 @@ class SearchTest {
bulkClient.waitForResponses(30L, TimeUnit.SECONDS);
bulkClient.refreshIndex(indexDefinition);
assertEquals(numactions, bulkClient.getSearchableDocs(indexDefinition));
assertEquals(numactions, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount());
if (bulkClient.getBulkProcessor().isBulkMetricEnabled()) {
assertEquals(numactions, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount());
}
if (bulkClient.getBulkProcessor().getLastBulkError() != null) {
logger.error("error", bulkClient.getBulkProcessor().getLastBulkError());
}

@@ -68,9 +70,11 @@ class SearchTest {
TimeValue.timeValueMillis(100), 579);
long count = stream.count();
assertEquals(numactions, count);
assertEquals(0L, searchClient.getSearchMetric().getFailedQueries().getCount());
assertEquals(0L, searchClient.getSearchMetric().getTimeoutQueries().getCount());
assertEquals(1L, searchClient.getSearchMetric().getEmptyQueries().getCount());
if (searchClient.isSearchMetricEnabled()) {
assertEquals(0L, searchClient.getSearchMetric().getFailedQueries().getCount());
assertEquals(0L, searchClient.getSearchMetric().getTimeoutQueries().getCount());
assertEquals(1L, searchClient.getSearchMetric().getEmptyQueries().getCount());
}
// test stream docs
stream = searchClient.search(qb -> qb
.setIndices(indexDefinition.getFullIndexName())

@@ -79,9 +83,11 @@ class SearchTest {
final AtomicInteger hitcount = new AtomicInteger();
stream.forEach(hit -> hitcount.incrementAndGet());
assertEquals(numactions, hitcount.get());
assertEquals(0L, searchClient.getSearchMetric().getFailedQueries().getCount());
assertEquals(0L, searchClient.getSearchMetric().getTimeoutQueries().getCount());
assertEquals(2L, searchClient.getSearchMetric().getEmptyQueries().getCount());
if (searchClient.isSearchMetricEnabled()) {
assertEquals(0L, searchClient.getSearchMetric().getFailedQueries().getCount());
assertEquals(0L, searchClient.getSearchMetric().getTimeoutQueries().getCount());
assertEquals(2L, searchClient.getSearchMetric().getEmptyQueries().getCount());
}
// test stream doc ids
Stream<String> ids = searchClient.getIds(qb -> qb
.setIndices(indexDefinition.getFullIndexName())

@@ -89,11 +95,13 @@ class SearchTest {
final AtomicInteger idcount = new AtomicInteger();
ids.forEach(id -> idcount.incrementAndGet());
assertEquals(numactions, idcount.get());
assertEquals(1542L, searchClient.getSearchMetric().getQueries().getCount());
assertEquals(1539L, searchClient.getSearchMetric().getSucceededQueries().getCount());
assertEquals(3L, searchClient.getSearchMetric().getEmptyQueries().getCount());
assertEquals(0L, searchClient.getSearchMetric().getFailedQueries().getCount());
assertEquals(0L, searchClient.getSearchMetric().getTimeoutQueries().getCount());
if (searchClient.isSearchMetricEnabled()) {
assertEquals(1542L, searchClient.getSearchMetric().getQueries().getCount());
assertEquals(1539L, searchClient.getSearchMetric().getSucceededQueries().getCount());
assertEquals(3L, searchClient.getSearchMetric().getEmptyQueries().getCount());
assertEquals(0L, searchClient.getSearchMetric().getFailedQueries().getCount());
assertEquals(0L, searchClient.getSearchMetric().getTimeoutQueries().getCount());
}
}
}
}

@@ -66,8 +66,10 @@ class SmokeTest {
assertTrue(bulkClient.waitForResponses(30, TimeUnit.SECONDS));
adminClient.updateReplicaLevel(indexDefinition);
assertEquals(1, adminClient.getReplicaLevel(indexDefinition));
assertEquals(0, bulkClient.getBulkProcessor().getBulkMetric().getFailed().getCount());
assertEquals(6, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount());
if (bulkClient.getBulkProcessor().isBulkMetricEnabled()) {
assertEquals(0, bulkClient.getBulkProcessor().getBulkMetric().getFailed().getCount());
assertEquals(6, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount());
}
if (bulkClient.getBulkProcessor().getLastBulkError() != null) {
logger.error("error", bulkClient.getBulkProcessor().getLastBulkError());
}

@@ -18,6 +18,7 @@ import org.junit.jupiter.api.extension.ExtensionContext;
import org.junit.jupiter.api.extension.ParameterContext;
import org.junit.jupiter.api.extension.ParameterResolutionException;
import org.junit.jupiter.api.extension.ParameterResolver;
import org.xbib.elx.common.Parameters;
import java.io.IOException;
import java.nio.file.FileVisitResult;

@@ -156,10 +157,14 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft
return Settings.builder()
.put("cluster.name", cluster)
.put("path.home", getHome())
.put("host", host)
.put("port", port)
.put("cluster.target_health", "YELLOW")
.put("cluster.target_health_timeout", "1m")
.put("client.transport.nodes_sampler_interval", "1h")
.put("client.transport.ping_timeout", "1h")
.put(Parameters.HOST.getName(), host)
.put(Parameters.PORT.getName(), port)
.put(Parameters.CLUSTER_TARGET_HEALTH.getName(), "YELLOW")
.put(Parameters.CLUSTER_TARGET_HEALTH_TIMEOUT.getName(), "1m")
//.put(Parameters.BULK_METRIC_ENABLED.getName(), Boolean.TRUE)
//.put(Parameters.SEARCH_METRIC_ENABLED.getName(), Boolean.TRUE)
.build();
}

@@ -1,6 +1,6 @@
group = org.xbib
name = elx
version = 7.10.2.4
version = 7.10.2.5

gradle.wrapper.version = 6.6.1
xbib-metrics.version = 2.1.0