align with es7102 branch

2.2.1.47
Jörg Prante 3 years ago
parent 2c33bb08e1
commit 42eb57b2b3

@ -3,15 +3,11 @@ package org.xbib.elx.api;
import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import java.io.Closeable; import java.io.Closeable;
import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
public interface BasicClient extends Closeable { public interface BasicClient extends Closeable {
/**
* Initiative the client
* @param settings settings
*/
void init(Settings settings); void init(Settings settings);
void putClusterSetting(String key, Object value, long timeout, TimeUnit timeUnit); void putClusterSetting(String key, Object value, long timeout, TimeUnit timeUnit);
@ -29,7 +25,6 @@ public interface BasicClient extends Closeable {
*/ */
ElasticsearchClient getClient(); ElasticsearchClient getClient();
/** /**
* Get cluster name. * Get cluster name.
* @return the cluster name * @return the cluster name
@ -45,14 +40,11 @@ public interface BasicClient extends Closeable {
*/ */
String getHealthColor(long maxWaitTime, TimeUnit timeUnit); String getHealthColor(long maxWaitTime, TimeUnit timeUnit);
/**
* Wait for cluster being healthy.
*/
void waitForHealthyCluster(); void waitForHealthyCluster();
long getSearchableDocs(IndexDefinition indexDefinition); long getSearchableDocs(IndexDefinition indexDefinition);
boolean isIndexExists(IndexDefinition indexDefinition); boolean isIndexExists(IndexDefinition indexDefinition);
ScheduledThreadPoolExecutor getScheduler(); ScheduledExecutorService getScheduler();
} }

@ -4,6 +4,7 @@ import org.elasticsearch.action.ActionRequest;
import java.io.Closeable; import java.io.Closeable;
import java.io.Flushable; import java.io.Flushable;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
public interface BulkProcessor extends Closeable, Flushable { public interface BulkProcessor extends Closeable, Flushable {
@ -14,6 +15,10 @@ public interface BulkProcessor extends Closeable, Flushable {
boolean waitForBulkResponses(long timeout, TimeUnit unit); boolean waitForBulkResponses(long timeout, TimeUnit unit);
ScheduledExecutorService getScheduler();
boolean isBulkMetricEnabled();
BulkMetric getBulkMetric(); BulkMetric getBulkMetric();
Throwable getLastBulkError(); Throwable getLastBulkError();

@ -1,6 +1,8 @@
package org.xbib.elx.api; package org.xbib.elx.api;
import java.time.format.DateTimeFormatter; import java.time.format.DateTimeFormatter;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern; import java.util.regex.Pattern;
public interface IndexDefinition { public interface IndexDefinition {
@ -23,7 +25,9 @@ public interface IndexDefinition {
void setMappings(String mappings); void setMappings(String mappings);
String getMappings(); Map<String, Object> getMappings();
Set<String> getMappingFields();
void setDateTimeFormatter(DateTimeFormatter formatter); void setDateTimeFormatter(DateTimeFormatter formatter);
@ -61,7 +65,7 @@ public interface IndexDefinition {
int getShardCount(); int getShardCount();
void setReplicaCount(int replicaLevel); void setReplicaCount(int replicaCount);
int getReplicaCount(); int getReplicaCount();

@ -14,6 +14,8 @@ import java.util.stream.Stream;
public interface SearchClient extends BasicClient { public interface SearchClient extends BasicClient {
boolean isSearchMetricEnabled();
SearchMetric getSearchMetric(); SearchMetric getSearchMetric();
Optional<GetResponse> get(Consumer<GetRequestBuilder> getRequestBuilder); Optional<GetResponse> get(Consumer<GetRequestBuilder> getRequestBuilder);

@ -119,16 +119,16 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements
@Override @Override
public AdminClient updateReplicaLevel(IndexDefinition indexDefinition) { public AdminClient updateReplicaLevel(IndexDefinition indexDefinition) {
if (isIndexDefinitionDisabled(indexDefinition)) { if (isIndexDefinitionDisabled(indexDefinition)) {
return null; return this;
} }
if (indexDefinition.getReplicaCount() < 1) { if (indexDefinition.getReplicaCount() < 0) {
logger.warn("invalid replica level"); logger.warn("invalid replica level defined for index "
+ indexDefinition.getIndex() + ": " + indexDefinition.getReplicaCount());
return this; return this;
} }
logger.info("update replica level for " + logger.info("update replica level for " + indexDefinition + " to " + indexDefinition.getReplicaCount());
indexDefinition + " to " + indexDefinition.getReplicaCount()); updateIndexSetting(indexDefinition.getFullIndexName(), "number_of_replicas",
updateIndexSetting(indexDefinition.getFullIndexName(), "number_of_replicas", indexDefinition.getReplicaCount(), indexDefinition.getReplicaCount(), 30L, TimeUnit.SECONDS);
30L, TimeUnit.SECONDS);
waitForHealthyCluster(); waitForHealthyCluster();
return this; return this;
} }
@ -404,8 +404,7 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements
logger.info("force merge of " + indexDefinition); logger.info("force merge of " + indexDefinition);
ForceMergeRequest forceMergeRequest = new ForceMergeRequest(); ForceMergeRequest forceMergeRequest = new ForceMergeRequest();
forceMergeRequest.indices(indexDefinition.getFullIndexName()); forceMergeRequest.indices(indexDefinition.getFullIndexName());
ForceMergeResponse forceMergeResponse = ForceMergeResponse forceMergeResponse = client.execute(ForceMergeAction.INSTANCE, forceMergeRequest).actionGet();
client.execute(ForceMergeAction.INSTANCE, forceMergeRequest).actionGet();
if (forceMergeResponse.getFailedShards() > 0) { if (forceMergeResponse.getFailedShards() > 0) {
throw new IllegalStateException("failed shards after force merge: " + forceMergeResponse.getFailedShards()); throw new IllegalStateException("failed shards after force merge: " + forceMergeResponse.getFailedShards());
} }
@ -504,9 +503,17 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements
path = path + fieldName; path = path + fieldName;
} }
if (map.containsKey("index")) { if (map.containsKey("index")) {
String mode = (String) map.get("index"); Object mode = map.get("index");
if ("no".equals(mode)) { if (mode instanceof String) {
return; if ("no".equals(mode)) {
return;
}
}
if (mode instanceof Boolean) {
Boolean b = (Boolean) mode;
if (!b) {
return;
}
} }
} }
for (Map.Entry<String, Object> entry : map.entrySet()) { for (Map.Entry<String, Object> entry : map.entrySet()) {

@ -36,7 +36,7 @@ import org.xbib.elx.api.BasicClient;
import org.xbib.elx.api.IndexDefinition; import org.xbib.elx.api.IndexDefinition;
import java.io.IOException; import java.io.IOException;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
@ -48,21 +48,19 @@ public abstract class AbstractBasicClient implements BasicClient {
protected Settings settings; protected Settings settings;
private final ScheduledThreadPoolExecutor scheduler; private final ScheduledExecutorService executorService;
private final AtomicBoolean closed; private final AtomicBoolean closed;
public AbstractBasicClient() { public AbstractBasicClient() {
this.scheduler = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(2, this.executorService = Executors.newScheduledThreadPool(2,
EsExecutors.daemonThreadFactory("elx-bulk-processor")); new DaemonThreadFactory("elx"));
this.scheduler.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
this.scheduler.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
closed = new AtomicBoolean(false); closed = new AtomicBoolean(false);
} }
@Override @Override
public ScheduledThreadPoolExecutor getScheduler() { public ScheduledExecutorService getScheduler() {
return scheduler; return executorService;
} }
@Override @Override
@ -84,6 +82,17 @@ public abstract class AbstractBasicClient implements BasicClient {
} }
} }
@Override
public void close() throws IOException {
ensureClientIsPresent();
if (closed.compareAndSet(false, true)) {
closeClient(settings);
if (executorService != null) {
executorService.shutdown();
}
}
}
@Override @Override
public String getClusterName() { public String getClusterName() {
ensureClientIsPresent(); ensureClientIsPresent();
@ -189,18 +198,6 @@ public abstract class AbstractBasicClient implements BasicClient {
return indicesExistsResponse.isExists(); return indicesExistsResponse.isExists();
} }
@Override
public void close() throws IOException {
ensureClientIsPresent();
if (closed.compareAndSet(false, true)) {
closeClient(settings);
if (scheduler != null) {
scheduler.shutdown();
}
}
}
protected abstract ElasticsearchClient createClient(Settings settings); protected abstract ElasticsearchClient createClient(Settings settings);
protected abstract void closeClient(Settings settings) throws IOException; protected abstract void closeClient(Settings settings) throws IOException;

@ -65,7 +65,7 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements
if (closed.compareAndSet(false, true)) { if (closed.compareAndSet(false, true)) {
ensureClientIsPresent(); ensureClientIsPresent();
if (bulkProcessor != null) { if (bulkProcessor != null) {
logger.info("closing bulk procesor"); logger.info("closing bulk processor");
bulkProcessor.close(); bulkProcessor.close();
} }
closeClient(settings); closeClient(settings);
@ -100,7 +100,7 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements
createIndexRequestBuilder.setSettings(JsonXContent.contentBuilder() createIndexRequestBuilder.setSettings(JsonXContent.contentBuilder()
.map(settings.getAsStructuredMap()).string()); .map(settings.getAsStructuredMap()).string());
} catch (IOException e) { } catch (IOException e) {
logger.warn(e.getMessage(), e); logger.log(Level.WARN, e.getMessage(), e);
} }
} else { } else {
Settings settings = Settings.builder() Settings settings = Settings.builder()
@ -115,15 +115,11 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements
} }
} }
if (indexDefinition.getMappings() != null) { if (indexDefinition.getMappings() != null) {
try { createIndexRequestBuilder.addMapping(type, indexDefinition.getMappings());
Map<String, Object> mappings = JsonXContent.jsonXContent.createParser(indexDefinition.getMappings()).mapOrdered();
createIndexRequestBuilder.addMapping(type, mappings);
} catch (IOException e) {
logger.log(Level.WARN, e.getMessage(), e);
}
} else { } else {
try { try {
XContentBuilder builder = JsonXContent.contentBuilder().startObject().startObject(type).endObject().endObject(); XContentBuilder builder = JsonXContent.contentBuilder()
.startObject().startObject(type).endObject().endObject();
createIndexRequestBuilder.addMapping(type, builder); createIndexRequestBuilder.addMapping(type, builder);
} catch (IOException e) { } catch (IOException e) {
logger.log(Level.WARN, e.getMessage(), e); logger.log(Level.WARN, e.getMessage(), e);
@ -144,26 +140,24 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements
if (isIndexDefinitionDisabled(indexDefinition)) { if (isIndexDefinitionDisabled(indexDefinition)) {
return; return;
} }
if (bulkProcessor != null) { ensureClientIsPresent();
ensureClientIsPresent(); Long bulkQueueSize = getThreadPoolQueueSize("bulk");
Long bulkQueueSize = getThreadPoolQueueSize("bulk"); if (bulkQueueSize != null && bulkQueueSize <= 64) {
if (bulkQueueSize != null && bulkQueueSize <= 64) { logger.info("found bulk queue size " + bulkQueueSize + ", expanding to " + bulkQueueSize * 4);
logger.info("found bulk queue size " + bulkQueueSize + ", expanding to " + bulkQueueSize * 4); bulkQueueSize = bulkQueueSize * 4;
bulkQueueSize = bulkQueueSize * 4; } else {
} else { logger.warn("undefined or small bulk queue size found: " + bulkQueueSize + " assuming 256");
logger.warn("undefined or small bulk queue size found: " + bulkQueueSize + " assuming 256"); bulkQueueSize = 256L;
bulkQueueSize = 256L; }
} putClusterSetting("threadpool.bulk.queue_size", bulkQueueSize, 30L, TimeUnit.SECONDS);
putClusterSetting("threadpool.bulk.queue_size", bulkQueueSize, 30L, TimeUnit.SECONDS); String indexName = indexDefinition.getFullIndexName();
String indexName = indexDefinition.getFullIndexName(); int interval = indexDefinition.getStartBulkRefreshSeconds();
int interval = indexDefinition.getStartBulkRefreshSeconds(); if (interval != 0) {
if (interval != 0) { logger.info("starting bulk on " + indexName + " with new refresh interval " + interval);
logger.info("starting bulk on " + indexName + " with new refresh interval " + interval); updateIndexSetting(indexName, "refresh_interval",
updateIndexSetting(indexName, "refresh_interval", interval >= 0 ? interval + "s" : interval, 30L, TimeUnit.SECONDS);
interval >= 0 ? interval + "s" : interval, 30L, TimeUnit.SECONDS); } else {
} else { logger.warn("ignoring starting bulk on " + indexName + " with refresh interval " + interval);
logger.warn("ignoring starting bulk on " + indexName + " with refresh interval " + interval);
}
} }
} }
@ -177,6 +171,7 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements
String indexName = indexDefinition.getFullIndexName(); String indexName = indexDefinition.getFullIndexName();
int interval = indexDefinition.getStopBulkRefreshSeconds(); int interval = indexDefinition.getStopBulkRefreshSeconds();
try { try {
logger.info("flushing bulk");
bulkProcessor.flush(); bulkProcessor.flush();
} catch (IOException e) { } catch (IOException e) {
// can never happen // can never happen
@ -267,7 +262,7 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements
ensureClientIsPresent(); ensureClientIsPresent();
return bulkProcessor.waitForBulkResponses(timeout, timeUnit); return bulkProcessor.waitForBulkResponses(timeout, timeUnit);
} }
return true; return false;
} }
@Override @Override

@ -34,26 +34,24 @@ import java.util.stream.StreamSupport;
public abstract class AbstractSearchClient extends AbstractBasicClient implements SearchClient { public abstract class AbstractSearchClient extends AbstractBasicClient implements SearchClient {
private SearchMetric searchMetric;
private final AtomicBoolean closed; private final AtomicBoolean closed;
private SearchMetric searchMetric;
public AbstractSearchClient() { public AbstractSearchClient() {
super(); super();
this.closed = new AtomicBoolean(true); this.closed = new AtomicBoolean(true);
} }
@Override
public SearchMetric getSearchMetric() {
return searchMetric;
}
@Override @Override
public void init(Settings settings) { public void init(Settings settings) {
if (closed.compareAndSet(true, false)) { if (closed.compareAndSet(true, false)) {
super.init(settings); super.init(settings);
this.searchMetric = new DefaultSearchMetric(getScheduler(), settings); if (settings.getAsBoolean(Parameters.SEARCH_METRIC_ENABLED.getName(),
searchMetric.init(settings); Parameters.SEARCH_METRIC_ENABLED.getBoolean())) {
this.searchMetric = new DefaultSearchMetric(this, settings);
searchMetric.init(settings);
}
} }
} }
@ -67,20 +65,38 @@ public abstract class AbstractSearchClient extends AbstractBasicClient implement
} }
} }
@Override
public boolean isSearchMetricEnabled() {
return searchMetric != null;
}
@Override
public SearchMetric getSearchMetric() {
return searchMetric;
}
@Override @Override
public Optional<GetResponse> get(Consumer<GetRequestBuilder> getRequestBuilderConsumer) { public Optional<GetResponse> get(Consumer<GetRequestBuilder> getRequestBuilderConsumer) {
GetRequestBuilder getRequestBuilder = new GetRequestBuilder(client, GetAction.INSTANCE); GetRequestBuilder getRequestBuilder = new GetRequestBuilder(client, GetAction.INSTANCE);
getRequestBuilderConsumer.accept(getRequestBuilder); getRequestBuilderConsumer.accept(getRequestBuilder);
ActionFuture<GetResponse> actionFuture = getRequestBuilder.execute(); ActionFuture<GetResponse> actionFuture = getRequestBuilder.execute();
searchMetric.getCurrentQueries().inc(); if (searchMetric != null) {
searchMetric.getCurrentQueries().inc();
}
GetResponse getResponse = actionFuture.actionGet(); GetResponse getResponse = actionFuture.actionGet();
searchMetric.getCurrentQueries().dec(); if (searchMetric != null) {
searchMetric.getQueries().inc(); searchMetric.getCurrentQueries().dec();
searchMetric.markTotalQueries(1); searchMetric.getQueries().inc();
searchMetric.markTotalQueries(1);
}
if (getResponse.isExists()) { if (getResponse.isExists()) {
searchMetric.getSucceededQueries().inc(); if (searchMetric != null) {
searchMetric.getSucceededQueries().inc();
}
} else { } else {
searchMetric.getEmptyQueries().inc(); if (searchMetric != null) {
searchMetric.getEmptyQueries().inc();
}
} }
return getResponse.isExists() ? Optional.of(getResponse) : Optional.empty(); return getResponse.isExists() ? Optional.of(getResponse) : Optional.empty();
} }
@ -90,16 +106,24 @@ public abstract class AbstractSearchClient extends AbstractBasicClient implement
MultiGetRequestBuilder multiGetRequestBuilder = new MultiGetRequestBuilder(client, MultiGetAction.INSTANCE); MultiGetRequestBuilder multiGetRequestBuilder = new MultiGetRequestBuilder(client, MultiGetAction.INSTANCE);
multiGetRequestBuilderConsumer.accept(multiGetRequestBuilder); multiGetRequestBuilderConsumer.accept(multiGetRequestBuilder);
ActionFuture<MultiGetResponse> actionFuture = multiGetRequestBuilder.execute(); ActionFuture<MultiGetResponse> actionFuture = multiGetRequestBuilder.execute();
searchMetric.getCurrentQueries().inc(); if (searchMetric != null) {
searchMetric.getCurrentQueries().inc();
}
MultiGetResponse multiGetItemResponse = actionFuture.actionGet(); MultiGetResponse multiGetItemResponse = actionFuture.actionGet();
searchMetric.getCurrentQueries().dec(); if (searchMetric != null) {
searchMetric.getQueries().inc(); searchMetric.getCurrentQueries().dec();
searchMetric.markTotalQueries(1); searchMetric.getQueries().inc();
searchMetric.markTotalQueries(1);
}
boolean isempty = multiGetItemResponse.getResponses().length == 0; boolean isempty = multiGetItemResponse.getResponses().length == 0;
if (isempty) { if (isempty) {
searchMetric.getEmptyQueries().inc(); if (searchMetric != null) {
searchMetric.getEmptyQueries().inc();
}
} else { } else {
searchMetric.getSucceededQueries().inc(); if (searchMetric != null) {
searchMetric.getSucceededQueries().inc();
}
} }
return isempty ? Optional.empty() : Optional.of(multiGetItemResponse); return isempty ? Optional.empty() : Optional.of(multiGetItemResponse);
} }
@ -109,24 +133,34 @@ public abstract class AbstractSearchClient extends AbstractBasicClient implement
SearchRequestBuilder searchRequestBuilder = new SearchRequestBuilder(client, SearchAction.INSTANCE); SearchRequestBuilder searchRequestBuilder = new SearchRequestBuilder(client, SearchAction.INSTANCE);
queryBuilder.accept(searchRequestBuilder); queryBuilder.accept(searchRequestBuilder);
ActionFuture<SearchResponse> actionFuture = searchRequestBuilder.execute(); ActionFuture<SearchResponse> actionFuture = searchRequestBuilder.execute();
searchMetric.getCurrentQueries().inc(); if (searchMetric != null) {
searchMetric.getCurrentQueries().inc();
}
SearchResponse searchResponse = actionFuture.actionGet(); SearchResponse searchResponse = actionFuture.actionGet();
searchMetric.getCurrentQueries().dec(); if (searchMetric != null) {
searchMetric.getQueries().inc(); searchMetric.getCurrentQueries().dec();
searchMetric.markTotalQueries(1); searchMetric.getQueries().inc();
searchMetric.markTotalQueries(1);
}
if (searchResponse.getFailedShards() > 0) { if (searchResponse.getFailedShards() > 0) {
StringBuilder sb = new StringBuilder("Search failed:"); StringBuilder sb = new StringBuilder("Search failed:");
for (ShardSearchFailure failure : searchResponse.getShardFailures()) { for (ShardSearchFailure failure : searchResponse.getShardFailures()) {
sb.append("\n").append(failure.reason()); sb.append("\n").append(failure.reason());
} }
searchMetric.getEmptyQueries().inc(); if (searchMetric != null) {
searchMetric.getEmptyQueries().inc();
}
throw new ElasticsearchException(sb.toString()); throw new ElasticsearchException(sb.toString());
} }
boolean isempty = searchResponse.getHits().getHits().length == 0; boolean isempty = searchResponse.getHits().getHits().length == 0;
if (isempty) { if (isempty) {
searchMetric.getEmptyQueries().inc(); if (searchMetric != null) {
searchMetric.getEmptyQueries().inc();
}
} else { } else {
searchMetric.getSucceededQueries().inc(); if (searchMetric != null) {
searchMetric.getSucceededQueries().inc();
}
} }
return isempty ? Optional.empty() : Optional.of(searchResponse); return isempty ? Optional.empty() : Optional.of(searchResponse);
} }
@ -138,19 +172,31 @@ public abstract class AbstractSearchClient extends AbstractBasicClient implement
queryBuilder.accept(searchRequestBuilder); queryBuilder.accept(searchRequestBuilder);
searchRequestBuilder.setScroll(scrollTime).setSize(scrollSize); searchRequestBuilder.setScroll(scrollTime).setSize(scrollSize);
ActionFuture<SearchResponse> actionFuture = searchRequestBuilder.execute(); ActionFuture<SearchResponse> actionFuture = searchRequestBuilder.execute();
searchMetric.getCurrentQueries().inc(); if (searchMetric != null) {
searchMetric.getCurrentQueries().inc();
}
SearchResponse initialSearchResponse = actionFuture.actionGet(); SearchResponse initialSearchResponse = actionFuture.actionGet();
searchMetric.getCurrentQueries().dec(); if (searchMetric != null) {
searchMetric.getQueries().inc(); searchMetric.getCurrentQueries().dec();
searchMetric.markTotalQueries(1); searchMetric.getQueries().inc();
searchMetric.markTotalQueries(1);
}
if (initialSearchResponse.getFailedShards() > 0) { if (initialSearchResponse.getFailedShards() > 0) {
searchMetric.getFailedQueries().inc(); if (searchMetric != null) {
searchMetric.getFailedQueries().inc();
}
} else if (initialSearchResponse.isTimedOut()) { } else if (initialSearchResponse.isTimedOut()) {
searchMetric.getTimeoutQueries().inc(); if (searchMetric != null) {
searchMetric.getTimeoutQueries().inc();
}
} else if (initialSearchResponse.getHits().getTotalHits() == 0) { } else if (initialSearchResponse.getHits().getTotalHits() == 0) {
searchMetric.getEmptyQueries().inc(); if (searchMetric != null) {
searchMetric.getEmptyQueries().inc();
}
} else { } else {
searchMetric.getSucceededQueries().inc(); if (searchMetric != null) {
searchMetric.getSucceededQueries().inc();
}
} }
Stream<SearchResponse> responseStream = Stream.iterate(initialSearchResponse, Stream<SearchResponse> responseStream = Stream.iterate(initialSearchResponse,
searchResponse -> { searchResponse -> {
@ -159,19 +205,23 @@ public abstract class AbstractSearchClient extends AbstractBasicClient implement
.setScrollId(searchResponse.getScrollId()) .setScrollId(searchResponse.getScrollId())
.setScroll(scrollTime); .setScroll(scrollTime);
ActionFuture<SearchResponse> actionFuture1 = searchScrollRequestBuilder.execute(); ActionFuture<SearchResponse> actionFuture1 = searchScrollRequestBuilder.execute();
searchMetric.getCurrentQueries().inc(); if (searchMetric != null) {
searchMetric.getCurrentQueries().inc();
}
SearchResponse searchResponse1 = actionFuture1.actionGet(); SearchResponse searchResponse1 = actionFuture1.actionGet();
searchMetric.getCurrentQueries().dec(); if (searchMetric != null) {
searchMetric.getQueries().inc(); searchMetric.getCurrentQueries().dec();
searchMetric.markTotalQueries(1); searchMetric.getQueries().inc();
if (searchResponse1.getFailedShards() > 0) { searchMetric.markTotalQueries(1);
searchMetric.getFailedQueries().inc(); if (searchResponse1.getFailedShards() > 0) {
} else if (searchResponse1.isTimedOut()) { searchMetric.getFailedQueries().inc();
searchMetric.getTimeoutQueries().inc(); } else if (searchResponse1.isTimedOut()) {
} else if (searchResponse1.getHits().getHits().length == 0) { searchMetric.getTimeoutQueries().inc();
searchMetric.getEmptyQueries().inc(); } else if (searchResponse1.getHits().getHits().length == 0) {
} else { searchMetric.getEmptyQueries().inc();
searchMetric.getSucceededQueries().inc(); } else {
searchMetric.getSucceededQueries().inc();
}
} }
return searchResponse1; return searchResponse1;
}); });

@ -0,0 +1,26 @@
package org.xbib.elx.common;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
public class DaemonThreadFactory implements ThreadFactory {
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
public DaemonThreadFactory(String namePrefix) {
this.namePrefix = namePrefix;
SecurityManager s = System.getSecurityManager();
group = s != null ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
}
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r, namePrefix + "[T#" + threadNumber.getAndIncrement() + "]", 0);
t.setDaemon(true);
return t;
}
}

@ -11,7 +11,7 @@ import org.xbib.elx.api.BulkMetric;
import org.xbib.elx.api.BulkProcessor; import org.xbib.elx.api.BulkProcessor;
import java.io.IOException; import java.io.IOException;
import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ScheduledExecutorService;
public class DefaultBulkListener implements BulkListener { public class DefaultBulkListener implements BulkListener {
@ -19,26 +19,23 @@ public class DefaultBulkListener implements BulkListener {
private final BulkProcessor bulkProcessor; private final BulkProcessor bulkProcessor;
private final BulkMetric bulkMetric;
private final boolean isBulkLoggingEnabled;
private final boolean failOnError; private final boolean failOnError;
private BulkMetric bulkMetric;
private Throwable lastBulkError; private Throwable lastBulkError;
public DefaultBulkListener(DefaultBulkProcessor bulkProcessor, public DefaultBulkListener(DefaultBulkProcessor bulkProcessor,
ScheduledThreadPoolExecutor scheduler,
Settings settings) { Settings settings) {
this.bulkProcessor = bulkProcessor; this.bulkProcessor = bulkProcessor;
boolean enableBulkLogging = settings.getAsBoolean(Parameters.BULK_LOGGING_ENABLED.getName(),
Parameters.BULK_LOGGING_ENABLED.getBoolean());
boolean failOnBulkError = settings.getAsBoolean(Parameters.BULK_FAIL_ON_ERROR.getName(), boolean failOnBulkError = settings.getAsBoolean(Parameters.BULK_FAIL_ON_ERROR.getName(),
Parameters.BULK_FAIL_ON_ERROR.getBoolean()); Parameters.BULK_FAIL_ON_ERROR.getBoolean());
this.isBulkLoggingEnabled = enableBulkLogging;
this.failOnError = failOnBulkError; this.failOnError = failOnBulkError;
this.bulkMetric = new DefaultBulkMetric(bulkProcessor, scheduler, settings); if (settings.getAsBoolean(Parameters.BULK_METRIC_ENABLED.getName(),
bulkMetric.start(); Parameters.BULK_METRIC_ENABLED.getBoolean())) {
this.bulkMetric = new DefaultBulkMetric(bulkProcessor, bulkProcessor.getScheduler(), settings);
bulkMetric.start();
}
} }
public BulkMetric getBulkMetric() { public BulkMetric getBulkMetric() {
@ -47,47 +44,58 @@ public class DefaultBulkListener implements BulkListener {
@Override @Override
public void beforeBulk(long executionId, BulkRequest request) { public void beforeBulk(long executionId, BulkRequest request) {
long l = bulkMetric.getCurrentIngest().getCount(); if (bulkMetric != null) {
bulkMetric.getCurrentIngest().inc(); long l = bulkMetric.getCurrentIngest().getCount();
int n = request.numberOfActions(); bulkMetric.getCurrentIngest().inc();
bulkMetric.getSubmitted().inc(n); int n = request.numberOfActions();
bulkMetric.getCurrentIngestNumDocs().inc(n); bulkMetric.getSubmitted().inc(n);
bulkMetric.getTotalIngestSizeInBytes().inc(request.estimatedSizeInBytes()); bulkMetric.getCurrentIngestNumDocs().inc(n);
if (isBulkLoggingEnabled && logger.isDebugEnabled()) { bulkMetric.getTotalIngestSizeInBytes().inc(request.estimatedSizeInBytes());
logger.debug("before bulk [{}] [actions={}] [bytes={}] [requests={}]", if (logger.isDebugEnabled()) {
executionId, logger.debug("before bulk [{}] [actions={}] [bytes={}] [requests={}]",
request.numberOfActions(), executionId,
request.estimatedSizeInBytes(), request.numberOfActions(),
l); request.estimatedSizeInBytes(),
l);
}
} }
} }
@Override @Override
public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
bulkMetric.recalculate(request, response); long l = 0L;
long l = bulkMetric.getCurrentIngest().getCount(); if (bulkMetric != null) {
bulkMetric.getCurrentIngest().dec(); bulkMetric.recalculate(request, response);
bulkMetric.getSucceeded().inc(response.getItems().length); l = bulkMetric.getCurrentIngest().getCount();
bulkMetric.markTotalIngest(response.getItems().length); bulkMetric.getCurrentIngest().dec();
bulkMetric.getSucceeded().inc(response.getItems().length);
bulkMetric.markTotalIngest(response.getItems().length);
}
int n = 0; int n = 0;
for (BulkItemResponse itemResponse : response.getItems()) { for (BulkItemResponse itemResponse : response.getItems()) {
bulkMetric.getCurrentIngest().dec(itemResponse.getIndex(), itemResponse.getType(), itemResponse.getId()); if (bulkMetric != null) {
bulkMetric.getCurrentIngest().dec(itemResponse.getIndex(), itemResponse.getType(), itemResponse.getId());
}
if (itemResponse.isFailed()) { if (itemResponse.isFailed()) {
n++; n++;
bulkMetric.getSucceeded().dec(1); if (bulkMetric != null) {
bulkMetric.getFailed().inc(1); bulkMetric.getSucceeded().dec(1);
bulkMetric.getFailed().inc(1);
}
} }
} }
if (isBulkLoggingEnabled && logger.isDebugEnabled()) { if (bulkMetric != null) {
logger.debug("after bulk [{}] [succeeded={}] [failed={}] [{}ms] [requests={}]", if (logger.isDebugEnabled()) {
executionId, logger.debug("after bulk [{}] [succeeded={}] [failed={}] [{}ms] [requests={}]",
bulkMetric.getSucceeded().getCount(), executionId,
bulkMetric.getFailed().getCount(), bulkMetric.getSucceeded().getCount(),
response.getTook().millis(), bulkMetric.getFailed().getCount(),
l); response.getTook().millis(),
l);
}
} }
if (n > 0) { if (n > 0) {
if (isBulkLoggingEnabled && logger.isErrorEnabled()) { if (logger.isErrorEnabled()) {
logger.error("bulk [{}] failed with {} failed items, failure message = {}", logger.error("bulk [{}] failed with {} failed items, failure message = {}",
executionId, n, response.buildFailureMessage()); executionId, n, response.buildFailureMessage());
} }
@ -96,13 +104,17 @@ public class DefaultBulkListener implements BulkListener {
" n = " + n + " message = " + response.buildFailureMessage()); " n = " + n + " message = " + response.buildFailureMessage());
} }
} else { } else {
bulkMetric.getCurrentIngestNumDocs().dec(response.getItems().length); if (bulkMetric != null) {
bulkMetric.getCurrentIngestNumDocs().dec(response.getItems().length);
}
} }
} }
@Override @Override
public void afterBulk(long executionId, BulkRequest request, Throwable failure) { public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
bulkMetric.getCurrentIngest().dec(); if (bulkMetric != null) {
bulkMetric.getCurrentIngest().dec();
}
lastBulkError = failure; lastBulkError = failure;
if (logger.isErrorEnabled()) { if (logger.isErrorEnabled()) {
logger.error("after bulk [" + executionId + "] error", failure); logger.error("after bulk [" + executionId + "] error", failure);
@ -117,6 +129,8 @@ public class DefaultBulkListener implements BulkListener {
@Override @Override
public void close() throws IOException { public void close() throws IOException {
bulkMetric.close(); if (bulkMetric != null) {
bulkMetric.close();
}
} }
} }

@ -16,8 +16,8 @@ import org.xbib.metrics.common.Meter;
import java.io.IOException; import java.io.IOException;
import java.util.LongSummaryStatistics; import java.util.LongSummaryStatistics;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
public class DefaultBulkMetric implements BulkMetric { public class DefaultBulkMetric implements BulkMetric {
@ -63,7 +63,7 @@ public class DefaultBulkMetric implements BulkMetric {
private int x = 0; private int x = 0;
public DefaultBulkMetric(DefaultBulkProcessor bulkProcessor, public DefaultBulkMetric(DefaultBulkProcessor bulkProcessor,
ScheduledThreadPoolExecutor scheduledThreadPoolExecutor, ScheduledExecutorService scheduledExecutorService,
Settings settings) { Settings settings) {
this.bulkProcessor = bulkProcessor; this.bulkProcessor = bulkProcessor;
int ringBufferSize = settings.getAsInt(Parameters.BULK_RING_BUFFER_SIZE.getName(), int ringBufferSize = settings.getAsInt(Parameters.BULK_RING_BUFFER_SIZE.getName(),
@ -73,7 +73,7 @@ public class DefaultBulkMetric implements BulkMetric {
TimeValue measureInterval = TimeValue.parseTimeValue(measureIntervalStr, TimeValue measureInterval = TimeValue.parseTimeValue(measureIntervalStr,
TimeValue.timeValueSeconds(1), ""); TimeValue.timeValueSeconds(1), "");
this.measureIntervalSeconds = measureInterval.seconds(); this.measureIntervalSeconds = measureInterval.seconds();
this.totalIngest = new Meter(scheduledThreadPoolExecutor); this.totalIngest = new Meter(scheduledExecutorService);
this.ringBufferSize = ringBufferSize; this.ringBufferSize = ringBufferSize;
this.ringBuffer = new LongRingBuffer(ringBufferSize); this.ringBuffer = new LongRingBuffer(ringBufferSize);
this.totalIngestSizeInBytes = new CountMetric(); this.totalIngestSizeInBytes = new CountMetric();
@ -93,7 +93,7 @@ public class DefaultBulkMetric implements BulkMetric {
Parameters.BULK_METRIC_LOG_INTERVAL.getString()); Parameters.BULK_METRIC_LOG_INTERVAL.getString());
TimeValue metricLoginterval = TimeValue.parseTimeValue(metricLogIntervalStr, TimeValue metricLoginterval = TimeValue.parseTimeValue(metricLogIntervalStr,
TimeValue.timeValueSeconds(10), ""); TimeValue.timeValueSeconds(10), "");
this.future = scheduledThreadPoolExecutor.scheduleAtFixedRate(this::log, 0L, metricLoginterval.seconds(), TimeUnit.SECONDS); this.future = scheduledExecutorService.scheduleAtFixedRate(this::log, 0L, metricLoginterval.seconds(), TimeUnit.SECONDS);
} }
@Override @Override

@ -16,6 +16,7 @@ import org.xbib.elx.api.BulkMetric;
import org.xbib.elx.api.BulkProcessor; import org.xbib.elx.api.BulkProcessor;
import java.io.IOException; import java.io.IOException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.Semaphore; import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -35,11 +36,13 @@ public class DefaultBulkProcessor implements BulkProcessor {
private final AtomicBoolean enabled; private final AtomicBoolean enabled;
private final BulkClient bulkClient;
private final ElasticsearchClient client; private final ElasticsearchClient client;
private final DefaultBulkListener bulkListener; private final DefaultBulkListener bulkListener;
private ScheduledFuture<?> scheduledFuture; private ScheduledFuture<?> flushIntervalFuture;
private BulkRequest bulkRequest; private BulkRequest bulkRequest;
@ -56,22 +59,28 @@ public class DefaultBulkProcessor implements BulkProcessor {
private final int permits; private final int permits;
public DefaultBulkProcessor(BulkClient bulkClient, Settings settings) { public DefaultBulkProcessor(BulkClient bulkClient, Settings settings) {
this.bulkClient = bulkClient;
int maxActionsPerRequest = settings.getAsInt(Parameters.BULK_MAX_ACTIONS_PER_REQUEST.getName(), int maxActionsPerRequest = settings.getAsInt(Parameters.BULK_MAX_ACTIONS_PER_REQUEST.getName(),
Parameters.BULK_MAX_ACTIONS_PER_REQUEST.getInteger()); Parameters.BULK_MAX_ACTIONS_PER_REQUEST.getInteger());
String flushIntervalStr = settings.get(Parameters.BULK_FLUSH_INTERVAL.getName(), String flushIntervalStr = settings.get(Parameters.BULK_FLUSH_INTERVAL.getName(),
Parameters.BULK_FLUSH_INTERVAL.getString()); Parameters.BULK_FLUSH_INTERVAL.getString());
TimeValue flushInterval = TimeValue.parseTimeValue(flushIntervalStr, TimeValue flushInterval = TimeValue.parseTimeValue(flushIntervalStr,
TimeValue.timeValueSeconds(30), ""); TimeValue.timeValueSeconds(30), "");
ByteSizeValue minVolumePerRequest = settings.getAsBytesSize(Parameters.BULK_MIN_VOLUME_PER_REQUEST.getName(),
ByteSizeValue.parseBytesSizeValue(Parameters.BULK_MIN_VOLUME_PER_REQUEST.getString(), "1k"));
this.client = bulkClient.getClient(); this.client = bulkClient.getClient();
if (flushInterval.millis() > 0L) { if (flushInterval.millis() > 0L) {
this.scheduledFuture = bulkClient.getScheduler().scheduleWithFixedDelay(this::flush, flushInterval.millis(), this.flushIntervalFuture = bulkClient.getScheduler().scheduleWithFixedDelay(this::flush, flushInterval.millis(),
flushInterval.millis(), TimeUnit.MILLISECONDS); flushInterval.millis(), TimeUnit.MILLISECONDS);
} }
this.bulkListener = new DefaultBulkListener(this, bulkClient.getScheduler(), settings); this.bulkListener = new DefaultBulkListener(this, settings);
this.bulkActions = maxActionsPerRequest; this.bulkActions = maxActionsPerRequest;
ByteSizeValue minVolumePerRequest = settings.getAsBytesSize(Parameters.BULK_MIN_VOLUME_PER_REQUEST.getName(),
ByteSizeValue.parseBytesSizeValue(Parameters.BULK_MIN_VOLUME_PER_REQUEST.getString(), "1k"));
this.bulkVolume = minVolumePerRequest.getBytes(); this.bulkVolume = minVolumePerRequest.getBytes();
if (!isBulkMetricEnabled()) {
ByteSizeValue maxVolumePerRequest = settings.getAsBytesSize(Parameters.BULK_MAX_VOLUME_PER_REQUEST.getName(),
ByteSizeValue.parseBytesSizeValue(Parameters.BULK_MAX_VOLUME_PER_REQUEST.getString(), "1m"));
this.bulkVolume = maxVolumePerRequest.getBytes();
}
this.bulkRequest = new BulkRequest(); this.bulkRequest = new BulkRequest();
this.closed = new AtomicBoolean(false); this.closed = new AtomicBoolean(false);
this.enabled = new AtomicBoolean(false); this.enabled = new AtomicBoolean(false);
@ -112,11 +121,21 @@ public class DefaultBulkProcessor implements BulkProcessor {
return bulkVolume; return bulkVolume;
} }
@Override
public ScheduledExecutorService getScheduler() {
return bulkClient.getScheduler();
}
@Override @Override
public BulkMetric getBulkMetric() { public BulkMetric getBulkMetric() {
return bulkListener.getBulkMetric(); return bulkListener.getBulkMetric();
} }
@Override
public boolean isBulkMetricEnabled() {
return bulkListener.getBulkMetric() != null;
}
@Override @Override
public Throwable getLastBulkError() { public Throwable getLastBulkError() {
return bulkListener.getLastBulkError(); return bulkListener.getLastBulkError();
@ -163,8 +182,8 @@ public class DefaultBulkProcessor implements BulkProcessor {
public synchronized void close() throws IOException { public synchronized void close() throws IOException {
if (closed.compareAndSet(false, true)) { if (closed.compareAndSet(false, true)) {
try { try {
if (scheduledFuture != null) { if (flushIntervalFuture != null) {
scheduledFuture.cancel(true); flushIntervalFuture.cancel(true);
} }
// like flush but without ensuring open // like flush but without ensuring open
if (bulkRequest.numberOfActions() > 0) { if (bulkRequest.numberOfActions() > 0) {

@ -18,6 +18,7 @@ import java.time.ZoneId;
import java.time.format.DateTimeFormatter; import java.time.format.DateTimeFormatter;
import java.util.Locale; import java.util.Locale;
import java.util.Map; import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern; import java.util.regex.Pattern;
public class DefaultIndexDefinition implements IndexDefinition { public class DefaultIndexDefinition implements IndexDefinition {
@ -90,26 +91,26 @@ public class DefaultIndexDefinition implements IndexDefinition {
if (settings.get("settings") != null && settings.get("mapping") != null) { if (settings.get("settings") != null && settings.get("mapping") != null) {
setSettings(findSettingsFrom(settings.get("settings"))); setSettings(findSettingsFrom(settings.get("settings")));
setMappings(findMappingsFrom(settings.get("mapping"))); setMappings(findMappingsFrom(settings.get("mapping")));
boolean shift = settings.getAsBoolean("shift", false); }
setShift(shift); boolean shift = settings.getAsBoolean("shift", false);
if (shift) { setShift(shift);
String dateTimeFormat = settings.get(Parameters.DATE_TIME_FORMAT.getName(), if (shift) {
Parameters.DATE_TIME_FORMAT.getString()); String dateTimeFormat = settings.get(Parameters.DATE_TIME_FORMAT.getName(),
DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern(dateTimeFormat, Locale.getDefault()) Parameters.DATE_TIME_FORMAT.getString());
.withZone(ZoneId.systemDefault()); DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern(dateTimeFormat, Locale.getDefault())
setDateTimeFormatter(dateTimeFormatter); .withZone(ZoneId.systemDefault());
String dateTimePatternStr = settings.get("dateTimePattern", "^(.*?)(\\d+)$"); setDateTimeFormatter(dateTimeFormatter);
Pattern dateTimePattern = Pattern.compile(dateTimePatternStr); String dateTimePatternStr = settings.get("dateTimePattern", "^(.*?)(\\d+)$");
setDateTimePattern(dateTimePattern); Pattern dateTimePattern = Pattern.compile(dateTimePatternStr);
String fullName = indexName + dateTimeFormatter.format(LocalDateTime.now()); setDateTimePattern(dateTimePattern);
fullIndexName = adminClient.resolveAlias(fullName).stream().findFirst().orElse(fullName); String fullName = indexName + dateTimeFormatter.format(LocalDateTime.now());
setFullIndexName(fullIndexName); fullIndexName = adminClient.resolveAlias(fullName).stream().findFirst().orElse(fullName);
boolean prune = settings.getAsBoolean("prune", false); setFullIndexName(fullIndexName);
setPrune(prune); boolean prune = settings.getAsBoolean("prune", false);
if (prune) { setPrune(prune);
setMinToKeep(settings.getAsInt("retention.mintokeep", 2)); if (prune) {
setDelta(settings.getAsInt("retention.delta", 2)); setMinToKeep(settings.getAsInt("retention.mintokeep", 2));
} setDelta(settings.getAsInt("retention.delta", 2));
} }
} }
} }
@ -134,7 +135,6 @@ public class DefaultIndexDefinition implements IndexDefinition {
return type; return type;
} }
@Override @Override
public void setFullIndexName(String fullIndexName) { public void setFullIndexName(String fullIndexName) {
this.fullIndexName = fullIndexName; this.fullIndexName = fullIndexName;
@ -161,8 +161,23 @@ public class DefaultIndexDefinition implements IndexDefinition {
} }
@Override @Override
public String getMappings() { public Map<String, Object> getMappings() {
return mappings; if (mappings == null) {
return null;
}
try {
return JsonXContent.jsonXContent.createParser(mappings).mapOrdered();
} catch (IOException e) {
return null;
}
}
@Override
public Set<String> getMappingFields() {
if (mappings == null) {
return null;
}
return Settings.builder().loadFromSource(mappings).build().getGroups("properties").keySet();
} }
@Override @Override

@ -4,13 +4,13 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.xbib.elx.api.SearchClient;
import org.xbib.elx.api.SearchMetric; import org.xbib.elx.api.SearchMetric;
import org.xbib.metrics.api.Count; import org.xbib.metrics.api.Count;
import org.xbib.metrics.api.Metered; import org.xbib.metrics.api.Metered;
import org.xbib.metrics.common.CountMetric; import org.xbib.metrics.common.CountMetric;
import org.xbib.metrics.common.Meter; import org.xbib.metrics.common.Meter;
import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
public class DefaultSearchMetric implements SearchMetric { public class DefaultSearchMetric implements SearchMetric {
@ -37,9 +37,9 @@ public class DefaultSearchMetric implements SearchMetric {
private Long stopped; private Long stopped;
public DefaultSearchMetric(ScheduledThreadPoolExecutor scheduledThreadPoolExecutor, public DefaultSearchMetric(SearchClient searchClient,
Settings settings) { Settings settings) {
totalQuery = new Meter(scheduledThreadPoolExecutor); totalQuery = new Meter(searchClient.getScheduler());
currentQuery = new CountMetric(); currentQuery = new CountMetric();
queries = new CountMetric(); queries = new CountMetric();
succeededQueries = new CountMetric(); succeededQueries = new CountMetric();
@ -50,7 +50,7 @@ public class DefaultSearchMetric implements SearchMetric {
Parameters.SEARCH_METRIC_LOG_INTERVAL.getString()); Parameters.SEARCH_METRIC_LOG_INTERVAL.getString());
TimeValue metricLoginterval = TimeValue.parseTimeValue(metricLogIntervalStr, TimeValue metricLoginterval = TimeValue.parseTimeValue(metricLogIntervalStr,
TimeValue.timeValueSeconds(10), ""); TimeValue.timeValueSeconds(10), "");
this.future = scheduledThreadPoolExecutor.scheduleAtFixedRate(this::log, 0L, metricLoginterval.seconds(), TimeUnit.SECONDS); this.future = searchClient.getScheduler().scheduleAtFixedRate(this::log, 0L, metricLoginterval.seconds(), TimeUnit.SECONDS);
} }
@Override @Override
@ -126,10 +126,10 @@ public class DefaultSearchMetric implements SearchMetric {
private void log() { private void log() {
if (logger.isInfoEnabled()) { if (logger.isInfoEnabled()) {
logger.info("queries = " + getTotalQueries().getCount() + logger.info("queries = " + getTotalQueries().getCount() +
" succeeded = " + getSucceededQueries().getCount() + " succeeded = " + getSucceededQueries().getCount() +
" empty = " + getEmptyQueries().getCount() + " empty = " + getEmptyQueries().getCount() +
" failed = " + getFailedQueries() + " failed = " + getFailedQueries().getCount() +
" timeouts = " + getTimeoutQueries().getCount()); " timeouts = " + getTimeoutQueries().getCount());
} }
} }
} }

@ -2,20 +2,20 @@ package org.xbib.elx.common;
public enum Parameters { public enum Parameters {
HOST("host", String.class, "localhost"),
PORT("port", Integer.class, 9300),
CLUSTER_TARGET_HEALTH("cluster.target_health", String.class, "GREEN"), CLUSTER_TARGET_HEALTH("cluster.target_health", String.class, "GREEN"),
CLUSTER_TARGET_HEALTH_TIMEOUT("cluster.target_health_timeout", String.class, "30m"), CLUSTER_TARGET_HEALTH_TIMEOUT("cluster.target_health_timeout", String.class, "30m"),
DATE_TIME_FORMAT("dateTimeFormat", String.class, "yyyyMMdd"), DATE_TIME_FORMAT("dateTimeFormat", String.class, "yyyyMMdd"),
BULK_MAX_WAIT_RESPONSE("bulk.max_wait_response", String.class, "30s"),
BULK_START_REFRESH_SECONDS("bulk.start_refresh_seconds", Integer.class, -1), BULK_START_REFRESH_SECONDS("bulk.start_refresh_seconds", Integer.class, -1),
BULK_STOP_REFRESH_SECONDS("bulk.stop_refresh_seconds", Integer.class, 30), BULK_STOP_REFRESH_SECONDS("bulk.stop_refresh_seconds", Integer.class, 30),
BULK_LOGGING_ENABLED("bulk.logging.enabled", Boolean.class, true),
BULK_FAIL_ON_ERROR("bulk.fail_on_error", Boolean.class, true), BULK_FAIL_ON_ERROR("bulk.fail_on_error", Boolean.class, true),
BULK_MAX_ACTIONS_PER_REQUEST("bulk.max_actions_per_request", Integer.class, -1), BULK_MAX_ACTIONS_PER_REQUEST("bulk.max_actions_per_request", Integer.class, -1),
@ -26,15 +26,19 @@ public enum Parameters {
BULK_FLUSH_INTERVAL("bulk.flush_interval", String.class, "30s"), BULK_FLUSH_INTERVAL("bulk.flush_interval", String.class, "30s"),
BULK_MEASURE_INTERVAL("bulk.measure_interval", String.class, "1s"), BULK_METRIC_ENABLED("bulk.metric.enabled", Boolean.class, Boolean.TRUE),
BULK_METRIC_LOG_INTERVAL("bulk.metric.log_interval", String.class, "10s"),
BULK_METRIC_LOG_INTERVAL("bulk.metric_log_interval", String.class, "10s"), BULK_MEASURE_INTERVAL("bulk.measure_interval", String.class, "1s"),
BULK_RING_BUFFER_SIZE("bulk.ring_buffer_size", Integer.class, Runtime.getRuntime().availableProcessors()), BULK_RING_BUFFER_SIZE("bulk.ring_buffer_size", Integer.class, Runtime.getRuntime().availableProcessors()),
BULK_PERMITS("bulk.permits", Integer.class, Runtime.getRuntime().availableProcessors() - 1), BULK_PERMITS("bulk.permits", Integer.class, Runtime.getRuntime().availableProcessors() - 1),
SEARCH_METRIC_LOG_INTERVAL("search.metric_log_interval", String.class, "10s"); SEARCH_METRIC_ENABLED("search.metric.enabled", Boolean.class, Boolean.FALSE),
SEARCH_METRIC_LOG_INTERVAL("search.metric.log_interval", String.class, "10s");
private final String name; private final String name;

@ -3,7 +3,6 @@ package org.xbib.elx.node;
import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.xbib.elx.common.AbstractAdminClient; import org.xbib.elx.common.AbstractAdminClient;
import java.io.IOException;
public class NodeAdminClient extends AbstractAdminClient { public class NodeAdminClient extends AbstractAdminClient {
@ -20,7 +19,7 @@ public class NodeAdminClient extends AbstractAdminClient {
} }
@Override @Override
public void closeClient(Settings settings) throws IOException { public void closeClient(Settings settings) {
helper.closeClient(settings); helper.closeClient(settings);
} }
} }

@ -9,7 +9,6 @@ import org.elasticsearch.env.Environment;
import org.elasticsearch.node.Node; import org.elasticsearch.node.Node;
import org.elasticsearch.plugins.Plugin; import org.elasticsearch.plugins.Plugin;
import org.xbib.elx.common.Parameters; import org.xbib.elx.common.Parameters;
import java.io.IOException;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
@ -21,9 +20,7 @@ public class NodeClientHelper {
private static Object configurationObject; private static Object configurationObject;
private static Node node; private static final Map<String, Node> nodeMap = new HashMap<>();
private static final Map<String, ElasticsearchClient> clientMap = new HashMap<>();
public ElasticsearchClient createClient(Settings settings, Object object) { public ElasticsearchClient createClient(Settings settings, Object object) {
if (configurationObject == null) { if (configurationObject == null) {
@ -32,20 +29,21 @@ public class NodeClientHelper {
if (configurationObject instanceof ElasticsearchClient) { if (configurationObject instanceof ElasticsearchClient) {
return (ElasticsearchClient) configurationObject; return (ElasticsearchClient) configurationObject;
} }
return clientMap.computeIfAbsent(settings.get("cluster.name"), String clusterName = settings.get("cluster.name", "elasticsearch");
key -> innerCreateClient(settings)); Node node = nodeMap.computeIfAbsent(clusterName, key -> innerCreateClient(settings));
return node != null ? node.client() : null;
} }
public void closeClient(Settings settings) throws IOException { public void closeClient(Settings settings) {
ElasticsearchClient client = clientMap.remove(settings.get("cluster.name")); String clusterName = settings.get("cluster.name", "elasticsearch");
if (client != null) { Node node = nodeMap.remove(settings.get("cluster.name"));
if (node != null) {
logger.debug("closing node..."); logger.debug("closing node...");
node.close(); node.close();
node = null;
} }
} }
private ElasticsearchClient innerCreateClient(Settings settings) { private Node innerCreateClient(Settings settings) {
String version = System.getProperty("os.name") String version = System.getProperty("os.name")
+ " " + System.getProperty("java.vm.name") + " " + System.getProperty("java.vm.name")
+ " " + System.getProperty("java.vm.vendor") + " " + System.getProperty("java.vm.vendor")
@ -61,9 +59,9 @@ public class NodeClientHelper {
logger.info("creating node client on {} with effective settings {}", logger.info("creating node client on {} with effective settings {}",
version, effectiveSettings.toDelimitedString(',')); version, effectiveSettings.toDelimitedString(','));
Collection<Class<? extends Plugin>> plugins = Collections.emptyList(); Collection<Class<? extends Plugin>> plugins = Collections.emptyList();
node = new BulkNode(new Environment(effectiveSettings), plugins); Node node = new BulkNode(new Environment(effectiveSettings), plugins);
node.start(); node.start();
return node.client(); return node;
} }
private static Settings filterSettings(Settings settings) { private static Settings filterSettings(Settings settings) {

@ -3,7 +3,6 @@ package org.xbib.elx.node;
import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.xbib.elx.common.AbstractSearchClient; import org.xbib.elx.common.AbstractSearchClient;
import java.io.IOException;
public class NodeSearchClient extends AbstractSearchClient { public class NodeSearchClient extends AbstractSearchClient {
@ -20,7 +19,7 @@ public class NodeSearchClient extends AbstractSearchClient {
} }
@Override @Override
public void closeClient(Settings settings) throws IOException { public void closeClient(Settings settings) {
helper.closeClient(settings); helper.closeClient(settings);
} }
} }

@ -52,7 +52,9 @@ class BulkClientTest {
bulkClient.newIndex(indexDefinition); bulkClient.newIndex(indexDefinition);
bulkClient.index(indexDefinition, "1", true, "{ \"name\" : \"Hello World\"}"); // single doc ingest bulkClient.index(indexDefinition, "1", true, "{ \"name\" : \"Hello World\"}"); // single doc ingest
bulkClient.waitForResponses(30L, TimeUnit.SECONDS); bulkClient.waitForResponses(30L, TimeUnit.SECONDS);
assertEquals(1, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount()); if (bulkClient.getBulkProcessor().isBulkMetricEnabled()) {
assertEquals(1, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount());
}
if (bulkClient.getBulkProcessor().getLastBulkError() != null) { if (bulkClient.getBulkProcessor().getLastBulkError() != null) {
logger.error("error", bulkClient.getBulkProcessor().getLastBulkError()); logger.error("error", bulkClient.getBulkProcessor().getLastBulkError());
} }
@ -76,7 +78,9 @@ class BulkClientTest {
} }
bulkClient.stopBulk(indexDefinition); bulkClient.stopBulk(indexDefinition);
bulkClient.waitForResponses(30L, TimeUnit.SECONDS); bulkClient.waitForResponses(30L, TimeUnit.SECONDS);
assertEquals(numactions, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount()); if (bulkClient.getBulkProcessor().isBulkMetricEnabled()) {
assertEquals(numactions, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount());
}
if (bulkClient.getBulkProcessor().getLastBulkError() != null) { if (bulkClient.getBulkProcessor().getLastBulkError() != null) {
logger.error("error", bulkClient.getBulkProcessor().getLastBulkError()); logger.error("error", bulkClient.getBulkProcessor().getLastBulkError());
} }
@ -119,7 +123,9 @@ class BulkClientTest {
bulkClient.stopBulk(indexDefinition); bulkClient.stopBulk(indexDefinition);
bulkClient.refreshIndex(indexDefinition); bulkClient.refreshIndex(indexDefinition);
assertEquals(maxthreads * actions, bulkClient.getSearchableDocs(indexDefinition)); assertEquals(maxthreads * actions, bulkClient.getSearchableDocs(indexDefinition));
assertEquals(maxthreads * actions, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount()); if (bulkClient.getBulkProcessor().isBulkMetricEnabled()) {
assertEquals(maxthreads * actions, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount());
}
if (bulkClient.getBulkProcessor().getLastBulkError() != null) { if (bulkClient.getBulkProcessor().getLastBulkError() != null) {
logger.error("error", bulkClient.getBulkProcessor().getLastBulkError()); logger.error("error", bulkClient.getBulkProcessor().getLastBulkError());
} }

@ -7,7 +7,6 @@ import org.junit.jupiter.api.extension.ExtendWith;
import org.xbib.elx.api.IndexDefinition; import org.xbib.elx.api.IndexDefinition;
import org.xbib.elx.common.ClientBuilder; import org.xbib.elx.common.ClientBuilder;
import org.xbib.elx.common.DefaultIndexDefinition; import org.xbib.elx.common.DefaultIndexDefinition;
import org.xbib.elx.common.Parameters;
import org.xbib.elx.node.NodeBulkClient; import org.xbib.elx.node.NodeBulkClient;
import org.xbib.elx.node.NodeBulkClientProvider; import org.xbib.elx.node.NodeBulkClientProvider;
@ -22,9 +21,7 @@ class DuplicateIDTest {
private static final Logger logger = LogManager.getLogger(DuplicateIDTest.class.getName()); private static final Logger logger = LogManager.getLogger(DuplicateIDTest.class.getName());
private static final Long ACTIONS = 100L; private static final Long ACTIONS = 10000L;
private static final Long MAX_ACTIONS_PER_REQUEST = 5L;
private final TestExtension.Helper helper; private final TestExtension.Helper helper;
@ -38,7 +35,6 @@ class DuplicateIDTest {
try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client()) try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client())
.setBulkClientProvider(NodeBulkClientProvider.class) .setBulkClientProvider(NodeBulkClientProvider.class)
.put(helper.getClientSettings()) .put(helper.getClientSettings())
.put(Parameters.BULK_MAX_ACTIONS_PER_REQUEST.getName(), MAX_ACTIONS_PER_REQUEST)
.build()) { .build()) {
IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc");
bulkClient.newIndex(indexDefinition); bulkClient.newIndex(indexDefinition);
@ -49,7 +45,9 @@ class DuplicateIDTest {
bulkClient.waitForResponses(30L, TimeUnit.SECONDS); bulkClient.waitForResponses(30L, TimeUnit.SECONDS);
bulkClient.refreshIndex(indexDefinition); bulkClient.refreshIndex(indexDefinition);
assertTrue(bulkClient.getSearchableDocs(indexDefinition) < ACTIONS); assertTrue(bulkClient.getSearchableDocs(indexDefinition) < ACTIONS);
assertEquals(numactions, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount()); if (bulkClient.getBulkProcessor().isBulkMetricEnabled()) {
assertEquals(numactions, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount());
}
if (bulkClient.getBulkProcessor().getLastBulkError() != null) { if (bulkClient.getBulkProcessor().getLastBulkError() != null) {
logger.error("error", bulkClient.getBulkProcessor().getLastBulkError()); logger.error("error", bulkClient.getBulkProcessor().getLastBulkError());
} }

@ -62,9 +62,9 @@ class IndexPruneTest {
bulkClient.newIndex(indexDefinition); bulkClient.newIndex(indexDefinition);
indexDefinition.setShift(true); indexDefinition.setShift(true);
adminClient.shiftIndex(indexDefinition, Collections.emptyList(), null); adminClient.shiftIndex(indexDefinition, Collections.emptyList(), null);
indexDefinition.setEnabled(true);
indexDefinition.setDelta(2); indexDefinition.setDelta(2);
indexDefinition.setMinToKeep(2); indexDefinition.setMinToKeep(2);
indexDefinition.setEnabled(true);
indexDefinition.setPrune(true); indexDefinition.setPrune(true);
IndexPruneResult indexPruneResult = adminClient.pruneIndex(indexDefinition); IndexPruneResult indexPruneResult = adminClient.pruneIndex(indexDefinition);
logger.info("prune result = " + indexPruneResult); logger.info("prune result = " + indexPruneResult);

@ -42,7 +42,7 @@ class IndexShiftTest {
.build(); .build();
NodeBulkClient bulkClient = ClientBuilder.builder(helper.client()) NodeBulkClient bulkClient = ClientBuilder.builder(helper.client())
.setBulkClientProvider(NodeBulkClientProvider.class) .setBulkClientProvider(NodeBulkClientProvider.class)
.put(helper.getClientSettings()) .put(helper.getClientSettings())
.build()) { .build()) {
IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc");
indexDefinition.setFullIndexName("test_shift"); indexDefinition.setFullIndexName("test_shift");

@ -52,7 +52,9 @@ class SearchTest {
assertTrue(bulkClient.waitForResponses(30L, TimeUnit.SECONDS)); assertTrue(bulkClient.waitForResponses(30L, TimeUnit.SECONDS));
bulkClient.refreshIndex(indexDefinition); bulkClient.refreshIndex(indexDefinition);
assertEquals(numactions, bulkClient.getSearchableDocs(indexDefinition)); assertEquals(numactions, bulkClient.getSearchableDocs(indexDefinition));
assertEquals(numactions, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount()); if (bulkClient.getBulkProcessor().isBulkMetricEnabled()) {
assertEquals(numactions, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount());
}
if (bulkClient.getBulkProcessor().getLastBulkError() != null) { if (bulkClient.getBulkProcessor().getLastBulkError() != null) {
logger.error("error", bulkClient.getBulkProcessor().getLastBulkError()); logger.error("error", bulkClient.getBulkProcessor().getLastBulkError());
} }
@ -69,9 +71,11 @@ class SearchTest {
TimeValue.timeValueMillis(100), 579); TimeValue.timeValueMillis(100), 579);
long count = stream.count(); long count = stream.count();
assertEquals(numactions, count); assertEquals(numactions, count);
assertEquals(0L, searchClient.getSearchMetric().getFailedQueries().getCount()); if (searchClient.isSearchMetricEnabled()) {
assertEquals(0L, searchClient.getSearchMetric().getTimeoutQueries().getCount()); assertEquals(0L, searchClient.getSearchMetric().getFailedQueries().getCount());
assertEquals(1L, searchClient.getSearchMetric().getEmptyQueries().getCount()); assertEquals(0L, searchClient.getSearchMetric().getTimeoutQueries().getCount());
assertEquals(1L, searchClient.getSearchMetric().getEmptyQueries().getCount());
}
// test stream docs // test stream docs
stream = searchClient.search(qb -> qb stream = searchClient.search(qb -> qb
.setIndices(indexDefinition.getFullIndexName()) .setIndices(indexDefinition.getFullIndexName())
@ -80,9 +84,11 @@ class SearchTest {
final AtomicInteger hitcount = new AtomicInteger(); final AtomicInteger hitcount = new AtomicInteger();
stream.forEach(hit -> hitcount.incrementAndGet()); stream.forEach(hit -> hitcount.incrementAndGet());
assertEquals(numactions, hitcount.get()); assertEquals(numactions, hitcount.get());
assertEquals(0L, searchClient.getSearchMetric().getFailedQueries().getCount()); if (searchClient.isSearchMetricEnabled()) {
assertEquals(0L, searchClient.getSearchMetric().getTimeoutQueries().getCount()); assertEquals(0L, searchClient.getSearchMetric().getFailedQueries().getCount());
assertEquals(2L, searchClient.getSearchMetric().getEmptyQueries().getCount()); assertEquals(0L, searchClient.getSearchMetric().getTimeoutQueries().getCount());
assertEquals(2L, searchClient.getSearchMetric().getEmptyQueries().getCount());
}
// test stream doc ids // test stream doc ids
Stream<String> ids = searchClient.getIds(qb -> qb Stream<String> ids = searchClient.getIds(qb -> qb
.setIndices(indexDefinition.getFullIndexName()) .setIndices(indexDefinition.getFullIndexName())
@ -90,11 +96,13 @@ class SearchTest {
final AtomicInteger idcount = new AtomicInteger(); final AtomicInteger idcount = new AtomicInteger();
ids.forEach(id -> idcount.incrementAndGet()); ids.forEach(id -> idcount.incrementAndGet());
assertEquals(numactions, idcount.get()); assertEquals(numactions, idcount.get());
assertEquals(1542L, searchClient.getSearchMetric().getQueries().getCount()); if (searchClient.isSearchMetricEnabled()) {
assertEquals(1539L, searchClient.getSearchMetric().getSucceededQueries().getCount()); assertEquals(1542L, searchClient.getSearchMetric().getQueries().getCount());
assertEquals(3L, searchClient.getSearchMetric().getEmptyQueries().getCount()); assertEquals(1539L, searchClient.getSearchMetric().getSucceededQueries().getCount());
assertEquals(0L, searchClient.getSearchMetric().getFailedQueries().getCount()); assertEquals(3L, searchClient.getSearchMetric().getEmptyQueries().getCount());
assertEquals(0L, searchClient.getSearchMetric().getTimeoutQueries().getCount()); assertEquals(0L, searchClient.getSearchMetric().getFailedQueries().getCount());
assertEquals(0L, searchClient.getSearchMetric().getTimeoutQueries().getCount());
}
} }
} }
} }

@ -17,9 +17,9 @@ import org.xbib.elx.node.NodeBulkClientProvider;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
@ExtendWith(TestExtension.class) @ExtendWith(TestExtension.class)
class SmokeTest { class SmokeTest {
@ -51,22 +51,24 @@ class SmokeTest {
bulkClient.newIndex(indexDefinition); bulkClient.newIndex(indexDefinition);
bulkClient.index(indexDefinition, "1", true, "{ \"name\" : \"Hello World\"}"); // single doc ingest bulkClient.index(indexDefinition, "1", true, "{ \"name\" : \"Hello World\"}"); // single doc ingest
bulkClient.flush(); bulkClient.flush();
bulkClient.waitForResponses(30, TimeUnit.SECONDS); assertTrue(bulkClient.waitForResponses(30, TimeUnit.SECONDS));
adminClient.checkMapping(indexDefinition); adminClient.checkMapping(indexDefinition);
bulkClient.update(indexDefinition, "1", "{ \"name\" : \"Another name\"}"); bulkClient.update(indexDefinition, "1", "{ \"name\" : \"Another name\"}");
bulkClient.delete(indexDefinition, "1"); bulkClient.delete(indexDefinition, "1");
bulkClient.waitForResponses(30, TimeUnit.SECONDS); assertTrue(bulkClient.waitForResponses(30, TimeUnit.SECONDS));
bulkClient.index(indexDefinition, "1", true, "{ \"name\" : \"Hello World\"}"); bulkClient.index(indexDefinition, "1", true, "{ \"name\" : \"Hello World\"}");
bulkClient.delete(indexDefinition, "1"); bulkClient.delete(indexDefinition, "1");
bulkClient.waitForResponses(30, TimeUnit.SECONDS); assertTrue(bulkClient.waitForResponses(30, TimeUnit.SECONDS));
adminClient.deleteIndex(indexDefinition); adminClient.deleteIndex(indexDefinition);
bulkClient.newIndex(indexDefinition); bulkClient.newIndex(indexDefinition);
bulkClient.index(indexDefinition, "1", true, "{ \"name\" : \"Hello World\"}"); bulkClient.index(indexDefinition, "1", true, "{ \"name\" : \"Hello World\"}");
bulkClient.waitForResponses(30, TimeUnit.SECONDS); assertTrue(bulkClient.waitForResponses(30, TimeUnit.SECONDS));
adminClient.updateReplicaLevel(indexDefinition); adminClient.updateReplicaLevel(indexDefinition);
assertEquals(1, adminClient.getReplicaLevel(indexDefinition)); assertEquals(1, adminClient.getReplicaLevel(indexDefinition));
assertEquals(0, bulkClient.getBulkProcessor().getBulkMetric().getFailed().getCount()); if (bulkClient.getBulkProcessor().isBulkMetricEnabled()) {
assertEquals(6, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount()); assertEquals(0, bulkClient.getBulkProcessor().getBulkMetric().getFailed().getCount());
assertEquals(6, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount());
}
if (bulkClient.getBulkProcessor().getLastBulkError() != null) { if (bulkClient.getBulkProcessor().getLastBulkError() != null) {
logger.error("error", bulkClient.getBulkProcessor().getLastBulkError()); logger.error("error", bulkClient.getBulkProcessor().getLastBulkError());
} }

@ -15,6 +15,7 @@ import org.junit.jupiter.api.extension.ExtensionContext;
import org.junit.jupiter.api.extension.ParameterContext; import org.junit.jupiter.api.extension.ParameterContext;
import org.junit.jupiter.api.extension.ParameterResolutionException; import org.junit.jupiter.api.extension.ParameterResolutionException;
import org.junit.jupiter.api.extension.ParameterResolver; import org.junit.jupiter.api.extension.ParameterResolver;
import org.xbib.elx.common.Parameters;
import java.io.IOException; import java.io.IOException;
import java.nio.file.FileVisitResult; import java.nio.file.FileVisitResult;
@ -31,6 +32,10 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft
private static final Logger logger = LogManager.getLogger("test"); private static final Logger logger = LogManager.getLogger("test");
private static final Random random = new Random();
private static final char[] numbersAndLetters = ("0123456789abcdefghijklmnopqrstuvwxyz").toCharArray();
private static final String key = "es-instance-"; private static final String key = "es-instance-";
private static final AtomicInteger count = new AtomicInteger(0); private static final AtomicInteger count = new AtomicInteger(0);
@ -52,7 +57,7 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft
} }
@Override @Override
public void beforeEach(ExtensionContext extensionContext) throws Exception { public void beforeEach(ExtensionContext extensionContext) {
Helper helper = extensionContext.getParent().isPresent() ? Helper helper = extensionContext.getParent().isPresent() ?
extensionContext.getParent().get().getStore(ns).getOrComputeIfAbsent(key + count.get(), key -> create(), Helper.class) : null; extensionContext.getParent().get().getStore(ns).getOrComputeIfAbsent(key + count.get(), key -> create(), Helper.class) : null;
Objects.requireNonNull(helper); Objects.requireNonNull(helper);
@ -135,8 +140,10 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft
.put("node.client", true) .put("node.client", true)
.put("node.master", false) .put("node.master", false)
.put("node.data", false) .put("node.data", false)
.put("cluster.target_health", "YELLOW") .put(Parameters.CLUSTER_TARGET_HEALTH.getName(), "YELLOW")
.put("cluster.target_health_timeout", "1m") .put(Parameters.CLUSTER_TARGET_HEALTH_TIMEOUT.getName(), "1m")
.put(Parameters.BULK_METRIC_ENABLED.getName(), Boolean.TRUE)
.put(Parameters.SEARCH_METRIC_ENABLED.getName(), Boolean.TRUE)
.build(); .build();
} }
@ -183,14 +190,10 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft
void closeNodes() { void closeNodes() {
if (node != null) { if (node != null) {
logger.info("closing all nodes"); logger.info("closing node");
node.client().close(); node.client().close();
node.close(); node.close();
} }
} }
private static final Random random = new Random();
private static final char[] numbersAndLetters = ("0123456789abcdefghijklmnopqrstuvwxyz").toCharArray();
} }
} }

@ -16,6 +16,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress; import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.transport.TransportAddress;
import org.jboss.netty.channel.DefaultChannelFuture; import org.jboss.netty.channel.DefaultChannelFuture;
import org.xbib.elx.common.Parameters;
import java.io.IOException; import java.io.IOException;
import java.net.InetAddress; import java.net.InetAddress;
import java.util.ArrayList; import java.util.ArrayList;
@ -23,21 +24,22 @@ import java.util.Collection;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.function.Predicate;
public class TransportClientHelper { public class TransportClientHelper {
private static final Logger logger = LogManager.getLogger(TransportClientHelper.class.getName()); private static final Logger logger = LogManager.getLogger(TransportClientHelper.class.getName());
private static final Map<String, ElasticsearchClient> clientMap = new HashMap<>(); private static final Map<String, ElasticsearchClient> transportClientMap = new HashMap<>();
public ElasticsearchClient createClient(Settings settings) { public ElasticsearchClient createClient(Settings settings) {
String clusterName = settings.get("cluster.name", "elasticsearch"); String clusterName = settings.get("cluster.name", "elasticsearch");
return clientMap.computeIfAbsent(clusterName, key -> innerCreateClient(settings)); return transportClientMap.computeIfAbsent(clusterName, key -> innerCreateClient(settings));
} }
public void closeClient(Settings settings) { public void closeClient(Settings settings) {
String clusterName = settings.get("cluster.name", "elasticsearch"); String clusterName = settings.get("cluster.name", "elasticsearch");
ElasticsearchClient client = clientMap.remove(clusterName); ElasticsearchClient client = transportClientMap.remove(clusterName);
if (client != null) { if (client != null) {
if (client instanceof Client) { if (client instanceof Client) {
((Client) client).close(); ((Client) client).close();
@ -55,31 +57,29 @@ public class TransportClientHelper {
} }
private Collection<TransportAddress> findAddresses(Settings settings) { private Collection<TransportAddress> findAddresses(Settings settings) {
final int defaultPort = settings.getAsInt("port", 9300); final int defaultPort = settings.getAsInt(Parameters.PORT.getName(), 9300);
Collection<TransportAddress> addresses = new ArrayList<>(); Collection<TransportAddress> addresses = new ArrayList<>();
for (String hostname : settings.getAsArray("host")) { for (String hostname : settings.getAsArray(Parameters.HOST.getName())) {
String[] splitHost = hostname.split(":", 2); String[] splitHost = hostname.split(":", 2);
if (splitHost.length == 2) { try {
try { if (splitHost.length == 2) {
String host = splitHost[0]; InetAddress inetAddress =
InetAddress inetAddress = NetworkUtils.resolveInetAddress(host, null); NetworkUtils.resolveInetAddress(splitHost[0], null);
int port = Integer.parseInt(splitHost[1]); TransportAddress address =
TransportAddress address = new InetSocketTransportAddress(inetAddress, port); new InetSocketTransportAddress(inetAddress, Integer.parseInt(splitHost[1]));
addresses.add(address); addresses.add(address);
} catch (IOException e) { } else if (splitHost.length == 1) {
logger.warn(e.getMessage(), e); InetAddress inetAddress =
} NetworkUtils.resolveInetAddress(splitHost[0], null);
} else if (splitHost.length == 1) { TransportAddress address =
try { new InetSocketTransportAddress(inetAddress, defaultPort);
String host = splitHost[0];
InetAddress inetAddress = NetworkUtils.resolveInetAddress(host, null);
TransportAddress address = new InetSocketTransportAddress(inetAddress, defaultPort);
addresses.add(address); addresses.add(address);
} catch (IOException e) { } else {
logger.warn(e.getMessage(), e); throw new IllegalArgumentException("invalid hostname specification: " + hostname);
} }
} else { }
throw new IllegalArgumentException("invalid hostname specification: " + hostname); catch (IOException e) {
logger.warn(e.getMessage(), e);
} }
} }
return addresses; return addresses;
@ -114,24 +114,39 @@ public class TransportClientHelper {
+ " " + System.getProperty("java.vm.vendor") + " " + System.getProperty("java.vm.vendor")
+ " " + System.getProperty("java.vm.version") + " " + System.getProperty("java.vm.version")
+ " Elasticsearch " + Version.CURRENT.toString(); + " Elasticsearch " + Version.CURRENT.toString();
logger.info("creating transport client on {} with custom settings {}", Settings transportClientSettings = getTransportClientSettings(settings);
systemIdentifier, settings.getAsMap()); logger.info("creating transport client on {} with settings {}",
systemIdentifier, transportClientSettings.getAsMap());
// we need to disable dead lock check because we may have mixed node/transport clients // we need to disable dead lock check because we may have mixed node/transport clients
DefaultChannelFuture.setUseDeadLockChecker(false); DefaultChannelFuture.setUseDeadLockChecker(false);
return TransportClient.builder() return TransportClient.builder()
.settings(getTransportClientSettings(settings)) .settings(transportClientSettings)
.build(); .build();
} }
private Settings getTransportClientSettings(Settings settings) { private Settings getTransportClientSettings(Settings settings) {
return Settings.builder() return Settings.builder()
.put("cluster.name", settings.get("cluster.name", "elasticsearch")) .put(filter(settings, key -> !isPrivateSettings(key)))
.put("path.home", settings.get("path.home", ".")) .put("path.home", settings.get("path.home", "."))
.put("processors", settings.getAsInt("processors", Runtime.getRuntime().availableProcessors())) // for thread pool size / worker count
.put("client.transport.sniff", settings.getAsBoolean("client.transport.sniff", false)) // always disable sniff
.put("client.transport.nodes_sampler_interval", settings.get("client.transport.nodes_sampler_interval", "10000s")) // ridculous long ping, default is 5 seconds
.put("client.transport.ping_timeout", settings.get("client.transport.ping_timeout", "10000s")) // ridiculous ping for unresponsive nodes, defauult is 5 seconds
.put("client.transport.ignore_cluster_name", settings.getAsBoolean("client.transport.ignore_cluster_name", true)) // connect to any cluster
.build(); .build();
} }
private static Settings filter(Settings settings, Predicate<String> predicate) {
Settings.Builder builder = Settings.settingsBuilder();
for (Map.Entry<String, String> me : settings.getAsMap().entrySet()) {
if (predicate.test(me.getKey())) {
builder.put(me.getKey(), me.getValue());
}
}
return builder.build();
}
private static boolean isPrivateSettings(String key) {
for (Parameters p : Parameters.values()) {
if (key.equals(p.getName())) {
return true;
}
}
return false;
}
} }

@ -52,7 +52,9 @@ class BulkClientTest {
bulkClient.newIndex(indexDefinition); bulkClient.newIndex(indexDefinition);
bulkClient.index(indexDefinition, "1", true, "{ \"name\" : \"Hello World\"}"); // single doc ingest bulkClient.index(indexDefinition, "1", true, "{ \"name\" : \"Hello World\"}"); // single doc ingest
bulkClient.waitForResponses(30L, TimeUnit.SECONDS); bulkClient.waitForResponses(30L, TimeUnit.SECONDS);
assertEquals(1, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount()); if (bulkClient.getBulkProcessor().isBulkMetricEnabled()) {
assertEquals(1, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount());
}
if (bulkClient.getBulkProcessor().getLastBulkError() != null) { if (bulkClient.getBulkProcessor().getLastBulkError() != null) {
logger.error("error", bulkClient.getBulkProcessor().getLastBulkError()); logger.error("error", bulkClient.getBulkProcessor().getLastBulkError());
} }
@ -69,12 +71,15 @@ class BulkClientTest {
.build()) { .build()) {
IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc");
bulkClient.newIndex(indexDefinition); bulkClient.newIndex(indexDefinition);
bulkClient.startBulk(indexDefinition);
for (int i = 0; i < ACTIONS; i++) { for (int i = 0; i < ACTIONS; i++) {
bulkClient.index(indexDefinition, null, false, bulkClient.index(indexDefinition, null, false,
"{ \"name\" : \"" + helper.randomString(32) + "\"}"); "{ \"name\" : \"" + helper.randomString(32) + "\"}");
} }
bulkClient.waitForResponses(30L, TimeUnit.SECONDS); bulkClient.stopBulk(indexDefinition);
assertEquals(numactions, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount()); if (bulkClient.getBulkProcessor().isBulkMetricEnabled()) {
assertEquals(numactions, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount());
}
if (bulkClient.getBulkProcessor().getLastBulkError() != null) { if (bulkClient.getBulkProcessor().getLastBulkError() != null) {
logger.error("error", bulkClient.getBulkProcessor().getLastBulkError()); logger.error("error", bulkClient.getBulkProcessor().getLastBulkError());
} }
@ -118,7 +123,9 @@ class BulkClientTest {
logger.error("latch timeout!"); logger.error("latch timeout!");
} }
bulkClient.stopBulk(indexDefinition); bulkClient.stopBulk(indexDefinition);
assertEquals(maxthreads * actions, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount()); if (bulkClient.getBulkProcessor().isBulkMetricEnabled()) {
assertEquals(maxthreads * actions, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount());
}
bulkClient.refreshIndex(indexDefinition); bulkClient.refreshIndex(indexDefinition);
assertEquals(maxthreads * actions, bulkClient.getSearchableDocs(indexDefinition)); assertEquals(maxthreads * actions, bulkClient.getSearchableDocs(indexDefinition));
if (bulkClient.getBulkProcessor().getLastBulkError() != null) { if (bulkClient.getBulkProcessor().getLastBulkError() != null) {

@ -19,7 +19,7 @@ class DuplicateIDTest {
private static final Logger logger = LogManager.getLogger(DuplicateIDTest.class.getName()); private static final Logger logger = LogManager.getLogger(DuplicateIDTest.class.getName());
private static final Long ACTIONS = 100L; private static final Long ACTIONS = 10000L;
private final TestExtension.Helper helper; private final TestExtension.Helper helper;
@ -43,7 +43,9 @@ class DuplicateIDTest {
bulkClient.waitForResponses(30L, TimeUnit.SECONDS); bulkClient.waitForResponses(30L, TimeUnit.SECONDS);
bulkClient.refreshIndex(indexDefinition); bulkClient.refreshIndex(indexDefinition);
assertTrue(bulkClient.getSearchableDocs(indexDefinition) < ACTIONS); assertTrue(bulkClient.getSearchableDocs(indexDefinition) < ACTIONS);
assertEquals(numactions, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount()); if (bulkClient.getBulkProcessor().isBulkMetricEnabled()) {
assertEquals(numactions, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount());
}
if (bulkClient.getBulkProcessor().getLastBulkError() != null) { if (bulkClient.getBulkProcessor().getLastBulkError() != null) {
logger.error("error", bulkClient.getBulkProcessor().getLastBulkError()); logger.error("error", bulkClient.getBulkProcessor().getLastBulkError());
} }

@ -41,9 +41,9 @@ class IndexPruneTest {
.put(helper.getClientSettings()) .put(helper.getClientSettings())
.build(); .build();
TransportBulkClient bulkClient = ClientBuilder.builder() TransportBulkClient bulkClient = ClientBuilder.builder()
.setBulkClientProvider(TransportBulkClientProvider.class) .setBulkClientProvider(TransportBulkClientProvider.class)
.put(helper.getClientSettings()) .put(helper.getClientSettings())
.build()) { .build()) {
IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc");
indexDefinition.setIndex("test_prune"); indexDefinition.setIndex("test_prune");
indexDefinition.setFullIndexName("test_prune1"); indexDefinition.setFullIndexName("test_prune1");
@ -62,10 +62,10 @@ class IndexPruneTest {
bulkClient.newIndex(indexDefinition); bulkClient.newIndex(indexDefinition);
indexDefinition.setShift(true); indexDefinition.setShift(true);
adminClient.shiftIndex(indexDefinition, Collections.emptyList(), null); adminClient.shiftIndex(indexDefinition, Collections.emptyList(), null);
indexDefinition.setEnabled(true);
indexDefinition.setDelta(2); indexDefinition.setDelta(2);
indexDefinition.setMinToKeep(2); indexDefinition.setMinToKeep(2);
indexDefinition.setPrune(true); indexDefinition.setPrune(true);
indexDefinition.setEnabled(true);
IndexPruneResult indexPruneResult = adminClient.pruneIndex(indexDefinition); IndexPruneResult indexPruneResult = adminClient.pruneIndex(indexDefinition);
logger.info("prune result = " + indexPruneResult); logger.info("prune result = " + indexPruneResult);
assertTrue(indexPruneResult.getDeletedIndices().contains("test_prune1")); assertTrue(indexPruneResult.getDeletedIndices().contains("test_prune1"));

@ -43,10 +43,10 @@ class IndexShiftTest {
.put(helper.getClientSettings()) .put(helper.getClientSettings())
.build(); .build();
TransportBulkClient bulkClient = ClientBuilder.builder() TransportBulkClient bulkClient = ClientBuilder.builder()
.setBulkClientProvider(TransportBulkClientProvider.class) .setBulkClientProvider(TransportBulkClientProvider.class)
.put(helper.getClientSettings()) .put(helper.getClientSettings())
.build()) { .build()) {
IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc");
indexDefinition.setFullIndexName("test_shift"); indexDefinition.setFullIndexName("test_shift");
bulkClient.newIndex(indexDefinition); bulkClient.newIndex(indexDefinition);
for (int i = 0; i < 1; i++) { for (int i = 0; i < 1; i++) {

@ -54,7 +54,9 @@ class SearchTest {
assertTrue(bulkClient.waitForResponses(30L, TimeUnit.SECONDS)); assertTrue(bulkClient.waitForResponses(30L, TimeUnit.SECONDS));
bulkClient.refreshIndex(indexDefinition); bulkClient.refreshIndex(indexDefinition);
assertEquals(numactions, bulkClient.getSearchableDocs(indexDefinition)); assertEquals(numactions, bulkClient.getSearchableDocs(indexDefinition));
assertEquals(numactions, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount()); if (bulkClient.getBulkProcessor().isBulkMetricEnabled()) {
assertEquals(numactions, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount());
}
if (bulkClient.getBulkProcessor().getLastBulkError() != null) { if (bulkClient.getBulkProcessor().getLastBulkError() != null) {
logger.error("error", bulkClient.getBulkProcessor().getLastBulkError()); logger.error("error", bulkClient.getBulkProcessor().getLastBulkError());
} }
@ -71,9 +73,11 @@ class SearchTest {
TimeValue.timeValueMillis(100), 579); TimeValue.timeValueMillis(100), 579);
long count = stream.count(); long count = stream.count();
assertEquals(numactions, count); assertEquals(numactions, count);
assertEquals(0L, searchClient.getSearchMetric().getFailedQueries().getCount()); if (searchClient.isSearchMetricEnabled()) {
assertEquals(0L, searchClient.getSearchMetric().getTimeoutQueries().getCount()); assertEquals(0L, searchClient.getSearchMetric().getFailedQueries().getCount());
assertEquals(1L, searchClient.getSearchMetric().getEmptyQueries().getCount()); assertEquals(0L, searchClient.getSearchMetric().getTimeoutQueries().getCount());
assertEquals(1L, searchClient.getSearchMetric().getEmptyQueries().getCount());
}
// test stream docs // test stream docs
stream = searchClient.search(qb -> qb stream = searchClient.search(qb -> qb
.setIndices(indexDefinition.getFullIndexName()) .setIndices(indexDefinition.getFullIndexName())
@ -82,9 +86,11 @@ class SearchTest {
final AtomicInteger hitcount = new AtomicInteger(); final AtomicInteger hitcount = new AtomicInteger();
stream.forEach(hit -> hitcount.incrementAndGet()); stream.forEach(hit -> hitcount.incrementAndGet());
assertEquals(numactions, hitcount.get()); assertEquals(numactions, hitcount.get());
assertEquals(0L, searchClient.getSearchMetric().getFailedQueries().getCount()); if (searchClient.isSearchMetricEnabled()) {
assertEquals(0L, searchClient.getSearchMetric().getTimeoutQueries().getCount()); assertEquals(0L, searchClient.getSearchMetric().getFailedQueries().getCount());
assertEquals(2L, searchClient.getSearchMetric().getEmptyQueries().getCount()); assertEquals(0L, searchClient.getSearchMetric().getTimeoutQueries().getCount());
assertEquals(2L, searchClient.getSearchMetric().getEmptyQueries().getCount());
}
// test stream doc ids // test stream doc ids
Stream<String> ids = searchClient.getIds(qb -> qb Stream<String> ids = searchClient.getIds(qb -> qb
.setIndices(indexDefinition.getFullIndexName()) .setIndices(indexDefinition.getFullIndexName())
@ -92,11 +98,13 @@ class SearchTest {
final AtomicInteger idcount = new AtomicInteger(); final AtomicInteger idcount = new AtomicInteger();
ids.forEach(id -> idcount.incrementAndGet()); ids.forEach(id -> idcount.incrementAndGet());
assertEquals(numactions, idcount.get()); assertEquals(numactions, idcount.get());
assertEquals(1542L, searchClient.getSearchMetric().getQueries().getCount()); if (searchClient.isSearchMetricEnabled()) {
assertEquals(1539L, searchClient.getSearchMetric().getSucceededQueries().getCount()); assertEquals(1542L, searchClient.getSearchMetric().getQueries().getCount());
assertEquals(3L, searchClient.getSearchMetric().getEmptyQueries().getCount()); assertEquals(1539L, searchClient.getSearchMetric().getSucceededQueries().getCount());
assertEquals(0L, searchClient.getSearchMetric().getFailedQueries().getCount()); assertEquals(3L, searchClient.getSearchMetric().getEmptyQueries().getCount());
assertEquals(0L, searchClient.getSearchMetric().getTimeoutQueries().getCount()); assertEquals(0L, searchClient.getSearchMetric().getFailedQueries().getCount());
assertEquals(0L, searchClient.getSearchMetric().getTimeoutQueries().getCount());
}
} }
} }
} }

@ -49,22 +49,24 @@ class SmokeTest {
assertEquals(helper.getClusterName(), adminClient.getClusterName()); assertEquals(helper.getClusterName(), adminClient.getClusterName());
bulkClient.newIndex(indexDefinition); bulkClient.newIndex(indexDefinition);
bulkClient.index(indexDefinition, "1", true, "{ \"name\" : \"Hello World\"}"); // single doc ingest bulkClient.index(indexDefinition, "1", true, "{ \"name\" : \"Hello World\"}"); // single doc ingest
bulkClient.waitForResponses(30, TimeUnit.SECONDS); assertTrue(bulkClient.waitForResponses(30, TimeUnit.SECONDS));
adminClient.checkMapping(indexDefinition); adminClient.checkMapping(indexDefinition);
bulkClient.update(indexDefinition, "1", "{ \"name\" : \"Another name\"}"); bulkClient.update(indexDefinition, "1", "{ \"name\" : \"Another name\"}");
bulkClient.delete(indexDefinition, "1"); bulkClient.delete(indexDefinition, "1");
bulkClient.waitForResponses(30, TimeUnit.SECONDS); assertTrue(bulkClient.waitForResponses(30, TimeUnit.SECONDS));
bulkClient.index(indexDefinition, "1", true, "{ \"name\" : \"Hello World\"}"); bulkClient.index(indexDefinition, "1", true, "{ \"name\" : \"Hello World\"}");
bulkClient.delete(indexDefinition, "1"); bulkClient.delete(indexDefinition, "1");
bulkClient.waitForResponses(30, TimeUnit.SECONDS); assertTrue(bulkClient.waitForResponses(30, TimeUnit.SECONDS));
adminClient.deleteIndex(indexDefinition); adminClient.deleteIndex(indexDefinition);
bulkClient.newIndex(indexDefinition); bulkClient.newIndex(indexDefinition);
bulkClient.index(indexDefinition, "1", true, "{ \"name\" : \"Hello World\"}"); bulkClient.index(indexDefinition, "1", true, "{ \"name\" : \"Hello World\"}");
bulkClient.waitForResponses(30, TimeUnit.SECONDS); assertTrue(bulkClient.waitForResponses(30, TimeUnit.SECONDS));
adminClient.updateReplicaLevel(indexDefinition); adminClient.updateReplicaLevel(indexDefinition);
assertEquals(1, adminClient.getReplicaLevel(indexDefinition)); assertEquals(1, adminClient.getReplicaLevel(indexDefinition));
assertEquals(0, bulkClient.getBulkProcessor().getBulkMetric().getFailed().getCount()); if (bulkClient.getBulkProcessor().isBulkMetricEnabled()) {
assertEquals(6, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount()); assertEquals(0, bulkClient.getBulkProcessor().getBulkMetric().getFailed().getCount());
assertEquals(6, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount());
}
if (bulkClient.getBulkProcessor().getLastBulkError() != null) { if (bulkClient.getBulkProcessor().getLastBulkError() != null) {
logger.error("error", bulkClient.getBulkProcessor().getLastBulkError()); logger.error("error", bulkClient.getBulkProcessor().getLastBulkError());
} }

@ -14,6 +14,7 @@ import org.junit.jupiter.api.extension.ExtensionContext;
import org.junit.jupiter.api.extension.ParameterContext; import org.junit.jupiter.api.extension.ParameterContext;
import org.junit.jupiter.api.extension.ParameterResolutionException; import org.junit.jupiter.api.extension.ParameterResolutionException;
import org.junit.jupiter.api.extension.ParameterResolver; import org.junit.jupiter.api.extension.ParameterResolver;
import org.xbib.elx.common.Parameters;
import java.io.IOException; import java.io.IOException;
import java.nio.file.FileVisitResult; import java.nio.file.FileVisitResult;
@ -31,6 +32,10 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft
private static final Logger logger = LogManager.getLogger("test"); private static final Logger logger = LogManager.getLogger("test");
private static final char[] numbersAndLetters = ("0123456789abcdefghijklmnopqrstuvwxyz").toCharArray();
private static final Random random = new SecureRandom();
private static final String key = "es-instance-"; private static final String key = "es-instance-";
private static final AtomicInteger count = new AtomicInteger(0); private static final AtomicInteger count = new AtomicInteger(0);
@ -52,7 +57,7 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft
} }
@Override @Override
public void beforeEach(ExtensionContext extensionContext) throws Exception { public void beforeEach(ExtensionContext extensionContext) {
Helper helper = extensionContext.getParent().isPresent() ? Helper helper = extensionContext.getParent().isPresent() ?
extensionContext.getParent().get().getStore(ns).getOrComputeIfAbsent(key + count.get(), key -> create(), Helper.class) : null; extensionContext.getParent().get().getStore(ns).getOrComputeIfAbsent(key + count.get(), key -> create(), Helper.class) : null;
Objects.requireNonNull(helper); Objects.requireNonNull(helper);
@ -129,10 +134,14 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft
return Settings.builder() return Settings.builder()
.put("cluster.name", cluster) .put("cluster.name", cluster)
.put("path.home", getHome()) .put("path.home", getHome())
.put("host", host) .put("client.transport.nodes_sampler_interval", "1h")
.put("port", port) .put("client.transport.ping_timeout", "1h")
.put("cluster.target_health", "YELLOW") .put(Parameters.HOST.getName(), host)
.put("cluster.target_health_timeout", "1m") .put(Parameters.PORT.getName(), port)
.put(Parameters.CLUSTER_TARGET_HEALTH.getName(), "YELLOW")
.put(Parameters.CLUSTER_TARGET_HEALTH_TIMEOUT.getName(), "1m")
.put(Parameters.BULK_METRIC_ENABLED.getName(), Boolean.TRUE)
.put(Parameters.SEARCH_METRIC_ENABLED.getName(), Boolean.TRUE)
.build(); .build();
} }
@ -179,9 +188,5 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft
node.close(); node.close();
} }
} }
private static final char[] numbersAndLetters = ("0123456789abcdefghijklmnopqrstuvwxyz").toCharArray();
private static final Random random = new SecureRandom();
} }
} }

@ -1,6 +1,6 @@
group = org.xbib group = org.xbib
name = elx name = elx
version = 2.2.1.46 version = 2.2.1.47
gradle.wrapper.version = 6.6.1 gradle.wrapper.version = 6.6.1
xbib-metrics.version = 2.1.0 xbib-metrics.version = 2.1.0

Loading…
Cancel
Save