align with es7102

Jörg Prante 3 years ago
parent 45865d656d
commit 6d434a98c6

@ -47,7 +47,7 @@ public interface AdminClient extends BasicClient {
* Resolve alias. * Resolve alias.
* *
* @param alias the alias * @param alias the alias
* @return this index name behind the alias or the alias if there is no index * @return the index names in ordered sequence behind the alias or an empty list if there is no such alias
*/ */
List<String> resolveAlias(String alias); List<String> resolveAlias(String alias);
@ -89,7 +89,7 @@ public interface AdminClient extends BasicClient {
/** /**
* Find the timestamp of the most recently indexed document in the index. * Find the timestamp of the most recently indexed document in the index.
* *
* @param indexDefinition the index * @param indexDefinition the index definition
* @param timestampfieldname the timestamp field name * @param timestampfieldname the timestamp field name
* @return millis UTC millis of the most recent document * @return millis UTC millis of the most recent document
* @throws IOException if most rcent document can not be found * @throws IOException if most rcent document can not be found

@ -52,16 +52,16 @@ public interface BasicClient extends Closeable {
* Wait for cluster being healthy. * Wait for cluster being healthy.
* *
* @param healthColor cluster health color to wait for * @param healthColor cluster health color to wait for
* @param maxWaitTime time value * @param maxWaitTime time value
* @param timeUnit time unit * @param timeUnit time unit
*/ */
void waitForCluster(String healthColor, long maxWaitTime, TimeUnit timeUnit); void waitForCluster(String healthColor, long maxWaitTime, TimeUnit timeUnit);
void waitForShards(long maxWaitTime, TimeUnit timeUnit); void waitForShards(long maxWaitTime, TimeUnit timeUnit);
long getSearchableDocs(IndexDefinition index); long getSearchableDocs(IndexDefinition indexDefinition);
boolean isIndexExists(IndexDefinition index); boolean isIndexExists(IndexDefinition indexDefinition);
ScheduledThreadPoolExecutor getScheduler(); ScheduledThreadPoolExecutor getScheduler();
} }

@ -36,8 +36,8 @@ public interface BulkClient extends BasicClient, Flushable {
* Add index request. Each request will be added to a queue for bulking requests. * Add index request. Each request will be added to a queue for bulking requests.
* Submitting request will be done when limits are exceeded. * Submitting request will be done when limits are exceeded.
* *
* @param indexDefinition the index definition * @param indexDefinition the index definition
* @param id the id * @param id the id
* @param create true if document must be created * @param create true if document must be created
* @param source the source * @param source the source
* @return this * @return this
@ -48,8 +48,8 @@ public interface BulkClient extends BasicClient, Flushable {
* Index request. Each request will be added to a queue for bulking requests. * Index request. Each request will be added to a queue for bulking requests.
* Submitting request will be done when limits are exceeded. * Submitting request will be done when limits are exceeded.
* *
* @param indexDefinition the index definition * @param indexDefinition the index definition
* @param id the id * @param id the id
* @param create true if document is to be created, false otherwise * @param create true if document is to be created, false otherwise
* @param source the source * @param source the source
* @return this client methods * @return this client methods
@ -88,7 +88,7 @@ public interface BulkClient extends BasicClient, Flushable {
* Submitting request will be done when bulk limits are exceeded. * Submitting request will be done when bulk limits are exceeded.
* Note that updates only work correctly when all operations between nodes are synchronized. * Note that updates only work correctly when all operations between nodes are synchronized.
* *
* @param indexDefinition the index definition * @param indexDefinition the index definition
* @param id the id * @param id the id
* @param source the source * @param source the source
* @return this * @return this
@ -149,5 +149,5 @@ public interface BulkClient extends BasicClient, Flushable {
*/ */
void flushIndex(IndexDefinition indexDefinition); void flushIndex(IndexDefinition indexDefinition);
BulkProcessor getBulkController(); BulkProcessor getBulkProcessor();
} }

@ -5,7 +5,6 @@ import org.elasticsearch.action.ActionRequest;
import java.io.Closeable; import java.io.Closeable;
import java.io.Flushable; import java.io.Flushable;
import java.io.IOException; import java.io.IOException;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
public interface BulkProcessor extends Closeable, Flushable { public interface BulkProcessor extends Closeable, Flushable {

@ -17,6 +17,7 @@ import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse; import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeAction; import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeAction;
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest; import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest;
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeResponse;
import org.elasticsearch.action.admin.indices.get.GetIndexAction; import org.elasticsearch.action.admin.indices.get.GetIndexAction;
import org.elasticsearch.action.admin.indices.get.GetIndexRequestBuilder; import org.elasticsearch.action.admin.indices.get.GetIndexRequestBuilder;
import org.elasticsearch.action.admin.indices.get.GetIndexResponse; import org.elasticsearch.action.admin.indices.get.GetIndexResponse;
@ -54,6 +55,7 @@ import org.xbib.elx.api.IndexPruneResult;
import org.xbib.elx.api.IndexShiftResult; import org.xbib.elx.api.IndexShiftResult;
import java.io.IOException; import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
@ -67,9 +69,7 @@ import java.util.Set;
import java.util.SortedMap; import java.util.SortedMap;
import java.util.TreeMap; import java.util.TreeMap;
import java.util.TreeSet; import java.util.TreeSet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer; import java.util.function.Consumer;
import java.util.regex.Matcher; import java.util.regex.Matcher;
@ -81,7 +81,7 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements
private static final Logger logger = LogManager.getLogger(AbstractAdminClient.class.getName()); private static final Logger logger = LogManager.getLogger(AbstractAdminClient.class.getName());
@Override @Override
public Map<String, ?> getMapping(IndexDefinition indexDefinition) throws IOException { public Map<String, ?> getMapping(IndexDefinition indexDefinition) {
if (isIndexDefinitionDisabled(indexDefinition)) { if (isIndexDefinitionDisabled(indexDefinition)) {
return null; return null;
} }
@ -89,10 +89,14 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements
.setIndices(indexDefinition.getFullIndexName()) .setIndices(indexDefinition.getFullIndexName())
.setTypes(indexDefinition.getType()); .setTypes(indexDefinition.getType());
GetMappingsResponse getMappingsResponse = getMappingsRequestBuilder.execute().actionGet(); GetMappingsResponse getMappingsResponse = getMappingsRequestBuilder.execute().actionGet();
return getMappingsResponse.getMappings() try {
.get(indexDefinition.getFullIndexName()) return getMappingsResponse.getMappings()
.get(indexDefinition.getType()) .get(indexDefinition.getFullIndexName())
.getSourceAsMap(); .get(indexDefinition.getType())
.getSourceAsMap();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
} }
@Override @Override
@ -206,17 +210,18 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements
return new EmptyIndexShiftResult(); return new EmptyIndexShiftResult();
} }
if (indexDefinition.isShiftEnabled()) { if (indexDefinition.isShiftEnabled()) {
return shiftIndex(indexDefinition.getIndex(), return shiftIndex(indexDefinition.getIndex(), indexDefinition.getFullIndexName(),
indexDefinition.getFullIndexName(), additionalAliases.stream() additionalAliases.stream()
.filter(a -> a != null && !a.isEmpty()) .filter(a -> a != null && !a.isEmpty())
.collect(Collectors.toList()), indexAliasAdder); .collect(Collectors.toList()), indexAliasAdder);
} }
return new EmptyIndexShiftResult(); return new EmptyIndexShiftResult();
} }
private IndexShiftResult shiftIndex(String index, String fullIndexName, private IndexShiftResult shiftIndex(String index,
List<String> additionalAliases, String fullIndexName,
IndexAliasAdder adder) { List<String> additionalAliases,
IndexAliasAdder adder) {
ensureClientIsPresent(); ensureClientIsPresent();
if (index == null) { if (index == null) {
return new EmptyIndexShiftResult(); // nothing to shift to return new EmptyIndexShiftResult(); // nothing to shift to
@ -289,7 +294,9 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements
@Override @Override
public IndexPruneResult pruneIndex(IndexDefinition indexDefinition) { public IndexPruneResult pruneIndex(IndexDefinition indexDefinition) {
return indexDefinition != null && indexDefinition.isEnabled() && indexDefinition.isPruneEnabled() ? return indexDefinition != null && indexDefinition.isEnabled() && indexDefinition.isPruneEnabled() &&
indexDefinition.getRetention() != null &&
indexDefinition.getDateTimePattern() != null ?
pruneIndex(indexDefinition.getIndex(), pruneIndex(indexDefinition.getIndex(),
indexDefinition.getFullIndexName(), indexDefinition.getFullIndexName(),
indexDefinition.getDateTimePattern(), indexDefinition.getDateTimePattern(),
@ -315,7 +322,7 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements
ensureClientIsPresent(); ensureClientIsPresent();
GetIndexRequestBuilder getIndexRequestBuilder = new GetIndexRequestBuilder(client, GetIndexAction.INSTANCE); GetIndexRequestBuilder getIndexRequestBuilder = new GetIndexRequestBuilder(client, GetIndexAction.INSTANCE);
GetIndexResponse getIndexResponse = getIndexRequestBuilder.execute().actionGet(); GetIndexResponse getIndexResponse = getIndexRequestBuilder.execute().actionGet();
logger.info("before pruning: protected = " + protectedIndexName + " found total of {} indices", getIndexResponse.getIndices().length); logger.info("before pruning: found total of {} indices", getIndexResponse.getIndices().length);
List<String> candidateIndices = new ArrayList<>(); List<String> candidateIndices = new ArrayList<>();
for (String s : getIndexResponse.getIndices()) { for (String s : getIndexResponse.getIndices()) {
Matcher m = pattern.matcher(s); Matcher m = pattern.matcher(s);
@ -372,7 +379,8 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements
SearchRequest searchRequest = new SearchRequest(); SearchRequest searchRequest = new SearchRequest();
searchRequest.indices(indexDefinition.getFullIndexName()); searchRequest.indices(indexDefinition.getFullIndexName());
searchRequest.source(builder); searchRequest.source(builder);
SearchResponse searchResponse = client.execute(SearchAction.INSTANCE, searchRequest).actionGet(); SearchResponse searchResponse =
client.execute(SearchAction.INSTANCE, searchRequest).actionGet();
if (searchResponse.getHits().getHits().length == 1) { if (searchResponse.getHits().getHits().length == 1) {
SearchHit hit = searchResponse.getHits().getHits()[0]; SearchHit hit = searchResponse.getHits().getHits()[0];
if (hit.getFields().get(timestampfieldname) != null) { if (hit.getFields().get(timestampfieldname) != null) {
@ -394,22 +402,16 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements
return false; return false;
} }
ensureClientIsPresent(); ensureClientIsPresent();
String index = indexDefinition.getFullIndexName(); logger.info("force merge of " + indexDefinition);
ForceMergeRequest forceMergeRequest = new ForceMergeRequest(); ForceMergeRequest forceMergeRequest = new ForceMergeRequest();
forceMergeRequest.indices(index); forceMergeRequest.indices(indexDefinition.getFullIndexName());
try { ForceMergeResponse forceMergeResponse =
client.execute(ForceMergeAction.INSTANCE, forceMergeRequest) client.execute(ForceMergeAction.INSTANCE, forceMergeRequest).actionGet();
.get(indexDefinition.getMaxWaitTime(), indexDefinition.getMaxWaitTimeUnit()); if (forceMergeResponse.getFailedShards() > 0) {
return true; throw new IllegalStateException("failed shards after force merge: " + forceMergeResponse.getFailedShards());
} catch (TimeoutException e) { }
logger.error("timeout"); waitForCluster("GREEN", 300L, TimeUnit.SECONDS);
} catch (ExecutionException e) { return true;
logger.error(e.getMessage(), e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.error(e.getMessage(), e);
}
return false;
} }
@Override @Override
@ -467,7 +469,8 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements
.setTypes(type) .setTypes(type)
.setQuery(QueryBuilders.matchAllQuery()) .setQuery(QueryBuilders.matchAllQuery())
.setSize(0); .setSize(0);
SearchResponse searchResponse = searchRequestBuilder.execute().actionGet(); SearchResponse searchResponse =
searchRequestBuilder.execute().actionGet();
long total = searchResponse.getHits().getTotalHits(); long total = searchResponse.getHits().getTotalHits();
if (total > 0L) { if (total > 0L) {
Map<String, Long> fields = new TreeMap<>(); Map<String, Long> fields = new TreeMap<>();

@ -116,7 +116,7 @@ public abstract class AbstractBasicClient implements BasicClient {
Settings.Builder updateSettingsBuilder = Settings.builder(); Settings.Builder updateSettingsBuilder = Settings.builder();
updateSettingsBuilder.put(key, value.toString()); updateSettingsBuilder.put(key, value.toString());
ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest(); ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest();
updateSettingsRequest.persistentSettings(updateSettingsBuilder).timeout(toTimeValue(timeout, timeUnit)); updateSettingsRequest.transientSettings(updateSettingsBuilder).timeout(toTimeValue(timeout, timeUnit));
client.execute(ClusterUpdateSettingsAction.INSTANCE, updateSettingsRequest).actionGet(); client.execute(ClusterUpdateSettingsAction.INSTANCE, updateSettingsRequest).actionGet();
} }
@ -140,6 +140,7 @@ public abstract class AbstractBasicClient implements BasicClient {
@Override @Override
public void waitForCluster(String statusString, long maxWaitTime, TimeUnit timeUnit) { public void waitForCluster(String statusString, long maxWaitTime, TimeUnit timeUnit) {
ensureClientIsPresent(); ensureClientIsPresent();
logger.info("waiting for cluster status " + statusString + " for " + maxWaitTime + " " + timeUnit);
ClusterHealthStatus status = ClusterHealthStatus.fromString(statusString); ClusterHealthStatus status = ClusterHealthStatus.fromString(statusString);
TimeValue timeout = toTimeValue(maxWaitTime, timeUnit); TimeValue timeout = toTimeValue(maxWaitTime, timeUnit);
ClusterHealthRequest clusterHealthRequest = new ClusterHealthRequest() ClusterHealthRequest clusterHealthRequest = new ClusterHealthRequest()
@ -165,7 +166,7 @@ public abstract class AbstractBasicClient implements BasicClient {
ClusterHealthResponse healthResponse = ClusterHealthResponse healthResponse =
client.execute(ClusterHealthAction.INSTANCE, clusterHealthRequest).actionGet(); client.execute(ClusterHealthAction.INSTANCE, clusterHealthRequest).actionGet();
if (healthResponse.isTimedOut()) { if (healthResponse.isTimedOut()) {
String message = "timeout waiting for cluster shards"; String message = "timeout waiting for cluster shards: " + timeout;
logger.error(message); logger.error(message);
throw new IllegalStateException(message); throw new IllegalStateException(message);
} }
@ -257,8 +258,8 @@ public abstract class AbstractBasicClient implements BasicClient {
protected boolean isIndexDefinitionDisabled(IndexDefinition indexDefinition) { protected boolean isIndexDefinitionDisabled(IndexDefinition indexDefinition) {
if (!indexDefinition.isEnabled()) { if (!indexDefinition.isEnabled()) {
logger.warn("index " + indexDefinition.getFullIndexName() + " is disabled"); logger.warn("index " + indexDefinition.getFullIndexName() + " is disabled");
return true; return true;
} }
return false; return false;
} }

@ -48,7 +48,7 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements
} }
@Override @Override
public BulkProcessor getBulkController() { public BulkProcessor getBulkProcessor() {
return bulkProcessor; return bulkProcessor;
} }
@ -100,14 +100,12 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements
} }
Settings settings = Settings.builder().loadFromSource(indexDefinition.getSettings()).build(); Settings settings = Settings.builder().loadFromSource(indexDefinition.getSettings()).build();
createIndexRequestBuilder.setSettings(settings); createIndexRequestBuilder.setSettings(settings);
// must be Map<String, Object> to match prototype of addMapping()! if (indexDefinition.getMappings() != null) {
Map<String, Object> mappings = indexDefinition.getMappings() == null ? null : Map<String, Object> mappings = JsonXContent.jsonXContent.createParser(indexDefinition.getMappings()).mapOrdered();
JsonXContent.jsonXContent.createParser(indexDefinition.getMappings()).mapOrdered();
if (mappings != null) {
createIndexRequestBuilder.addMapping(type, mappings); createIndexRequestBuilder.addMapping(type, mappings);
} else { } else {
createIndexRequestBuilder.addMapping(type, XContentBuilder builder = JsonXContent.contentBuilder().startObject().startObject(type).endObject().endObject();
JsonXContent.contentBuilder().startObject().startObject(type).endObject().endObject()); createIndexRequestBuilder.addMapping(type, builder);
} }
CreateIndexResponse createIndexResponse = createIndexRequestBuilder.execute().actionGet(); CreateIndexResponse createIndexResponse = createIndexRequestBuilder.execute().actionGet();
if (createIndexResponse.isAcknowledged()) { if (createIndexResponse.isAcknowledged()) {
@ -117,8 +115,7 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements
return; return;
} }
// we really need state GREEN. If yellow, we may trigger shard write errors and queue will exceed quickly. // we really need state GREEN. If yellow, we may trigger shard write errors and queue will exceed quickly.
logger.info("waiting for GREEN after index {} was created", index); waitForCluster("GREEN", 300L, TimeUnit.SECONDS);
waitForCluster("GREEN", indexDefinition.getMaxWaitTime(), indexDefinition.getMaxWaitTimeUnit());
} }
@Override @Override
@ -126,17 +123,19 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements
if (isIndexDefinitionDisabled(indexDefinition)) { if (isIndexDefinitionDisabled(indexDefinition)) {
return; return;
} }
ensureClientIsPresent(); if (bulkProcessor != null) {
Long bulkQueueSize = getThreadPoolQueueSize("bulk"); ensureClientIsPresent();
if (bulkQueueSize != null && bulkQueueSize <= 64) { Long bulkQueueSize = getThreadPoolQueueSize("bulk");
logger.info("found bulk queue size " + bulkQueueSize + ", expanding to " + bulkQueueSize * 4); if (bulkQueueSize != null && bulkQueueSize <= 64) {
bulkQueueSize = bulkQueueSize * 4; logger.info("found bulk queue size " + bulkQueueSize + ", expanding to " + bulkQueueSize * 4);
} else { bulkQueueSize = bulkQueueSize * 4;
logger.warn("undefined or small bulk queue size found: " + bulkQueueSize + " assuming 256"); } else {
bulkQueueSize = 256L; logger.warn("undefined or small bulk queue size found: " + bulkQueueSize + " assuming 256");
bulkQueueSize = 256L;
}
putClusterSetting("threadpool.bulk.queue_size", bulkQueueSize, 30L, TimeUnit.SECONDS);
bulkProcessor.startBulkMode(indexDefinition);
} }
putClusterSetting("threadpool.bulk.queue_size", bulkQueueSize, 30L, TimeUnit.SECONDS);
bulkProcessor.startBulkMode(indexDefinition);
} }
@Override @Override
@ -144,8 +143,10 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements
if (isIndexDefinitionDisabled(indexDefinition)) { if (isIndexDefinitionDisabled(indexDefinition)) {
return; return;
} }
ensureClientIsPresent(); if (bulkProcessor != null) {
bulkProcessor.stopBulkMode(indexDefinition); ensureClientIsPresent();
bulkProcessor.stopBulkMode(indexDefinition);
}
} }
@Override @Override
@ -166,8 +167,10 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements
@Override @Override
public BulkClient index(IndexRequest indexRequest) { public BulkClient index(IndexRequest indexRequest) {
ensureClientIsPresent(); if (bulkProcessor != null) {
bulkProcessor.add(indexRequest); ensureClientIsPresent();
bulkProcessor.add(indexRequest);
}
return this; return this;
} }
@ -184,8 +187,10 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements
@Override @Override
public BulkClient delete(DeleteRequest deleteRequest) { public BulkClient delete(DeleteRequest deleteRequest) {
ensureClientIsPresent(); if (bulkProcessor != null) {
bulkProcessor.add(deleteRequest); ensureClientIsPresent();
bulkProcessor.add(deleteRequest);
}
return this; return this;
} }
@ -214,8 +219,11 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements
@Override @Override
public boolean waitForResponses(long timeout, TimeUnit timeUnit) { public boolean waitForResponses(long timeout, TimeUnit timeUnit) {
ensureClientIsPresent(); if (bulkProcessor != null) {
return bulkProcessor.waitForBulkResponses(timeout, timeUnit); ensureClientIsPresent();
return bulkProcessor.waitForBulkResponses(timeout, timeUnit);
}
return true;
} }
@Override @Override

@ -60,10 +60,10 @@ public abstract class AbstractSearchClient extends AbstractBasicClient implement
@Override @Override
public void close() throws IOException { public void close() throws IOException {
if (closed.compareAndSet(false, true)) { if (closed.compareAndSet(false, true)) {
super.close();
if (searchMetric != null) { if (searchMetric != null) {
searchMetric.close(); searchMetric.close();
} }
super.close();
} }
} }
@ -137,9 +137,24 @@ public abstract class AbstractSearchClient extends AbstractBasicClient implement
SearchRequestBuilder searchRequestBuilder = new SearchRequestBuilder(client, SearchAction.INSTANCE); SearchRequestBuilder searchRequestBuilder = new SearchRequestBuilder(client, SearchAction.INSTANCE);
queryBuilder.accept(searchRequestBuilder); queryBuilder.accept(searchRequestBuilder);
searchRequestBuilder.setScroll(scrollTime).setSize(scrollSize); searchRequestBuilder.setScroll(scrollTime).setSize(scrollSize);
SearchResponse initialSearchResponse = searchRequestBuilder.execute().actionGet(); searchRequestBuilder.setScroll(scrollTime).setSize(scrollSize);
ActionFuture<SearchResponse> actionFuture = searchRequestBuilder.execute();
searchMetric.getCurrentQueries().inc();
SearchResponse initialSearchResponse = actionFuture.actionGet();
searchMetric.getCurrentQueries().dec();
searchMetric.getQueries().inc();
searchMetric.markTotalQueries(1);
if (initialSearchResponse.getFailedShards() > 0) {
searchMetric.getFailedQueries().inc();
} else if (initialSearchResponse.isTimedOut()) {
searchMetric.getTimeoutQueries().inc();
} else if (initialSearchResponse.getHits().getTotalHits() == 0) {
searchMetric.getEmptyQueries().inc();
} else {
searchMetric.getSucceededQueries().inc();
}
Stream<SearchResponse> responseStream = Stream.iterate(initialSearchResponse, Stream<SearchResponse> responseStream = Stream.iterate(initialSearchResponse,
searchResponse -> { searchResponse -> {
SearchScrollRequestBuilder searchScrollRequestBuilder = SearchScrollRequestBuilder searchScrollRequestBuilder =
new SearchScrollRequestBuilder(client, SearchScrollAction.INSTANCE) new SearchScrollRequestBuilder(client, SearchScrollAction.INSTANCE)
.setScrollId(searchResponse.getScrollId()) .setScrollId(searchResponse.getScrollId())

@ -68,6 +68,7 @@ public class DefaultBulkListener implements BulkListener {
long l = bulkMetric.getCurrentIngest().getCount(); long l = bulkMetric.getCurrentIngest().getCount();
bulkMetric.getCurrentIngest().dec(); bulkMetric.getCurrentIngest().dec();
bulkMetric.getSucceeded().inc(response.getItems().length); bulkMetric.getSucceeded().inc(response.getItems().length);
bulkMetric.markTotalIngest(response.getItems().length);
int n = 0; int n = 0;
for (BulkItemResponse itemResponse : response.getItems()) { for (BulkItemResponse itemResponse : response.getItems()) {
bulkMetric.getCurrentIngest().dec(itemResponse.getIndex(), itemResponse.getType(), itemResponse.getId()); bulkMetric.getCurrentIngest().dec(itemResponse.getIndex(), itemResponse.getType(), itemResponse.getId());

@ -102,7 +102,8 @@ public class DefaultBulkProcessor implements BulkProcessor {
int interval = indexDefinition.getStartBulkRefreshSeconds(); int interval = indexDefinition.getStartBulkRefreshSeconds();
if (interval != 0) { if (interval != 0) {
logger.info("starting bulk on " + indexName + " with new refresh interval " + interval); logger.info("starting bulk on " + indexName + " with new refresh interval " + interval);
bulkClient.updateIndexSetting(indexName, "refresh_interval", interval + "s", 30L, TimeUnit.SECONDS); bulkClient.updateIndexSetting(indexName, "refresh_interval",
interval >= 0 ? interval + "s" : interval, 30L, TimeUnit.SECONDS);
} else { } else {
logger.warn("ignoring starting bulk on " + indexName + " with refresh interval " + interval); logger.warn("ignoring starting bulk on " + indexName + " with refresh interval " + interval);
} }
@ -116,7 +117,8 @@ public class DefaultBulkProcessor implements BulkProcessor {
if (waitForBulkResponses(indexDefinition.getMaxWaitTime(), indexDefinition.getMaxWaitTimeUnit())) { if (waitForBulkResponses(indexDefinition.getMaxWaitTime(), indexDefinition.getMaxWaitTimeUnit())) {
if (interval != 0) { if (interval != 0) {
logger.info("stopping bulk on " + indexName + " with new refresh interval " + interval); logger.info("stopping bulk on " + indexName + " with new refresh interval " + interval);
bulkClient.updateIndexSetting(indexName, "refresh_interval", interval + "s", 30L, TimeUnit.SECONDS); bulkClient.updateIndexSetting(indexName, "refresh_interval",
interval >= 0 ? interval + "s" : interval, 30L, TimeUnit.SECONDS);
} else { } else {
logger.warn("ignoring stopping bulk on " + indexName + " with refresh interval " + interval); logger.warn("ignoring stopping bulk on " + indexName + " with refresh interval " + interval);
} }
@ -134,8 +136,8 @@ public class DefaultBulkProcessor implements BulkProcessor {
} }
@Override @Override
public void setMaxBulkVolume(long bulkSize) { public void setMaxBulkVolume(long bulkVolume) {
this.bulkVolume = bulkSize; this.bulkVolume = bulkVolume;
} }
@Override @Override

@ -43,13 +43,13 @@ class AliasTest {
@Test @Test
void testAlias() { void testAlias() {
ElasticsearchClient client = helper.client("1"); ElasticsearchClient client = helper.client();
CreateIndexRequest indexRequest = new CreateIndexRequest("test_index"); CreateIndexRequest indexRequest = new CreateIndexRequest("test_index");
client.execute(CreateIndexAction.INSTANCE, indexRequest).actionGet(); client.execute(CreateIndexAction.INSTANCE, indexRequest).actionGet();
client.execute(RefreshAction.INSTANCE, new RefreshRequest()).actionGet(); client.execute(RefreshAction.INSTANCE, new RefreshRequest()).actionGet();
IndicesAliasesRequest indicesAliasesRequest = new IndicesAliasesRequest(); IndicesAliasesRequest indicesAliasesRequest = new IndicesAliasesRequest();
String[] indices = new String[]{"test_index"}; String[] indices = { "test_index" };
String[] aliases = new String[]{"test_alias"}; String[] aliases = { "test_alias" };
IndicesAliasesRequest.AliasActions aliasAction = IndicesAliasesRequest.AliasActions aliasAction =
new IndicesAliasesRequest.AliasActions(AliasAction.Type.ADD, indices, aliases); new IndicesAliasesRequest.AliasActions(AliasAction.Type.ADD, indices, aliases);
indicesAliasesRequest.addAliasAction(aliasAction); indicesAliasesRequest.addAliasAction(aliasAction);
@ -67,7 +67,7 @@ class AliasTest {
@Test @Test
void testMostRecentIndex() { void testMostRecentIndex() {
ElasticsearchClient client = helper.client("1"); ElasticsearchClient client = helper.client();
String alias = "test"; String alias = "test";
CreateIndexRequest indexRequest = new CreateIndexRequest("test20160101"); CreateIndexRequest indexRequest = new CreateIndexRequest("test20160101");
client.execute(CreateIndexAction.INSTANCE, indexRequest).actionGet(); client.execute(CreateIndexAction.INSTANCE, indexRequest).actionGet();
@ -76,7 +76,11 @@ class AliasTest {
indexRequest = new CreateIndexRequest("test20160103"); indexRequest = new CreateIndexRequest("test20160103");
client.execute(CreateIndexAction.INSTANCE, indexRequest).actionGet(); client.execute(CreateIndexAction.INSTANCE, indexRequest).actionGet();
IndicesAliasesRequest indicesAliasesRequest = new IndicesAliasesRequest(); IndicesAliasesRequest indicesAliasesRequest = new IndicesAliasesRequest();
String[] indices = new String[] { "test20160101", "test20160102", "test20160103" }; String[] indices = {
"test20160101",
"test20160102",
"test20160103"
};
String[] aliases = new String[] { alias }; String[] aliases = new String[] { alias };
IndicesAliasesRequest.AliasActions aliasAction = IndicesAliasesRequest.AliasActions aliasAction =
new IndicesAliasesRequest.AliasActions(AliasAction.Type.ADD, indices, aliases); new IndicesAliasesRequest.AliasActions(AliasAction.Type.ADD, indices, aliases);

@ -29,7 +29,7 @@ class SearchTest {
@Test @Test
void testSearch() throws Exception { void testSearch() throws Exception {
ElasticsearchClient client = helper.client("1"); ElasticsearchClient client = helper.client();
BulkRequestBuilder builder = new BulkRequestBuilder(client, BulkAction.INSTANCE); BulkRequestBuilder builder = new BulkRequestBuilder(client, BulkAction.INSTANCE);
for (int i = 0; i < 1; i++) { for (int i = 0; i < 1; i++) {
IndexRequest indexRequest = new IndexRequest().index("pages").type("row") IndexRequest indexRequest = new IndexRequest().index("pages").type("row")

@ -33,7 +33,7 @@ class SimpleTest {
try { try {
DeleteIndexRequest deleteIndexRequest = DeleteIndexRequest deleteIndexRequest =
new DeleteIndexRequest().indices("test"); new DeleteIndexRequest().indices("test");
helper.client("1").execute(DeleteIndexAction.INSTANCE, deleteIndexRequest).actionGet(); helper.client().execute(DeleteIndexAction.INSTANCE, deleteIndexRequest).actionGet();
} catch (IndexNotFoundException e) { } catch (IndexNotFoundException e) {
// ignore if index not found // ignore if index not found
} }
@ -44,22 +44,22 @@ class SimpleTest {
.build(); .build();
CreateIndexRequest createIndexRequest = new CreateIndexRequest(); CreateIndexRequest createIndexRequest = new CreateIndexRequest();
createIndexRequest.index("test").settings(indexSettings); createIndexRequest.index("test").settings(indexSettings);
helper.client("1").execute(CreateIndexAction.INSTANCE, createIndexRequest).actionGet(); helper.client().execute(CreateIndexAction.INSTANCE, createIndexRequest).actionGet();
IndexRequest indexRequest = new IndexRequest(); IndexRequest indexRequest = new IndexRequest();
indexRequest.index("test").type("test").id("1") indexRequest.index("test").type("test").id("1")
.source(XContentFactory.jsonBuilder().startObject().field("field", .source(XContentFactory.jsonBuilder().startObject().field("field",
"1%2fPJJP3JV2C24iDfEu9XpHBaYxXh%2fdHTbmchB35SDznXO2g8Vz4D7GTIvY54iMiX_149c95f02a8").endObject()); "1%2fPJJP3JV2C24iDfEu9XpHBaYxXh%2fdHTbmchB35SDznXO2g8Vz4D7GTIvY54iMiX_149c95f02a8").endObject());
helper.client("1").execute(IndexAction.INSTANCE, indexRequest).actionGet(); helper.client().execute(IndexAction.INSTANCE, indexRequest).actionGet();
RefreshRequest refreshRequest = new RefreshRequest(); RefreshRequest refreshRequest = new RefreshRequest();
refreshRequest.indices("test"); refreshRequest.indices("test");
helper.client("1").execute(RefreshAction.INSTANCE, refreshRequest).actionGet(); helper.client().execute(RefreshAction.INSTANCE, refreshRequest).actionGet();
SearchSourceBuilder builder = new SearchSourceBuilder(); SearchSourceBuilder builder = new SearchSourceBuilder();
builder.query(QueryBuilders.matchQuery("field", builder.query(QueryBuilders.matchQuery("field",
"1%2fPJJP3JV2C24iDfEu9XpHBaYxXh%2fdHTbmchB35SDznXO2g8Vz4D7GTIvY54iMiX_149c95f02a8")); "1%2fPJJP3JV2C24iDfEu9XpHBaYxXh%2fdHTbmchB35SDznXO2g8Vz4D7GTIvY54iMiX_149c95f02a8"));
SearchRequest searchRequest = new SearchRequest(); SearchRequest searchRequest = new SearchRequest();
searchRequest.indices("test").types("test"); searchRequest.indices("test").types("test");
searchRequest.source(builder); searchRequest.source(builder);
String doc = helper.client("1").execute(SearchAction.INSTANCE, searchRequest).actionGet() String doc = helper.client().execute(SearchAction.INSTANCE, searchRequest).actionGet()
.getHits().getAt(0).getSourceAsString(); .getHits().getAt(0).getSourceAsString();
assertEquals(doc, assertEquals(doc,
"{\"field\":\"1%2fPJJP3JV2C24iDfEu9XpHBaYxXh%2fdHTbmchB35SDznXO2g8Vz4D7GTIvY54iMiX_149c95f02a8\"}"); "{\"field\":\"1%2fPJJP3JV2C24iDfEu9XpHBaYxXh%2fdHTbmchB35SDznXO2g8Vz4D7GTIvY54iMiX_149c95f02a8\"}");

@ -13,7 +13,6 @@ import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.client.support.AbstractClient;
import org.elasticsearch.cluster.health.ClusterHealthStatus; import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress; import org.elasticsearch.common.transport.InetSocketTransportAddress;
@ -33,8 +32,6 @@ import java.nio.file.Path;
import java.nio.file.Paths; import java.nio.file.Paths;
import java.nio.file.SimpleFileVisitor; import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes; import java.nio.file.attribute.BasicFileAttributes;
import java.util.HashMap;
import java.util.Map;
import java.util.Random; import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
@ -72,9 +69,9 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft
Helper helper = extensionContext.getParent().get().getStore(ns) Helper helper = extensionContext.getParent().get().getStore(ns)
.getOrComputeIfAbsent(key + count.get(), key -> create(), Helper.class); .getOrComputeIfAbsent(key + count.get(), key -> create(), Helper.class);
logger.info("starting cluster with helper " + helper + " at " + helper.getHome()); logger.info("starting cluster with helper " + helper + " at " + helper.getHome());
helper.startNode("1"); helper.startNode();
try { try {
ClusterHealthResponse healthResponse = helper.client("1").execute(ClusterHealthAction.INSTANCE, ClusterHealthResponse healthResponse = helper.client().execute(ClusterHealthAction.INSTANCE,
new ClusterHealthRequest().waitForStatus(ClusterHealthStatus.GREEN) new ClusterHealthRequest().waitForStatus(ClusterHealthStatus.GREEN)
.timeout(TimeValue.timeValueSeconds(30))).actionGet(); .timeout(TimeValue.timeValueSeconds(30))).actionGet();
if (healthResponse != null && healthResponse.isTimedOut()) { if (healthResponse != null && healthResponse.isTimedOut()) {
@ -86,7 +83,7 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft
} }
ClusterStateRequest clusterStateRequest = new ClusterStateRequest().all(); ClusterStateRequest clusterStateRequest = new ClusterStateRequest().all();
ClusterStateResponse clusterStateResponse = ClusterStateResponse clusterStateResponse =
helper.client("1").execute(ClusterStateAction.INSTANCE, clusterStateRequest).actionGet(); helper.client().execute(ClusterStateAction.INSTANCE, clusterStateRequest).actionGet();
logger.info("cluster name = {}", clusterStateResponse.getClusterName().value()); logger.info("cluster name = {}", clusterStateResponse.getClusterName().value());
} }
@ -101,15 +98,8 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft
} }
private void closeNodes(Helper helper) { private void closeNodes(Helper helper) {
logger.info("closing all clients"); if (helper.node != null) {
for (AbstractClient client : helper.clients.values()) { helper.node.close();
client.close();
}
logger.info("closing all nodes");
for (Node node : helper.nodes.values()) {
if (node != null) {
node.close();
}
} }
logger.info("all nodes closed"); logger.info("all nodes closed");
} }
@ -151,9 +141,7 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft
int port; int port;
Map<String, Node> nodes = new HashMap<>(); Node node;
Map<String, AbstractClient> clients = new HashMap<>();
void setHome(String home) { void setHome(String home) {
this.home = home; this.home = home;
@ -178,10 +166,10 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft
.build(); .build();
} }
void startNode(String id) { void startNode() {
buildNode(id).start(); buildNode().start();
NodesInfoRequest nodesInfoRequest = new NodesInfoRequest().transport(true); NodesInfoRequest nodesInfoRequest = new NodesInfoRequest().transport(true);
NodesInfoResponse response = client(id). execute(NodesInfoAction.INSTANCE, nodesInfoRequest).actionGet(); NodesInfoResponse response = client(). execute(NodesInfoAction.INSTANCE, nodesInfoRequest).actionGet();
Object obj = response.iterator().next().getTransport().getAddress() Object obj = response.iterator().next().getTransport().getAddress()
.publishAddress(); .publishAddress();
if (obj instanceof InetSocketTransportAddress) { if (obj instanceof InetSocketTransportAddress) {
@ -191,8 +179,8 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft
} }
} }
ElasticsearchClient client(String id) { ElasticsearchClient client() {
return clients.get(id); return node.client();
} }
String randomString(int len) { String randomString(int len) {
@ -204,16 +192,12 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft
return new String(buf); return new String(buf);
} }
private Node buildNode(String id) { private Node buildNode() {
Settings nodeSettings = Settings.builder() Settings nodeSettings = Settings.builder()
.put(getNodeSettings()) .put(getNodeSettings())
.put("node.name", id) .put("node.name", "1")
.build(); .build();
Node node = new MockNode(nodeSettings); return new MockNode(nodeSettings);
AbstractClient client = (AbstractClient) node.client();
nodes.put(id, node);
clients.put(id, client);
return node;
} }
} }
} }

@ -26,7 +26,7 @@ class WildcardTest {
@Test @Test
void testWildcard() throws Exception { void testWildcard() throws Exception {
ElasticsearchClient client = helper.client("1"); ElasticsearchClient client = helper.client();
index(client, "1", "010"); index(client, "1", "010");
index(client, "2", "0*0"); index(client, "2", "0*0");
// exact // exact

@ -15,7 +15,7 @@ public class NodeAdminClient extends AbstractAdminClient {
} }
@Override @Override
protected ElasticsearchClient createClient(Settings settings) { public ElasticsearchClient createClient(Settings settings) {
return helper.createClient(settings, null); return helper.createClient(settings, null);
} }

@ -15,7 +15,7 @@ public class NodeBulkClient extends AbstractBulkClient {
} }
@Override @Override
protected ElasticsearchClient createClient(Settings settings) { public ElasticsearchClient createClient(Settings settings) {
return helper.createClient(settings, null); return helper.createClient(settings, null);
} }

@ -36,10 +36,10 @@ public class NodeClientHelper {
key -> innerCreateClient(settings)); key -> innerCreateClient(settings));
} }
public void closeClient(Settings settings) { public void closeClient(Settings settings) throws IOException {
ElasticsearchClient client = clientMap.remove(settings.get("cluster.name")); ElasticsearchClient client = clientMap.remove(settings.get("cluster.name"));
if (client != null) { if (client != null) {
logger.debug("closing node"); logger.debug("closing node...");
node.close(); node.close();
node = null; node = null;
} }

@ -15,7 +15,7 @@ public class NodeSearchClient extends AbstractSearchClient {
} }
@Override @Override
protected ElasticsearchClient createClient(Settings settings) { public ElasticsearchClient createClient(Settings settings) {
return helper.createClient(settings, null); return helper.createClient(settings, null);
} }

@ -2,15 +2,11 @@ package org.xbib.elx.node.test;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.extension.ExtendWith;
import org.xbib.elx.api.IndexDefinition; import org.xbib.elx.api.IndexDefinition;
import org.xbib.elx.common.ClientBuilder; import org.xbib.elx.common.ClientBuilder;
import org.xbib.elx.common.DefaultIndexDefinition; import org.xbib.elx.common.DefaultIndexDefinition;
import org.xbib.elx.node.NodeAdminClient;
import org.xbib.elx.node.NodeAdminClientProvider;
import org.xbib.elx.node.NodeBulkClient; import org.xbib.elx.node.NodeBulkClient;
import org.xbib.elx.node.NodeBulkClientProvider; import org.xbib.elx.node.NodeBulkClientProvider;
@ -21,14 +17,13 @@ import java.util.concurrent.TimeUnit;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
@ExtendWith(TestExtension.class) @ExtendWith(TestExtension.class)
class BulkClientTest { class BulkClientTest {
private static final Logger logger = LogManager.getLogger(BulkClientTest.class.getName()); private static final Logger logger = LogManager.getLogger(BulkClientTest.class.getName());
private static final Long ACTIONS = 10000L; private static final Long ACTIONS = 100000L;
private final TestExtension.Helper helper; private final TestExtension.Helper helper;
@ -37,66 +32,38 @@ class BulkClientTest {
} }
@Test @Test
void testSingleDoc() throws Exception { void testNewIndex() throws Exception {
try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client) try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client())
.setBulkClientProvider(NodeBulkClientProvider.class) .setBulkClientProvider(NodeBulkClientProvider.class)
.put(helper.getNodeSettings()) .put(helper.getNodeSettings())
.build()) { .build()) {
IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc");
bulkClient.newIndex(indexDefinition); bulkClient.newIndex(indexDefinition);
bulkClient.index(indexDefinition, "1", true, "{ \"name\" : \"Hello World\"}"); // single doc ingest
bulkClient.flush();
bulkClient.waitForResponses(30L, TimeUnit.SECONDS);
assertEquals(1, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount());
if (bulkClient.getBulkController().getLastBulkError() != null) {
logger.error("error", bulkClient.getBulkController().getLastBulkError());
}
assertNull(bulkClient.getBulkController().getLastBulkError());
} }
} }
@Test @Test
void testNewIndex() throws Exception { void testSingleDoc() throws Exception {
try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client) try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client())
.setBulkClientProvider(NodeBulkClientProvider.class) .setBulkClientProvider(NodeBulkClientProvider.class)
.put(helper.getNodeSettings()) .put(helper.getNodeSettings())
.build()) { .build()) {
IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc");
bulkClient.newIndex(indexDefinition); bulkClient.newIndex(indexDefinition);
} bulkClient.index(indexDefinition, "1", true, "{ \"name\" : \"Hello World\"}"); // single doc ingest
} bulkClient.waitForResponses(30L, TimeUnit.SECONDS);
assertEquals(1, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount());
@Test if (bulkClient.getBulkProcessor().getLastBulkError() != null) {
void testMapping() throws Exception { logger.error("error", bulkClient.getBulkProcessor().getLastBulkError());
try (NodeAdminClient adminClient = ClientBuilder.builder(helper.client) }
.setAdminClientProvider(NodeAdminClientProvider.class) assertNull(bulkClient.getBulkProcessor().getLastBulkError());
.put(helper.getNodeSettings())
.build();
NodeBulkClient bulkClient = ClientBuilder.builder(helper.client)
.setBulkClientProvider(NodeBulkClientProvider.class)
.put(helper.getNodeSettings())
.build()) {
IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc");
XContentBuilder builder = JsonXContent.contentBuilder()
.startObject()
.startObject("doc")
.startObject("properties")
.startObject("location")
.field("type", "geo_point")
.endObject()
.endObject()
.endObject()
.endObject();
indexDefinition.setMappings(builder.string());
bulkClient.newIndex(indexDefinition);
assertTrue(adminClient.getMapping(indexDefinition).containsKey("properties"));
} }
} }
@Test @Test
void testRandomDocs() throws Exception { void testRandomDocs() throws Exception {
long numactions = ACTIONS; long numactions = ACTIONS;
try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client) try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client())
.setBulkClientProvider(NodeBulkClientProvider.class) .setBulkClientProvider(NodeBulkClientProvider.class)
.put(helper.getNodeSettings()) .put(helper.getNodeSettings())
.build()) { .build()) {
@ -109,11 +76,11 @@ class BulkClientTest {
} }
bulkClient.stopBulk(indexDefinition); bulkClient.stopBulk(indexDefinition);
bulkClient.waitForResponses(30L, TimeUnit.SECONDS); bulkClient.waitForResponses(30L, TimeUnit.SECONDS);
assertEquals(numactions, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount()); assertEquals(numactions, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount());
if (bulkClient.getBulkController().getLastBulkError() != null) { if (bulkClient.getBulkProcessor().getLastBulkError() != null) {
logger.error("error", bulkClient.getBulkController().getLastBulkError()); logger.error("error", bulkClient.getBulkProcessor().getLastBulkError());
} }
assertNull(bulkClient.getBulkController().getLastBulkError()); assertNull(bulkClient.getBulkProcessor().getLastBulkError());
bulkClient.refreshIndex(indexDefinition); bulkClient.refreshIndex(indexDefinition);
assertEquals(numactions, bulkClient.getSearchableDocs(indexDefinition)); assertEquals(numactions, bulkClient.getSearchableDocs(indexDefinition));
} }
@ -124,7 +91,7 @@ class BulkClientTest {
int maxthreads = Runtime.getRuntime().availableProcessors(); int maxthreads = Runtime.getRuntime().availableProcessors();
final long actions = ACTIONS; final long actions = ACTIONS;
long timeout = 120L; long timeout = 120L;
try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client) try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client())
.setBulkClientProvider(NodeBulkClientProvider.class) .setBulkClientProvider(NodeBulkClientProvider.class)
.put(helper.getNodeSettings()) .put(helper.getNodeSettings())
.build()) { .build()) {
@ -150,13 +117,13 @@ class BulkClientTest {
logger.error("latch timeout!"); logger.error("latch timeout!");
} }
bulkClient.stopBulk(indexDefinition); bulkClient.stopBulk(indexDefinition);
assertEquals(maxthreads * actions, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount());
bulkClient.refreshIndex(indexDefinition); bulkClient.refreshIndex(indexDefinition);
assertEquals(maxthreads * actions, bulkClient.getSearchableDocs(indexDefinition)); assertEquals(maxthreads * actions, bulkClient.getSearchableDocs(indexDefinition));
if (bulkClient.getBulkController().getLastBulkError() != null) { assertEquals(maxthreads * actions, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount());
logger.error("error", bulkClient.getBulkController().getLastBulkError()); if (bulkClient.getBulkProcessor().getLastBulkError() != null) {
logger.error("error", bulkClient.getBulkProcessor().getLastBulkError());
} }
assertNull(bulkClient.getBulkController().getLastBulkError()); assertNull(bulkClient.getBulkProcessor().getLastBulkError());
} }
} }
} }

@ -35,7 +35,7 @@ class DuplicateIDTest {
@Test @Test
void testDuplicateDocIDs() throws Exception { void testDuplicateDocIDs() throws Exception {
long numactions = ACTIONS; long numactions = ACTIONS;
try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client) try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client())
.setBulkClientProvider(NodeBulkClientProvider.class) .setBulkClientProvider(NodeBulkClientProvider.class)
.put(helper.getNodeSettings()) .put(helper.getNodeSettings())
.put(Parameters.BULK_MAX_ACTIONS_PER_REQUEST.getName(), MAX_ACTIONS_PER_REQUEST) .put(Parameters.BULK_MAX_ACTIONS_PER_REQUEST.getName(), MAX_ACTIONS_PER_REQUEST)
@ -46,15 +46,14 @@ class DuplicateIDTest {
bulkClient.index(indexDefinition, helper.randomString(1), false, bulkClient.index(indexDefinition, helper.randomString(1), false,
"{ \"name\" : \"" + helper.randomString(32) + "\"}"); "{ \"name\" : \"" + helper.randomString(32) + "\"}");
} }
bulkClient.flush();
bulkClient.waitForResponses(30L, TimeUnit.SECONDS); bulkClient.waitForResponses(30L, TimeUnit.SECONDS);
bulkClient.refreshIndex(indexDefinition); bulkClient.refreshIndex(indexDefinition);
assertTrue(bulkClient.getSearchableDocs(indexDefinition) < ACTIONS); assertTrue(bulkClient.getSearchableDocs(indexDefinition) < ACTIONS);
assertEquals(numactions, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount()); assertEquals(numactions, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount());
if (bulkClient.getBulkController().getLastBulkError() != null) { if (bulkClient.getBulkProcessor().getLastBulkError() != null) {
logger.error("error", bulkClient.getBulkController().getLastBulkError()); logger.error("error", bulkClient.getBulkProcessor().getLastBulkError());
} }
assertNull(bulkClient.getBulkController().getLastBulkError()); assertNull(bulkClient.getBulkProcessor().getLastBulkError());
} }
} }
} }

@ -14,6 +14,7 @@ import org.xbib.elx.node.NodeAdminClient;
import org.xbib.elx.node.NodeAdminClientProvider; import org.xbib.elx.node.NodeAdminClientProvider;
import org.xbib.elx.node.NodeBulkClient; import org.xbib.elx.node.NodeBulkClient;
import org.xbib.elx.node.NodeBulkClientProvider; import org.xbib.elx.node.NodeBulkClientProvider;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
@ -37,15 +38,16 @@ class IndexPruneTest {
@Test @Test
void testPrune() throws IOException { void testPrune() throws IOException {
try (NodeAdminClient adminClient = ClientBuilder.builder(helper.client) try (NodeAdminClient adminClient = ClientBuilder.builder(helper.client())
.setAdminClientProvider(NodeAdminClientProvider.class) .setAdminClientProvider(NodeAdminClientProvider.class)
.put(helper.getNodeSettings()) .put(helper.getNodeSettings())
.build(); .build();
NodeBulkClient bulkClient = ClientBuilder.builder(helper.client) NodeBulkClient bulkClient = ClientBuilder.builder(helper.client())
.setBulkClientProvider(NodeBulkClientProvider.class) .setBulkClientProvider(NodeBulkClientProvider.class)
.put(helper.getNodeSettings()) .put(helper.getNodeSettings())
.build()) { .build()) {
IndexDefinition indexDefinition = new DefaultIndexDefinition("test_prune", "doc"); IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc");
indexDefinition.setIndex("test_prune");
indexDefinition.setFullIndexName("test_prune1"); indexDefinition.setFullIndexName("test_prune1");
bulkClient.newIndex(indexDefinition); bulkClient.newIndex(indexDefinition);
indexDefinition.setShift(true); indexDefinition.setShift(true);
@ -65,8 +67,6 @@ class IndexPruneTest {
IndexRetention indexRetention = new DefaultIndexRetention(); IndexRetention indexRetention = new DefaultIndexRetention();
indexRetention.setDelta(2); indexRetention.setDelta(2);
indexRetention.setMinToKeep(2); indexRetention.setMinToKeep(2);
indexDefinition.setIndex("test_prune");
indexDefinition.setFullIndexName("test_prune4");
indexDefinition.setRetention(indexRetention); indexDefinition.setRetention(indexRetention);
indexDefinition.setEnabled(true); indexDefinition.setEnabled(true);
indexDefinition.setPrune(true); indexDefinition.setPrune(true);
@ -87,10 +87,10 @@ class IndexPruneTest {
assertFalse(list.get(1)); assertFalse(list.get(1));
assertTrue(list.get(2)); assertTrue(list.get(2));
assertTrue(list.get(3)); assertTrue(list.get(3));
if (bulkClient.getBulkController().getLastBulkError() != null) { if (bulkClient.getBulkProcessor().getLastBulkError() != null) {
logger.error("error", bulkClient.getBulkController().getLastBulkError()); logger.error("error", bulkClient.getBulkProcessor().getLastBulkError());
} }
assertNull(bulkClient.getBulkController().getLastBulkError()); assertNull(bulkClient.getBulkProcessor().getLastBulkError());
} }
} }
} }

@ -36,11 +36,11 @@ class IndexShiftTest {
@Test @Test
void testIndexShift() throws Exception { void testIndexShift() throws Exception {
try (NodeAdminClient adminClient = ClientBuilder.builder(helper.client) try (NodeAdminClient adminClient = ClientBuilder.builder(helper.client())
.setAdminClientProvider(NodeAdminClientProvider.class) .setAdminClientProvider(NodeAdminClientProvider.class)
.put(helper.getNodeSettings()) .put(helper.getNodeSettings())
.build(); .build();
NodeBulkClient bulkClient = ClientBuilder.builder(helper.client) NodeBulkClient bulkClient = ClientBuilder.builder(helper.client())
.setBulkClientProvider(NodeBulkClientProvider.class) .setBulkClientProvider(NodeBulkClientProvider.class)
.put(helper.getNodeSettings()) .put(helper.getNodeSettings())
.build()) { .build()) {
@ -51,8 +51,9 @@ class IndexShiftTest {
bulkClient.index(indexDefinition, helper.randomString(1), false, bulkClient.index(indexDefinition, helper.randomString(1), false,
"{ \"name\" : \"" + helper.randomString(32) + "\"}"); "{ \"name\" : \"" + helper.randomString(32) + "\"}");
} }
bulkClient.flush();
bulkClient.waitForResponses(30L, TimeUnit.SECONDS); bulkClient.waitForResponses(30L, TimeUnit.SECONDS);
indexDefinition.setIndex("test");
indexDefinition.setFullIndexName("test_shift");
indexDefinition.setShift(true); indexDefinition.setShift(true);
IndexShiftResult indexShiftResult = IndexShiftResult indexShiftResult =
adminClient.shiftIndex(indexDefinition, Arrays.asList("a", "b", "c"), null); adminClient.shiftIndex(indexDefinition, Arrays.asList("a", "b", "c"), null);
@ -60,27 +61,26 @@ class IndexShiftTest {
assertTrue(indexShiftResult.getNewAliases().contains("b")); assertTrue(indexShiftResult.getNewAliases().contains("b"));
assertTrue(indexShiftResult.getNewAliases().contains("c")); assertTrue(indexShiftResult.getNewAliases().contains("c"));
assertTrue(indexShiftResult.getMovedAliases().isEmpty()); assertTrue(indexShiftResult.getMovedAliases().isEmpty());
Map<String, String> aliases = adminClient.getAliases("test_shift"); Map<String, String> aliases = adminClient.getAliases(indexDefinition.getFullIndexName());
assertTrue(aliases.containsKey("a")); assertTrue(aliases.containsKey("a"));
assertTrue(aliases.containsKey("b")); assertTrue(aliases.containsKey("b"));
assertTrue(aliases.containsKey("c")); assertTrue(aliases.containsKey("c"));
assertTrue(aliases.containsKey("test")); assertTrue(aliases.containsKey(indexDefinition.getIndex()));
Optional<String> resolved = adminClient.resolveAlias("test").stream().findFirst(); Optional<String> resolved = adminClient.resolveAlias(indexDefinition.getIndex()).stream().findFirst();
aliases = resolved.isPresent() ? aliases = resolved.isPresent() ?
adminClient.getAliases(resolved.get()) : Collections.emptyMap(); adminClient.getAliases(resolved.get()) : Collections.emptyMap();
assertTrue(aliases.containsKey("a")); assertTrue(aliases.containsKey("a"));
assertTrue(aliases.containsKey("b")); assertTrue(aliases.containsKey("b"));
assertTrue(aliases.containsKey("c")); assertTrue(aliases.containsKey("c"));
assertTrue(aliases.containsKey("test")); assertTrue(aliases.containsKey(indexDefinition.getIndex()));
indexDefinition.setFullIndexName("test_shift2"); indexDefinition.setFullIndexName("test_shift2");
bulkClient.newIndex(indexDefinition); bulkClient.newIndex(indexDefinition);
for (int i = 0; i < 1; i++) { for (int i = 0; i < 1; i++) {
bulkClient.index(indexDefinition, helper.randomString(1), false, bulkClient.index(indexDefinition, helper.randomString(1), false,
"{ \"name\" : \"" + helper.randomString(32) + "\"}"); "{ \"name\" : \"" + helper.randomString(32) + "\"}");
} }
bulkClient.flush();
bulkClient.waitForResponses(30L, TimeUnit.SECONDS); bulkClient.waitForResponses(30L, TimeUnit.SECONDS);
indexDefinition.setFullIndexName("test_shift2"); indexDefinition.setShift(true);
indexShiftResult = adminClient.shiftIndex(indexDefinition, Arrays.asList("d", "e", "f"), indexShiftResult = adminClient.shiftIndex(indexDefinition, Arrays.asList("d", "e", "f"),
(request, index, alias) -> request.addAliasAction(new IndicesAliasesRequest.AliasActions(AliasAction.Type.ADD, (request, index, alias) -> request.addAliasAction(new IndicesAliasesRequest.AliasActions(AliasAction.Type.ADD,
index, alias).filter(QueryBuilders.termQuery("my_key", alias))) index, alias).filter(QueryBuilders.termQuery("my_key", alias)))
@ -91,7 +91,7 @@ class IndexShiftTest {
assertTrue(indexShiftResult.getMovedAliases().contains("a")); assertTrue(indexShiftResult.getMovedAliases().contains("a"));
assertTrue(indexShiftResult.getMovedAliases().contains("b")); assertTrue(indexShiftResult.getMovedAliases().contains("b"));
assertTrue(indexShiftResult.getMovedAliases().contains("c")); assertTrue(indexShiftResult.getMovedAliases().contains("c"));
aliases = adminClient.getAliases("test_shift2"); aliases = adminClient.getAliases(indexDefinition.getFullIndexName());
assertTrue(aliases.containsKey("a")); assertTrue(aliases.containsKey("a"));
assertTrue(aliases.containsKey("b")); assertTrue(aliases.containsKey("b"));
assertTrue(aliases.containsKey("c")); assertTrue(aliases.containsKey("c"));
@ -106,10 +106,10 @@ class IndexShiftTest {
assertTrue(aliases.containsKey("d")); assertTrue(aliases.containsKey("d"));
assertTrue(aliases.containsKey("e")); assertTrue(aliases.containsKey("e"));
assertTrue(aliases.containsKey("f")); assertTrue(aliases.containsKey("f"));
if (bulkClient.getBulkController().getLastBulkError() != null) { if (bulkClient.getBulkProcessor().getLastBulkError() != null) {
logger.error("error", bulkClient.getBulkController().getLastBulkError()); logger.error("error", bulkClient.getBulkProcessor().getLastBulkError());
} }
assertNull(bulkClient.getBulkController().getLastBulkError()); assertNull(bulkClient.getBulkProcessor().getLastBulkError());
} }
} }
} }

@ -13,7 +13,6 @@ import org.junit.jupiter.api.extension.ExtendWith;
import org.xbib.elx.api.IndexDefinition; import org.xbib.elx.api.IndexDefinition;
import org.xbib.elx.common.ClientBuilder; import org.xbib.elx.common.ClientBuilder;
import org.xbib.elx.common.DefaultIndexDefinition; import org.xbib.elx.common.DefaultIndexDefinition;
import org.xbib.elx.common.Parameters;
import org.xbib.elx.node.NodeBulkClient; import org.xbib.elx.node.NodeBulkClient;
import org.xbib.elx.node.NodeBulkClientProvider; import org.xbib.elx.node.NodeBulkClientProvider;
import org.xbib.elx.node.NodeSearchClient; import org.xbib.elx.node.NodeSearchClient;
@ -29,8 +28,6 @@ class SearchTest {
private static final Long ACTIONS = 100000L; private static final Long ACTIONS = 100000L;
private static final Long MAX_ACTIONS_PER_REQUEST = 1000L;
private final TestExtension.Helper helper; private final TestExtension.Helper helper;
SearchTest(TestExtension.Helper helper) { SearchTest(TestExtension.Helper helper) {
@ -41,10 +38,9 @@ class SearchTest {
void testDocStream() throws Exception { void testDocStream() throws Exception {
long numactions = ACTIONS; long numactions = ACTIONS;
IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc");
try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client) try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client())
.setBulkClientProvider(NodeBulkClientProvider.class) .setBulkClientProvider(NodeBulkClientProvider.class)
.put(helper.getNodeSettings()) .put(helper.getNodeSettings())
.put(Parameters.BULK_MAX_ACTIONS_PER_REQUEST.getName(), MAX_ACTIONS_PER_REQUEST)
.build()) { .build()) {
bulkClient.newIndex(indexDefinition); bulkClient.newIndex(indexDefinition);
bulkClient.startBulk(indexDefinition); bulkClient.startBulk(indexDefinition);
@ -56,13 +52,13 @@ class SearchTest {
assertTrue(bulkClient.waitForResponses(30L, TimeUnit.SECONDS)); assertTrue(bulkClient.waitForResponses(30L, TimeUnit.SECONDS));
bulkClient.refreshIndex(indexDefinition); bulkClient.refreshIndex(indexDefinition);
assertEquals(numactions, bulkClient.getSearchableDocs(indexDefinition)); assertEquals(numactions, bulkClient.getSearchableDocs(indexDefinition));
assertEquals(numactions, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount()); assertEquals(numactions, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount());
if (bulkClient.getBulkController().getLastBulkError() != null) { if (bulkClient.getBulkProcessor().getLastBulkError() != null) {
logger.error("error", bulkClient.getBulkController().getLastBulkError()); logger.error("error", bulkClient.getBulkProcessor().getLastBulkError());
} }
assertNull(bulkClient.getBulkController().getLastBulkError()); assertNull(bulkClient.getBulkProcessor().getLastBulkError());
} }
try (NodeSearchClient searchClient = ClientBuilder.builder(helper.client) try (NodeSearchClient searchClient = ClientBuilder.builder(helper.client())
.setSearchClientProvider(NodeSearchClientProvider.class) .setSearchClientProvider(NodeSearchClientProvider.class)
.put(helper.getNodeSettings()) .put(helper.getNodeSettings())
.build()) { .build()) {
@ -70,7 +66,7 @@ class SearchTest {
Stream<SearchHit> stream = searchClient.search(qb -> qb Stream<SearchHit> stream = searchClient.search(qb -> qb
.setIndices(indexDefinition.getFullIndexName()) .setIndices(indexDefinition.getFullIndexName())
.setQuery(QueryBuilders.matchAllQuery()), .setQuery(QueryBuilders.matchAllQuery()),
TimeValue.timeValueMillis(100), 570); TimeValue.timeValueMillis(100), 579);
long count = stream.count(); long count = stream.count();
assertEquals(numactions, count); assertEquals(numactions, count);
assertEquals(0L, searchClient.getSearchMetric().getFailedQueries().getCount()); assertEquals(0L, searchClient.getSearchMetric().getFailedQueries().getCount());
@ -92,13 +88,13 @@ class SearchTest {
.setIndices(indexDefinition.getFullIndexName()) .setIndices(indexDefinition.getFullIndexName())
.setQuery(QueryBuilders.matchAllQuery())); .setQuery(QueryBuilders.matchAllQuery()));
final AtomicInteger idcount = new AtomicInteger(); final AtomicInteger idcount = new AtomicInteger();
ids.forEach(id -> { ids.forEach(id -> idcount.incrementAndGet());
idcount.incrementAndGet();
});
assertEquals(numactions, idcount.get()); assertEquals(numactions, idcount.get());
assertEquals(1542L, searchClient.getSearchMetric().getQueries().getCount());
assertEquals(1539L, searchClient.getSearchMetric().getSucceededQueries().getCount());
assertEquals(3L, searchClient.getSearchMetric().getEmptyQueries().getCount());
assertEquals(0L, searchClient.getSearchMetric().getFailedQueries().getCount()); assertEquals(0L, searchClient.getSearchMetric().getFailedQueries().getCount());
assertEquals(0L, searchClient.getSearchMetric().getTimeoutQueries().getCount()); assertEquals(0L, searchClient.getSearchMetric().getTimeoutQueries().getCount());
assertEquals(3L, searchClient.getSearchMetric().getEmptyQueries().getCount());
} }
} }
} }

@ -3,6 +3,8 @@ package org.xbib.elx.node.test;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.extension.ExtendWith;
import org.xbib.elx.common.ClientBuilder; import org.xbib.elx.common.ClientBuilder;
@ -32,13 +34,13 @@ class SmokeTest {
@Test @Test
void smokeTest() throws Exception { void smokeTest() throws Exception {
try (NodeAdminClient adminClient = ClientBuilder.builder(helper.client) try (NodeAdminClient adminClient = ClientBuilder.builder(helper.client())
.setAdminClientProvider(NodeAdminClientProvider.class) .setAdminClientProvider(NodeAdminClientProvider.class)
.put(helper.getNodeSettings()) .put(helper.getNodeSettings())
.build(); .build();
NodeBulkClient bulkClient = ClientBuilder.builder(helper.client) NodeBulkClient bulkClient = ClientBuilder.builder(helper.client())
.setBulkClientProvider(NodeBulkClientProvider.class) .setBulkClientProvider(NodeBulkClientProvider.class)
.put(helper.getNodeSettings()) .put(helper.getNodeSettings())
.build()) { .build()) {
assertEquals(helper.getClusterName(), adminClient.getClusterName()); assertEquals(helper.getClusterName(), adminClient.getClusterName());
IndexDefinition indexDefinition = IndexDefinition indexDefinition =
@ -54,28 +56,37 @@ class SmokeTest {
adminClient.checkMapping(indexDefinition); adminClient.checkMapping(indexDefinition);
bulkClient.update(indexDefinition, "1", "{ \"name\" : \"Another name\"}"); bulkClient.update(indexDefinition, "1", "{ \"name\" : \"Another name\"}");
bulkClient.delete(indexDefinition, "1"); bulkClient.delete(indexDefinition, "1");
bulkClient.flush();
bulkClient.waitForResponses(30, TimeUnit.SECONDS); bulkClient.waitForResponses(30, TimeUnit.SECONDS);
bulkClient.index(indexDefinition, "1", true, "{ \"name\" : \"Hello World\"}"); bulkClient.index(indexDefinition, "1", true, "{ \"name\" : \"Hello World\"}");
bulkClient.delete(indexDefinition, "1"); bulkClient.delete(indexDefinition, "1");
bulkClient.flush();
bulkClient.waitForResponses(30, TimeUnit.SECONDS); bulkClient.waitForResponses(30, TimeUnit.SECONDS);
adminClient.deleteIndex(indexDefinition); adminClient.deleteIndex(indexDefinition);
bulkClient.newIndex(indexDefinition); bulkClient.newIndex(indexDefinition);
bulkClient.index(indexDefinition, "1", true, "{ \"name\" : \"Hello World\"}"); bulkClient.index(indexDefinition, "1", true, "{ \"name\" : \"Hello World\"}");
bulkClient.flush();
bulkClient.waitForResponses(30, TimeUnit.SECONDS); bulkClient.waitForResponses(30, TimeUnit.SECONDS);
adminClient.updateReplicaLevel(indexDefinition, 2); adminClient.updateReplicaLevel(indexDefinition, 2);
int replica = adminClient.getReplicaLevel(indexDefinition); int replica = adminClient.getReplicaLevel(indexDefinition);
assertEquals(2, replica); assertEquals(2, replica);
assertEquals(0, bulkClient.getBulkController().getBulkMetric().getFailed().getCount()); assertEquals(0, bulkClient.getBulkProcessor().getBulkMetric().getFailed().getCount());
assertEquals(6, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount()); assertEquals(6, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount());
if (bulkClient.getBulkController().getLastBulkError() != null) { if (bulkClient.getBulkProcessor().getLastBulkError() != null) {
logger.error("error", bulkClient.getBulkController().getLastBulkError()); logger.error("error", bulkClient.getBulkProcessor().getLastBulkError());
} }
assertNull(bulkClient.getBulkController().getLastBulkError()); assertNull(bulkClient.getBulkProcessor().getLastBulkError());
adminClient.deleteIndex(indexDefinition); adminClient.deleteIndex(indexDefinition);
logger.info("done"); XContentBuilder builder = JsonXContent.contentBuilder()
.startObject()
.startObject("doc")
.startObject("properties")
.startObject("location")
.field("type", "geo_point")
.endObject()
.endObject()
.endObject()
.endObject();
indexDefinition.setMappings(builder.string());
bulkClient.newIndex(indexDefinition);
assertTrue(adminClient.getMapping(indexDefinition).containsKey("properties"));
} }
} }
} }

@ -12,7 +12,7 @@ import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
import org.elasticsearch.action.admin.cluster.state.ClusterStateAction; import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.client.support.AbstractClient; import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.cluster.health.ClusterHealthStatus; import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress; import org.elasticsearch.common.transport.InetSocketTransportAddress;
@ -121,8 +121,6 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft
Node node; Node node;
AbstractClient client;
void setHome(String home) { void setHome(String home) {
this.home = home; this.home = home;
} }
@ -150,7 +148,7 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft
void startNode() { void startNode() {
buildNode().start(); buildNode().start();
NodesInfoRequest nodesInfoRequest = new NodesInfoRequest().transport(true); NodesInfoRequest nodesInfoRequest = new NodesInfoRequest().transport(true);
NodesInfoResponse response = client.execute(NodesInfoAction.INSTANCE, nodesInfoRequest).actionGet(); NodesInfoResponse response = client().execute(NodesInfoAction.INSTANCE, nodesInfoRequest).actionGet();
Object obj = response.iterator().next().getTransport().getAddress().publishAddress(); Object obj = response.iterator().next().getTransport().getAddress().publishAddress();
if (obj instanceof InetSocketTransportAddress) { if (obj instanceof InetSocketTransportAddress) {
InetSocketTransportAddress address = (InetSocketTransportAddress) obj; InetSocketTransportAddress address = (InetSocketTransportAddress) obj;
@ -175,16 +173,15 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft
.put(getNodeSettings()) .put(getNodeSettings())
.put("node.name", id) .put("node.name", id)
.build(); .build();
node = new MockNode(nodeSettings); this.node = new MockNode(nodeSettings);
client = (AbstractClient) node.client();
return node; return node;
} }
ElasticsearchClient client() {
return node.client();
}
void closeNodes() { void closeNodes() {
if (client != null) {
logger.info("closing client");
client.close();
}
if (node != null) { if (node != null) {
logger.info("closing all nodes"); logger.info("closing all nodes");
node.close(); node.close();
@ -193,7 +190,7 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft
void greenHealth() throws IOException { void greenHealth() throws IOException {
try { try {
ClusterHealthResponse healthResponse = client.execute(ClusterHealthAction.INSTANCE, ClusterHealthResponse healthResponse = client().execute(ClusterHealthAction.INSTANCE,
new ClusterHealthRequest().waitForStatus(ClusterHealthStatus.GREEN) new ClusterHealthRequest().waitForStatus(ClusterHealthStatus.GREEN)
.timeout(TimeValue.timeValueSeconds(30))).actionGet(); .timeout(TimeValue.timeValueSeconds(30))).actionGet();
if (healthResponse != null && healthResponse.isTimedOut()) { if (healthResponse != null && healthResponse.isTimedOut()) {
@ -208,7 +205,7 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft
String clusterName() { String clusterName() {
ClusterStateRequest clusterStateRequest = new ClusterStateRequest().all(); ClusterStateRequest clusterStateRequest = new ClusterStateRequest().all();
ClusterStateResponse clusterStateResponse = ClusterStateResponse clusterStateResponse =
client.execute(ClusterStateAction.INSTANCE, clusterStateRequest).actionGet(); client().execute(ClusterStateAction.INSTANCE, clusterStateRequest).actionGet();
return clusterStateResponse.getClusterName().value(); return clusterStateResponse.getClusterName().value();
} }

@ -16,9 +16,6 @@ import java.util.Enumeration;
import java.util.List; import java.util.List;
import java.util.Locale; import java.util.Locale;
/**
*
*/
public class NetworkUtils { public class NetworkUtils {
private static final Logger logger = LogManager.getLogger(NetworkUtils.class.getName()); private static final Logger logger = LogManager.getLogger(NetworkUtils.class.getName());

@ -31,7 +31,7 @@ public class TransportAdminClient extends AbstractAdminClient {
} }
@Override @Override
public void closeClient(Settings settings) throws IOException { public void closeClient(Settings settings) {
helper.closeClient(settings); helper.closeClient(settings);
} }
} }

@ -30,7 +30,7 @@ public class TransportBulkClient extends AbstractBulkClient {
} }
@Override @Override
public void closeClient(Settings settings) throws IOException { public void closeClient(Settings settings) {
helper.closeClient(settings); helper.closeClient(settings);
} }
} }

@ -30,7 +30,7 @@ public class TransportSearchClient extends AbstractSearchClient {
} }
@Override @Override
public void closeClient(Settings settings) throws IOException { public void closeClient(Settings settings) {
helper.closeClient(settings); helper.closeClient(settings);
} }
} }

@ -2,15 +2,11 @@ package org.xbib.elx.transport.test;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.extension.ExtendWith;
import org.xbib.elx.api.IndexDefinition; import org.xbib.elx.api.IndexDefinition;
import org.xbib.elx.common.ClientBuilder; import org.xbib.elx.common.ClientBuilder;
import org.xbib.elx.common.DefaultIndexDefinition; import org.xbib.elx.common.DefaultIndexDefinition;
import org.xbib.elx.transport.TransportAdminClient;
import org.xbib.elx.transport.TransportAdminClientProvider;
import org.xbib.elx.transport.TransportBulkClient; import org.xbib.elx.transport.TransportBulkClient;
import org.xbib.elx.transport.TransportBulkClientProvider; import org.xbib.elx.transport.TransportBulkClientProvider;
@ -21,7 +17,6 @@ import java.util.concurrent.TimeUnit;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
@ExtendWith(TestExtension.class) @ExtendWith(TestExtension.class)
class BulkClientTest { class BulkClientTest {
@ -37,59 +32,31 @@ class BulkClientTest {
} }
@Test @Test
void testSingleDoc() throws Exception { void testNewIndex() throws Exception {
try (TransportBulkClient bulkClient = ClientBuilder.builder() try (TransportBulkClient bulkClient = ClientBuilder.builder()
.setBulkClientProvider(TransportBulkClientProvider.class) .setBulkClientProvider(TransportBulkClientProvider.class)
.put(helper.getTransportSettings()) .put(helper.getTransportSettings())
.build()) { .build()) {
IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc");
bulkClient.newIndex(indexDefinition); bulkClient.newIndex(indexDefinition);
bulkClient.index(indexDefinition, "1", true, "{ \"name\" : \"Hello World\"}"); // single doc ingest
bulkClient.flush();
bulkClient.waitForResponses(30L, TimeUnit.SECONDS);
assertEquals(1, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount());
if (bulkClient.getBulkController().getLastBulkError() != null) {
logger.error("error", bulkClient.getBulkController().getLastBulkError());
}
assertNull(bulkClient.getBulkController().getLastBulkError());
} }
} }
@Test @Test
void testNewIndex() throws Exception { void testSingleDoc() throws Exception {
try (TransportBulkClient bulkClient = ClientBuilder.builder() try (TransportBulkClient bulkClient = ClientBuilder.builder()
.setBulkClientProvider(TransportBulkClientProvider.class) .setBulkClientProvider(TransportBulkClientProvider.class)
.put(helper.getTransportSettings()) .put(helper.getTransportSettings())
.build()) { .build()) {
IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc");
bulkClient.newIndex(indexDefinition); bulkClient.newIndex(indexDefinition);
} bulkClient.index(indexDefinition, "1", true, "{ \"name\" : \"Hello World\"}"); // single doc ingest
} bulkClient.waitForResponses(30L, TimeUnit.SECONDS);
assertEquals(1, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount());
@Test if (bulkClient.getBulkProcessor().getLastBulkError() != null) {
void testMapping() throws Exception { logger.error("error", bulkClient.getBulkProcessor().getLastBulkError());
try (TransportAdminClient adminClient = ClientBuilder.builder() }
.setAdminClientProvider(TransportAdminClientProvider.class) assertNull(bulkClient.getBulkProcessor().getLastBulkError());
.put(helper.getTransportSettings())
.build();
TransportBulkClient bulkClient = ClientBuilder.builder()
.setBulkClientProvider(TransportBulkClientProvider.class)
.put(helper.getTransportSettings())
.build()) {
XContentBuilder builder = JsonXContent.contentBuilder()
.startObject()
.startObject("doc")
.startObject("properties")
.startObject("location")
.field("type", "geo_point")
.endObject()
.endObject()
.endObject()
.endObject();
IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc");
indexDefinition.setMappings(builder.string());
bulkClient.newIndex(indexDefinition);
assertTrue(adminClient.getMapping(indexDefinition).containsKey("properties"));
} }
} }
@ -106,24 +73,22 @@ class BulkClientTest {
bulkClient.index(indexDefinition, null, false, bulkClient.index(indexDefinition, null, false,
"{ \"name\" : \"" + helper.randomString(32) + "\"}"); "{ \"name\" : \"" + helper.randomString(32) + "\"}");
} }
bulkClient.flush();
bulkClient.waitForResponses(30L, TimeUnit.SECONDS); bulkClient.waitForResponses(30L, TimeUnit.SECONDS);
assertEquals(numactions, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount()); assertEquals(numactions, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount());
if (bulkClient.getBulkController().getLastBulkError() != null) { if (bulkClient.getBulkProcessor().getLastBulkError() != null) {
logger.error("error", bulkClient.getBulkController().getLastBulkError()); logger.error("error", bulkClient.getBulkProcessor().getLastBulkError());
} }
assertNull(bulkClient.getBulkController().getLastBulkError()); assertNull(bulkClient.getBulkProcessor().getLastBulkError());
bulkClient.refreshIndex(indexDefinition); bulkClient.refreshIndex(indexDefinition);
assertEquals(numactions, bulkClient.getSearchableDocs(indexDefinition)); assertEquals(numactions, bulkClient.getSearchableDocs(indexDefinition));
} }
} }
@Test @Test
void testThreadedRandomDocs() { void testThreadedRandomDocs() throws Exception {
int maxthreads = Runtime.getRuntime().availableProcessors(); final int maxthreads = Runtime.getRuntime().availableProcessors();
//long maxActionsPerRequest = MAX_ACTIONS_PER_REQUEST;
final long actions = ACTIONS; final long actions = ACTIONS;
long timeout = 120L; final long timeout = 120L;
try (TransportBulkClient bulkClient = ClientBuilder.builder() try (TransportBulkClient bulkClient = ClientBuilder.builder()
.setBulkClientProvider(TransportBulkClientProvider.class) .setBulkClientProvider(TransportBulkClientProvider.class)
.put(helper.getTransportSettings()) .put(helper.getTransportSettings())
@ -153,15 +118,13 @@ class BulkClientTest {
logger.error("latch timeout!"); logger.error("latch timeout!");
} }
bulkClient.stopBulk(indexDefinition); bulkClient.stopBulk(indexDefinition);
assertEquals(maxthreads * actions, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount()); assertEquals(maxthreads * actions, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount());
bulkClient.refreshIndex(indexDefinition); bulkClient.refreshIndex(indexDefinition);
assertEquals(maxthreads * actions, bulkClient.getSearchableDocs(indexDefinition)); assertEquals(maxthreads * actions, bulkClient.getSearchableDocs(indexDefinition));
if (bulkClient.getBulkController().getLastBulkError() != null) { if (bulkClient.getBulkProcessor().getLastBulkError() != null) {
logger.error("error", bulkClient.getBulkController().getLastBulkError()); logger.error("error", bulkClient.getBulkProcessor().getLastBulkError());
} }
assertNull(bulkClient.getBulkController().getLastBulkError()); assertNull(bulkClient.getBulkProcessor().getLastBulkError());
} catch (Exception e) {
logger.error(e.getMessage(), e);
} }
} }
} }

@ -7,7 +7,6 @@ import org.junit.jupiter.api.extension.ExtendWith;
import org.xbib.elx.api.IndexDefinition; import org.xbib.elx.api.IndexDefinition;
import org.xbib.elx.common.ClientBuilder; import org.xbib.elx.common.ClientBuilder;
import org.xbib.elx.common.DefaultIndexDefinition; import org.xbib.elx.common.DefaultIndexDefinition;
import org.xbib.elx.common.Parameters;
import org.xbib.elx.transport.TransportBulkClient; import org.xbib.elx.transport.TransportBulkClient;
import org.xbib.elx.transport.TransportBulkClientProvider; import org.xbib.elx.transport.TransportBulkClientProvider;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -22,8 +21,6 @@ class DuplicateIDTest {
private static final Long ACTIONS = 100L; private static final Long ACTIONS = 100L;
private static final Long MAX_ACTIONS_PER_REQUEST = 5L;
private final TestExtension.Helper helper; private final TestExtension.Helper helper;
DuplicateIDTest(TestExtension.Helper helper) { DuplicateIDTest(TestExtension.Helper helper) {
@ -36,7 +33,6 @@ class DuplicateIDTest {
try (TransportBulkClient bulkClient = ClientBuilder.builder() try (TransportBulkClient bulkClient = ClientBuilder.builder()
.setBulkClientProvider(TransportBulkClientProvider.class) .setBulkClientProvider(TransportBulkClientProvider.class)
.put(helper.getTransportSettings()) .put(helper.getTransportSettings())
.put(Parameters.BULK_MAX_ACTIONS_PER_REQUEST.getName(), MAX_ACTIONS_PER_REQUEST)
.build()) { .build()) {
IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc");
bulkClient.newIndex(indexDefinition); bulkClient.newIndex(indexDefinition);
@ -44,15 +40,14 @@ class DuplicateIDTest {
bulkClient.index(indexDefinition, helper.randomString(1), false, bulkClient.index(indexDefinition, helper.randomString(1), false,
"{ \"name\" : \"" + helper.randomString(32) + "\"}"); "{ \"name\" : \"" + helper.randomString(32) + "\"}");
} }
bulkClient.flush();
bulkClient.waitForResponses(30L, TimeUnit.SECONDS); bulkClient.waitForResponses(30L, TimeUnit.SECONDS);
bulkClient.refreshIndex(indexDefinition); bulkClient.refreshIndex(indexDefinition);
assertTrue(bulkClient.getSearchableDocs(indexDefinition) < ACTIONS); assertTrue(bulkClient.getSearchableDocs(indexDefinition) < ACTIONS);
assertEquals(numactions, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount()); assertEquals(numactions, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount());
if (bulkClient.getBulkController().getLastBulkError() != null) { if (bulkClient.getBulkProcessor().getLastBulkError() != null) {
logger.error("error", bulkClient.getBulkController().getLastBulkError()); logger.error("error", bulkClient.getBulkProcessor().getLastBulkError());
} }
assertNull(bulkClient.getBulkController().getLastBulkError()); assertNull(bulkClient.getBulkProcessor().getLastBulkError());
} }
} }
} }

@ -47,6 +47,7 @@ class IndexPruneTest {
.put(helper.getTransportSettings()) .put(helper.getTransportSettings())
.build()) { .build()) {
IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc");
indexDefinition.setIndex("test_prune");
indexDefinition.setFullIndexName("test_prune1"); indexDefinition.setFullIndexName("test_prune1");
bulkClient.newIndex(indexDefinition); bulkClient.newIndex(indexDefinition);
indexDefinition.setShift(true); indexDefinition.setShift(true);
@ -64,10 +65,6 @@ class IndexPruneTest {
indexDefinition.setShift(true); indexDefinition.setShift(true);
adminClient.shiftIndex(indexDefinition, Collections.emptyList(), null); adminClient.shiftIndex(indexDefinition, Collections.emptyList(), null);
IndexRetention indexRetention = new DefaultIndexRetention(); IndexRetention indexRetention = new DefaultIndexRetention();
indexRetention.setDelta(2);
indexRetention.setMinToKeep(2);
indexDefinition.setIndex("test_prune");
indexDefinition.setFullIndexName("test_prune4");
indexDefinition.setRetention(indexRetention); indexDefinition.setRetention(indexRetention);
indexDefinition.setEnabled(true); indexDefinition.setEnabled(true);
indexDefinition.setPrune(true); indexDefinition.setPrune(true);
@ -88,10 +85,10 @@ class IndexPruneTest {
assertFalse(list.get(1)); assertFalse(list.get(1));
assertTrue(list.get(2)); assertTrue(list.get(2));
assertTrue(list.get(3)); assertTrue(list.get(3));
if (bulkClient.getBulkController().getLastBulkError() != null) { if (bulkClient.getBulkProcessor().getLastBulkError() != null) {
logger.error("error", bulkClient.getBulkController().getLastBulkError()); logger.error("error", bulkClient.getBulkProcessor().getLastBulkError());
} }
assertNull(bulkClient.getBulkController().getLastBulkError()); assertNull(bulkClient.getBulkProcessor().getLastBulkError());
} }
} }
} }

@ -53,7 +53,6 @@ class IndexShiftTest {
bulkClient.index(indexDefinition, helper.randomString(1), false, bulkClient.index(indexDefinition, helper.randomString(1), false,
"{ \"name\" : \"" + helper.randomString(32) + "\"}"); "{ \"name\" : \"" + helper.randomString(32) + "\"}");
} }
bulkClient.flush();
bulkClient.waitForResponses(30L, TimeUnit.SECONDS); bulkClient.waitForResponses(30L, TimeUnit.SECONDS);
indexDefinition.setShift(true); indexDefinition.setShift(true);
IndexShiftResult indexShiftResult = IndexShiftResult indexShiftResult =
@ -62,27 +61,26 @@ class IndexShiftTest {
assertTrue(indexShiftResult.getNewAliases().contains("b")); assertTrue(indexShiftResult.getNewAliases().contains("b"));
assertTrue(indexShiftResult.getNewAliases().contains("c")); assertTrue(indexShiftResult.getNewAliases().contains("c"));
assertTrue(indexShiftResult.getMovedAliases().isEmpty()); assertTrue(indexShiftResult.getMovedAliases().isEmpty());
Map<String, String> aliases = adminClient.getAliases("test_shift"); Map<String, String> aliases = adminClient.getAliases(indexDefinition.getFullIndexName());
assertTrue(aliases.containsKey("a")); assertTrue(aliases.containsKey("a"));
assertTrue(aliases.containsKey("b")); assertTrue(aliases.containsKey("b"));
assertTrue(aliases.containsKey("c")); assertTrue(aliases.containsKey("c"));
assertTrue(aliases.containsKey("test")); assertTrue(aliases.containsKey(indexDefinition.getIndex()));
Optional<String> resolved = adminClient.resolveAlias("test").stream().findFirst(); Optional<String> resolved = adminClient.resolveAlias(indexDefinition.getIndex()).stream().findFirst();
aliases = resolved.isPresent() ? aliases = resolved.isPresent() ?
adminClient.getAliases(resolved.get()) : Collections.emptyMap(); adminClient.getAliases(resolved.get()) : Collections.emptyMap();
assertTrue(aliases.containsKey("a")); assertTrue(aliases.containsKey("a"));
assertTrue(aliases.containsKey("b")); assertTrue(aliases.containsKey("b"));
assertTrue(aliases.containsKey("c")); assertTrue(aliases.containsKey("c"));
assertTrue(aliases.containsKey("test")); assertTrue(aliases.containsKey(indexDefinition.getIndex()));
indexDefinition.setFullIndexName("test_shift2"); indexDefinition.setFullIndexName("test_shift2");
bulkClient.newIndex(indexDefinition); bulkClient.newIndex(indexDefinition);
for (int i = 0; i < 1; i++) { for (int i = 0; i < 1; i++) {
bulkClient.index(indexDefinition, helper.randomString(1), false, bulkClient.index(indexDefinition, helper.randomString(1), false,
"{ \"name\" : \"" + helper.randomString(32) + "\"}"); "{ \"name\" : \"" + helper.randomString(32) + "\"}");
} }
bulkClient.flush();
bulkClient.waitForResponses(30L, TimeUnit.SECONDS); bulkClient.waitForResponses(30L, TimeUnit.SECONDS);
indexDefinition.setFullIndexName("test_shift2"); indexDefinition.setShift(true);
indexShiftResult = adminClient.shiftIndex(indexDefinition, Arrays.asList("d", "e", "f"), indexShiftResult = adminClient.shiftIndex(indexDefinition, Arrays.asList("d", "e", "f"),
(request, index, alias) -> request.addAliasAction(new IndicesAliasesRequest.AliasActions(AliasAction.Type.ADD, (request, index, alias) -> request.addAliasAction(new IndicesAliasesRequest.AliasActions(AliasAction.Type.ADD,
index, alias).filter(QueryBuilders.termQuery("my_key", alias))) index, alias).filter(QueryBuilders.termQuery("my_key", alias)))
@ -93,14 +91,14 @@ class IndexShiftTest {
assertTrue(indexShiftResult.getMovedAliases().contains("a")); assertTrue(indexShiftResult.getMovedAliases().contains("a"));
assertTrue(indexShiftResult.getMovedAliases().contains("b")); assertTrue(indexShiftResult.getMovedAliases().contains("b"));
assertTrue(indexShiftResult.getMovedAliases().contains("c")); assertTrue(indexShiftResult.getMovedAliases().contains("c"));
aliases = adminClient.getAliases("test_shift2"); aliases = adminClient.getAliases(indexDefinition.getFullIndexName());
assertTrue(aliases.containsKey("a")); assertTrue(aliases.containsKey("a"));
assertTrue(aliases.containsKey("b")); assertTrue(aliases.containsKey("b"));
assertTrue(aliases.containsKey("c")); assertTrue(aliases.containsKey("c"));
assertTrue(aliases.containsKey("d")); assertTrue(aliases.containsKey("d"));
assertTrue(aliases.containsKey("e")); assertTrue(aliases.containsKey("e"));
assertTrue(aliases.containsKey("f")); assertTrue(aliases.containsKey("f"));
resolved = adminClient.resolveAlias("test").stream().findFirst(); resolved = adminClient.resolveAlias(indexDefinition.getIndex()).stream().findFirst();
aliases = resolved.isPresent() ? adminClient.getAliases(resolved.get()) : Collections.emptyMap(); aliases = resolved.isPresent() ? adminClient.getAliases(resolved.get()) : Collections.emptyMap();
assertTrue(aliases.containsKey("a")); assertTrue(aliases.containsKey("a"));
assertTrue(aliases.containsKey("b")); assertTrue(aliases.containsKey("b"));
@ -108,10 +106,10 @@ class IndexShiftTest {
assertTrue(aliases.containsKey("d")); assertTrue(aliases.containsKey("d"));
assertTrue(aliases.containsKey("e")); assertTrue(aliases.containsKey("e"));
assertTrue(aliases.containsKey("f")); assertTrue(aliases.containsKey("f"));
if (bulkClient.getBulkController().getLastBulkError() != null) { if (bulkClient.getBulkProcessor().getLastBulkError() != null) {
logger.error("error", bulkClient.getBulkController().getLastBulkError()); logger.error("error", bulkClient.getBulkProcessor().getLastBulkError());
} }
assertNull(bulkClient.getBulkController().getLastBulkError()); assertNull(bulkClient.getBulkProcessor().getLastBulkError());
} }
} }
} }

@ -10,7 +10,6 @@ import org.junit.jupiter.api.extension.ExtendWith;
import org.xbib.elx.api.IndexDefinition; import org.xbib.elx.api.IndexDefinition;
import org.xbib.elx.common.ClientBuilder; import org.xbib.elx.common.ClientBuilder;
import org.xbib.elx.common.DefaultIndexDefinition; import org.xbib.elx.common.DefaultIndexDefinition;
import org.xbib.elx.common.Parameters;
import org.xbib.elx.transport.TransportBulkClient; import org.xbib.elx.transport.TransportBulkClient;
import org.xbib.elx.transport.TransportBulkClientProvider; import org.xbib.elx.transport.TransportBulkClientProvider;
import org.xbib.elx.transport.TransportSearchClient; import org.xbib.elx.transport.TransportSearchClient;
@ -31,8 +30,6 @@ class SearchTest {
private static final Long ACTIONS = 100000L; private static final Long ACTIONS = 100000L;
private static final Long MAX_ACTIONS_PER_REQUEST = 100L;
private final TestExtension.Helper helper; private final TestExtension.Helper helper;
SearchTest(TestExtension.Helper helper) { SearchTest(TestExtension.Helper helper) {
@ -46,7 +43,6 @@ class SearchTest {
try (TransportBulkClient bulkClient = ClientBuilder.builder() try (TransportBulkClient bulkClient = ClientBuilder.builder()
.setBulkClientProvider(TransportBulkClientProvider.class) .setBulkClientProvider(TransportBulkClientProvider.class)
.put(helper.getTransportSettings()) .put(helper.getTransportSettings())
.put(Parameters.BULK_MAX_ACTIONS_PER_REQUEST.getName(), MAX_ACTIONS_PER_REQUEST)
.build()) { .build()) {
bulkClient.newIndex(indexDefinition); bulkClient.newIndex(indexDefinition);
bulkClient.startBulk(indexDefinition); bulkClient.startBulk(indexDefinition);
@ -58,11 +54,11 @@ class SearchTest {
assertTrue(bulkClient.waitForResponses(30L, TimeUnit.SECONDS)); assertTrue(bulkClient.waitForResponses(30L, TimeUnit.SECONDS));
bulkClient.refreshIndex(indexDefinition); bulkClient.refreshIndex(indexDefinition);
assertEquals(numactions, bulkClient.getSearchableDocs(indexDefinition)); assertEquals(numactions, bulkClient.getSearchableDocs(indexDefinition));
assertEquals(numactions, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount()); assertEquals(numactions, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount());
if (bulkClient.getBulkController().getLastBulkError() != null) { if (bulkClient.getBulkProcessor().getLastBulkError() != null) {
logger.error("error", bulkClient.getBulkController().getLastBulkError()); logger.error("error", bulkClient.getBulkProcessor().getLastBulkError());
} }
assertNull(bulkClient.getBulkController().getLastBulkError()); assertNull(bulkClient.getBulkProcessor().getLastBulkError());
} }
try (TransportSearchClient searchClient = ClientBuilder.builder() try (TransportSearchClient searchClient = ClientBuilder.builder()
.setSearchClientProvider(TransportSearchClientProvider.class) .setSearchClientProvider(TransportSearchClientProvider.class)
@ -72,7 +68,7 @@ class SearchTest {
Stream<SearchHit> stream = searchClient.search(qb -> qb Stream<SearchHit> stream = searchClient.search(qb -> qb
.setIndices(indexDefinition.getFullIndexName()) .setIndices(indexDefinition.getFullIndexName())
.setQuery(QueryBuilders.matchAllQuery()), .setQuery(QueryBuilders.matchAllQuery()),
TimeValue.timeValueMillis(100), 570); TimeValue.timeValueMillis(100), 579);
long count = stream.count(); long count = stream.count();
assertEquals(numactions, count); assertEquals(numactions, count);
assertEquals(0L, searchClient.getSearchMetric().getFailedQueries().getCount()); assertEquals(0L, searchClient.getSearchMetric().getFailedQueries().getCount());
@ -94,13 +90,13 @@ class SearchTest {
.setIndices(indexDefinition.getFullIndexName()) .setIndices(indexDefinition.getFullIndexName())
.setQuery(QueryBuilders.matchAllQuery())); .setQuery(QueryBuilders.matchAllQuery()));
final AtomicInteger idcount = new AtomicInteger(); final AtomicInteger idcount = new AtomicInteger();
ids.forEach(id -> { ids.forEach(id -> idcount.incrementAndGet());
idcount.incrementAndGet();
});
assertEquals(numactions, idcount.get()); assertEquals(numactions, idcount.get());
assertEquals(1542L, searchClient.getSearchMetric().getQueries().getCount());
assertEquals(1539L, searchClient.getSearchMetric().getSucceededQueries().getCount());
assertEquals(3L, searchClient.getSearchMetric().getEmptyQueries().getCount());
assertEquals(0L, searchClient.getSearchMetric().getFailedQueries().getCount()); assertEquals(0L, searchClient.getSearchMetric().getFailedQueries().getCount());
assertEquals(0L, searchClient.getSearchMetric().getTimeoutQueries().getCount()); assertEquals(0L, searchClient.getSearchMetric().getTimeoutQueries().getCount());
assertEquals(3L, searchClient.getSearchMetric().getEmptyQueries().getCount());
} }
} }
} }

@ -3,6 +3,8 @@ package org.xbib.elx.transport.test;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.extension.ExtendWith;
import org.xbib.elx.api.IndexDefinition; import org.xbib.elx.api.IndexDefinition;
@ -41,9 +43,9 @@ class SmokeTest {
.put(helper.getTransportSettings()) .put(helper.getTransportSettings())
.build()) { .build()) {
IndexDefinition indexDefinition = IndexDefinition indexDefinition =
new DefaultIndexDefinition(adminClient, "test_smoke", "doc", Settings.EMPTY); new DefaultIndexDefinition(adminClient, "test", "doc", Settings.EMPTY);
assertEquals("test_smoke", indexDefinition.getIndex()); assertEquals("test", indexDefinition.getIndex());
assertTrue(indexDefinition.getFullIndexName().startsWith("test_smoke")); assertTrue(indexDefinition.getFullIndexName().startsWith("test"));
assertEquals(0, indexDefinition.getReplicaLevel()); assertEquals(0, indexDefinition.getReplicaLevel());
assertEquals(helper.getClusterName(), adminClient.getClusterName()); assertEquals(helper.getClusterName(), adminClient.getClusterName());
bulkClient.newIndex(indexDefinition); bulkClient.newIndex(indexDefinition);
@ -63,13 +65,26 @@ class SmokeTest {
adminClient.updateReplicaLevel(indexDefinition, 2); adminClient.updateReplicaLevel(indexDefinition, 2);
int replica = adminClient.getReplicaLevel(indexDefinition); int replica = adminClient.getReplicaLevel(indexDefinition);
assertEquals(2, replica); assertEquals(2, replica);
assertEquals(0, bulkClient.getBulkController().getBulkMetric().getFailed().getCount()); assertEquals(0, bulkClient.getBulkProcessor().getBulkMetric().getFailed().getCount());
assertEquals(6, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount()); assertEquals(6, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount());
if (bulkClient.getBulkController().getLastBulkError() != null) { if (bulkClient.getBulkProcessor().getLastBulkError() != null) {
logger.error("error", bulkClient.getBulkController().getLastBulkError()); logger.error("error", bulkClient.getBulkProcessor().getLastBulkError());
} }
assertNull(bulkClient.getBulkController().getLastBulkError()); assertNull(bulkClient.getBulkProcessor().getLastBulkError());
adminClient.deleteIndex(indexDefinition); adminClient.deleteIndex(indexDefinition);
XContentBuilder builder = JsonXContent.contentBuilder()
.startObject()
.startObject("doc")
.startObject("properties")
.startObject("location")
.field("type", "geo_point")
.endObject()
.endObject()
.endObject()
.endObject();
indexDefinition.setMappings(builder.string());
bulkClient.newIndex(indexDefinition);
assertTrue(adminClient.getMapping(indexDefinition).containsKey("properties"));
} }
} }
} }

Loading…
Cancel
Save