fix index shift/prune, add failure counter in search metrics
This commit is contained in:
parent
33baa0b683
commit
83b0c51ac9
16 changed files with 168 additions and 106 deletions
|
@ -123,7 +123,8 @@ public interface AdminClient extends BasicClient {
|
||||||
* @param additionalAliases new aliases
|
* @param additionalAliases new aliases
|
||||||
* @return this
|
* @return this
|
||||||
*/
|
*/
|
||||||
IndexShiftResult shiftIndex(IndexDefinition indexDefinition, List<String> additionalAliases);
|
IndexShiftResult shiftIndex(IndexDefinition indexDefinition,
|
||||||
|
List<String> additionalAliases);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Shift from one index to another.
|
* Shift from one index to another.
|
||||||
|
@ -137,34 +138,9 @@ public interface AdminClient extends BasicClient {
|
||||||
IndexAliasAdder indexAliasAdder);
|
IndexAliasAdder indexAliasAdder);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Shift from one index to another.
|
* Prune index.
|
||||||
* @param index the index name
|
|
||||||
* @param fullIndexName the index name with timestamp
|
|
||||||
* @param additionalAliases a list of names that should be set as index aliases
|
|
||||||
* @return this
|
|
||||||
*/
|
|
||||||
IndexShiftResult shiftIndex(String index,
|
|
||||||
String fullIndexName,
|
|
||||||
List<String> additionalAliases);
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Shift from one index to another.
|
|
||||||
* @param index the index name
|
|
||||||
* @param fullIndexName the index name with timestamp
|
|
||||||
* @param additionalAliases a list of names that should be set as index aliases
|
|
||||||
* @param adder an adder method to create alias term queries
|
|
||||||
* @return this
|
|
||||||
*/
|
|
||||||
IndexShiftResult shiftIndex(String index,
|
|
||||||
String fullIndexName, List<String> additionalAliases,
|
|
||||||
IndexAliasAdder adder);
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Apply retention policy to prune indices. All indices before delta should be deleted,
|
|
||||||
* but the number of mintokeep indices must be kept.
|
|
||||||
*
|
*
|
||||||
* @param indexDefinition index definition
|
* @param indexDefinition the index definition
|
||||||
* @return the index prune result
|
* @return the index prune result
|
||||||
*/
|
*/
|
||||||
IndexPruneResult pruneIndex(IndexDefinition indexDefinition);
|
IndexPruneResult pruneIndex(IndexDefinition indexDefinition);
|
||||||
|
|
|
@ -164,7 +164,6 @@ public interface BulkClient extends BasicClient, Flushable {
|
||||||
void startBulk(String index, long startRefreshIntervalSeconds,
|
void startBulk(String index, long startRefreshIntervalSeconds,
|
||||||
long stopRefreshIntervalSeconds) throws IOException;
|
long stopRefreshIntervalSeconds) throws IOException;
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Stop bulk mode.
|
* Stop bulk mode.
|
||||||
*
|
*
|
||||||
|
@ -216,5 +215,4 @@ public interface BulkClient extends BasicClient, Flushable {
|
||||||
* @param index index
|
* @param index index
|
||||||
*/
|
*/
|
||||||
void flushIndex(String index);
|
void flushIndex(String index);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -3,6 +3,9 @@ package org.xbib.elx.api;
|
||||||
import org.elasticsearch.action.bulk.BulkRequest;
|
import org.elasticsearch.action.bulk.BulkRequest;
|
||||||
import org.elasticsearch.action.bulk.BulkResponse;
|
import org.elasticsearch.action.bulk.BulkResponse;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A bulk listener for following executions of bulk operations.
|
||||||
|
*/
|
||||||
public interface BulkListener {
|
public interface BulkListener {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -26,7 +26,7 @@ public interface IndexDefinition {
|
||||||
|
|
||||||
DateTimeFormatter getDateTimeFormatter();
|
DateTimeFormatter getDateTimeFormatter();
|
||||||
|
|
||||||
IndexDefinition setDateTimePattern(Pattern timeWindow);
|
IndexDefinition setDateTimePattern(Pattern pattern);
|
||||||
|
|
||||||
Pattern getDateTimePattern();
|
Pattern getDateTimePattern();
|
||||||
|
|
||||||
|
|
|
@ -21,6 +21,10 @@ public interface SearchMetric extends Closeable {
|
||||||
|
|
||||||
Count getEmptyQueries();
|
Count getEmptyQueries();
|
||||||
|
|
||||||
|
Count getFailedQueries();
|
||||||
|
|
||||||
|
Count getTimeoutQueries();
|
||||||
|
|
||||||
long elapsed();
|
long elapsed();
|
||||||
|
|
||||||
void start();
|
void start();
|
||||||
|
|
|
@ -100,19 +100,6 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements
|
||||||
*/
|
*/
|
||||||
private static final String TYPE_NAME = "doc";
|
private static final String TYPE_NAME = "doc";
|
||||||
|
|
||||||
private static final IndexShiftResult EMPTY_INDEX_SHIFT_RESULT = new IndexShiftResult() {
|
|
||||||
@Override
|
|
||||||
public List<String> getMovedAliases() {
|
|
||||||
return Collections.emptyList();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public List<String> getNewAliases() {
|
|
||||||
return Collections.emptyList();
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Map<String, ?> getMapping(String index) throws IOException {
|
public Map<String, ?> getMapping(String index) throws IOException {
|
||||||
GetMappingsRequestBuilder getMappingsRequestBuilder = new GetMappingsRequestBuilder(client, GetMappingsAction.INSTANCE)
|
GetMappingsRequestBuilder getMappingsRequestBuilder = new GetMappingsRequestBuilder(client, GetMappingsAction.INSTANCE)
|
||||||
|
@ -138,7 +125,6 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements
|
||||||
DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest().indices(index);
|
DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest().indices(index);
|
||||||
client.execute(DeleteIndexAction.INSTANCE, deleteIndexRequest).actionGet();
|
client.execute(DeleteIndexAction.INSTANCE, deleteIndexRequest).actionGet();
|
||||||
waitForCluster("YELLOW", 30L, TimeUnit.SECONDS);
|
waitForCluster("YELLOW", 30L, TimeUnit.SECONDS);
|
||||||
waitForShards(30L, TimeUnit.SECONDS);
|
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -155,7 +141,6 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
updateIndexSetting(index, "number_of_replicas", level, maxWaitTime, timeUnit);
|
updateIndexSetting(index, "number_of_replicas", level, maxWaitTime, timeUnit);
|
||||||
waitForShards(maxWaitTime, timeUnit);
|
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -208,6 +193,9 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public List<String> resolveAlias(String alias) {
|
public List<String> resolveAlias(String alias) {
|
||||||
|
if (alias == null) {
|
||||||
|
return Collections.emptyList();
|
||||||
|
}
|
||||||
ensureClientIsPresent();
|
ensureClientIsPresent();
|
||||||
ClusterStateRequest clusterStateRequest = new ClusterStateRequest();
|
ClusterStateRequest clusterStateRequest = new ClusterStateRequest();
|
||||||
clusterStateRequest.blocks(false);
|
clusterStateRequest.blocks(false);
|
||||||
|
@ -223,15 +211,17 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public IndexShiftResult shiftIndex(IndexDefinition indexDefinition, List<String> additionalAliases) {
|
public IndexShiftResult shiftIndex(IndexDefinition indexDefinition,
|
||||||
|
List<String> additionalAliases) {
|
||||||
return shiftIndex(indexDefinition, additionalAliases, null);
|
return shiftIndex(indexDefinition, additionalAliases, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public IndexShiftResult shiftIndex(IndexDefinition indexDefinition,
|
public IndexShiftResult shiftIndex(IndexDefinition indexDefinition,
|
||||||
List<String> additionalAliases, IndexAliasAdder indexAliasAdder) {
|
List<String> additionalAliases,
|
||||||
|
IndexAliasAdder indexAliasAdder) {
|
||||||
if (additionalAliases == null) {
|
if (additionalAliases == null) {
|
||||||
return EMPTY_INDEX_SHIFT_RESULT;
|
return new EmptyIndexShiftResult();
|
||||||
}
|
}
|
||||||
if (indexDefinition.isShiftEnabled()) {
|
if (indexDefinition.isShiftEnabled()) {
|
||||||
return shiftIndex(indexDefinition.getIndex(),
|
return shiftIndex(indexDefinition.getIndex(),
|
||||||
|
@ -239,30 +229,24 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements
|
||||||
.filter(a -> a != null && !a.isEmpty())
|
.filter(a -> a != null && !a.isEmpty())
|
||||||
.collect(Collectors.toList()), indexAliasAdder);
|
.collect(Collectors.toList()), indexAliasAdder);
|
||||||
}
|
}
|
||||||
return EMPTY_INDEX_SHIFT_RESULT;
|
return new EmptyIndexShiftResult();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
private IndexShiftResult shiftIndex(String index, String fullIndexName,
|
||||||
public IndexShiftResult shiftIndex(String index, String fullIndexName, List<String> additionalAliases) {
|
|
||||||
return shiftIndex(index, fullIndexName, additionalAliases, null);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public IndexShiftResult shiftIndex(String index, String fullIndexName,
|
|
||||||
List<String> additionalAliases,
|
List<String> additionalAliases,
|
||||||
IndexAliasAdder adder) {
|
IndexAliasAdder adder) {
|
||||||
ensureClientIsPresent();
|
ensureClientIsPresent();
|
||||||
if (index == null) {
|
if (index == null) {
|
||||||
return EMPTY_INDEX_SHIFT_RESULT; // nothing to shift to
|
return new EmptyIndexShiftResult(); // nothing to shift to
|
||||||
}
|
}
|
||||||
if (index.equals(fullIndexName)) {
|
if (index.equals(fullIndexName)) {
|
||||||
return EMPTY_INDEX_SHIFT_RESULT; // nothing to shift to
|
return new EmptyIndexShiftResult(); // nothing to shift to
|
||||||
}
|
}
|
||||||
waitForCluster("YELLOW", 30L, TimeUnit.SECONDS);
|
waitForCluster("YELLOW", 30L, TimeUnit.SECONDS);
|
||||||
// two situations: 1. a new alias 2. there is already an old index with the alias
|
// two situations: 1. a new alias 2. there is already an old index with the alias
|
||||||
Optional<String> oldIndex = resolveAlias(index).stream().sorted().findFirst();
|
Optional<String> oldIndex = resolveAlias(index).stream().sorted().findFirst();
|
||||||
Map<String, String> oldAliasMap = oldIndex.map(this::getAliases).orElse(null);
|
Map<String, String> oldAliasMap = oldIndex.map(this::getAliases).orElse(null);
|
||||||
logger.debug("old index = {} old alias map = {}", oldIndex.orElse(""), oldAliasMap);
|
logger.info("old index = {} old alias map = {}", oldIndex.orElse(""), oldAliasMap);
|
||||||
final List<String> newAliases = new ArrayList<>();
|
final List<String> newAliases = new ArrayList<>();
|
||||||
final List<String> moveAliases = new ArrayList<>();
|
final List<String> moveAliases = new ArrayList<>();
|
||||||
IndicesAliasesRequest indicesAliasesRequest = new IndicesAliasesRequest();
|
IndicesAliasesRequest indicesAliasesRequest = new IndicesAliasesRequest();
|
||||||
|
@ -670,6 +654,20 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
private static class EmptyIndexShiftResult implements IndexShiftResult {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public List<String> getMovedAliases() {
|
||||||
|
return Collections.emptyList();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public List<String> getNewAliases() {
|
||||||
|
return Collections.emptyList();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private static class SuccessPruneResult implements IndexPruneResult {
|
private static class SuccessPruneResult implements IndexPruneResult {
|
||||||
|
|
||||||
Collection<String> candidateIndices;
|
Collection<String> candidateIndices;
|
||||||
|
|
|
@ -127,7 +127,7 @@ public abstract class AbstractSearchClient extends AbstractBasicClient implement
|
||||||
queryBuilder.accept(searchRequestBuilder);
|
queryBuilder.accept(searchRequestBuilder);
|
||||||
searchRequestBuilder.setScroll(scrollTime).setSize(scrollSize);
|
searchRequestBuilder.setScroll(scrollTime).setSize(scrollSize);
|
||||||
SearchResponse originalSearchResponse = searchRequestBuilder.execute().actionGet();
|
SearchResponse originalSearchResponse = searchRequestBuilder.execute().actionGet();
|
||||||
Stream<SearchResponse> infiniteResponses = Stream.iterate(originalSearchResponse,
|
Stream<SearchResponse> responseStream = Stream.iterate(originalSearchResponse,
|
||||||
searchResponse -> {
|
searchResponse -> {
|
||||||
SearchScrollRequestBuilder searchScrollRequestBuilder =
|
SearchScrollRequestBuilder searchScrollRequestBuilder =
|
||||||
new SearchScrollRequestBuilder(client, SearchScrollAction.INSTANCE)
|
new SearchScrollRequestBuilder(client, SearchScrollAction.INSTANCE)
|
||||||
|
@ -139,8 +139,11 @@ public abstract class AbstractSearchClient extends AbstractBasicClient implement
|
||||||
searchMetric.getCurrentQueries().dec();
|
searchMetric.getCurrentQueries().dec();
|
||||||
searchMetric.getQueries().inc();
|
searchMetric.getQueries().inc();
|
||||||
searchMetric.markTotalQueries(1);
|
searchMetric.markTotalQueries(1);
|
||||||
boolean isempty = searchResponse1.getHits().getHits().length == 0;
|
if (searchResponse1.getFailedShards() > 0) {
|
||||||
if (isempty) {
|
searchMetric.getFailedQueries().inc();
|
||||||
|
} else if (searchResponse1.isTimedOut()) {
|
||||||
|
searchMetric.getTimeoutQueries().inc();
|
||||||
|
} else if (searchResponse1.getHits().getHits().length == 0) {
|
||||||
searchMetric.getEmptyQueries().inc();
|
searchMetric.getEmptyQueries().inc();
|
||||||
} else {
|
} else {
|
||||||
searchMetric.getSucceededQueries().inc();
|
searchMetric.getSucceededQueries().inc();
|
||||||
|
@ -154,9 +157,9 @@ public abstract class AbstractSearchClient extends AbstractBasicClient implement
|
||||||
.addScrollId(searchResponse.getScrollId());
|
.addScrollId(searchResponse.getScrollId());
|
||||||
clearScrollRequestBuilder.execute().actionGet();
|
clearScrollRequestBuilder.execute().actionGet();
|
||||||
};
|
};
|
||||||
return StreamSupport.stream(TakeWhileSpliterator.over(infiniteResponses.spliterator(),
|
return StreamSupport.stream(TakeWhileSpliterator.over(responseStream.spliterator(),
|
||||||
condition, lastAction), false)
|
condition, lastAction), false)
|
||||||
.onClose(infiniteResponses::close)
|
.onClose(responseStream::close)
|
||||||
.flatMap(searchResponse -> Arrays.stream(searchResponse.getHits().getHits()));
|
.flatMap(searchResponse -> Arrays.stream(searchResponse.getHits().getHits()));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -165,7 +168,7 @@ public abstract class AbstractSearchClient extends AbstractBasicClient implement
|
||||||
return search(queryBuilder, TimeValue.timeValueMinutes(1), 1000).map(SearchHit::getId);
|
return search(queryBuilder, TimeValue.timeValueMinutes(1), 1000).map(SearchHit::getId);
|
||||||
}
|
}
|
||||||
|
|
||||||
static class TakeWhileSpliterator<T> implements Spliterator<T> {
|
private static class TakeWhileSpliterator<T> implements Spliterator<T> {
|
||||||
|
|
||||||
private final Spliterator<T> source;
|
private final Spliterator<T> source;
|
||||||
|
|
||||||
|
|
|
@ -4,6 +4,7 @@ import org.xbib.elx.api.IndexDefinition;
|
||||||
import org.xbib.elx.api.IndexRetention;
|
import org.xbib.elx.api.IndexRetention;
|
||||||
|
|
||||||
import java.time.format.DateTimeFormatter;
|
import java.time.format.DateTimeFormatter;
|
||||||
|
import java.util.Locale;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.regex.Pattern;
|
import java.util.regex.Pattern;
|
||||||
|
|
||||||
|
@ -43,6 +44,11 @@ public class DefaultIndexDefinition implements IndexDefinition {
|
||||||
|
|
||||||
private long stopRefreshInterval;
|
private long stopRefreshInterval;
|
||||||
|
|
||||||
|
public DefaultIndexDefinition() {
|
||||||
|
setDateTimeFormatter(DateTimeFormatter.ofPattern("yyyyMMdd", Locale.getDefault()));
|
||||||
|
setDateTimePattern(Pattern.compile("^(.*?)(\\d+)$"));
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public IndexDefinition setIndex(String index) {
|
public IndexDefinition setIndex(String index) {
|
||||||
this.index = index;
|
this.index = index;
|
||||||
|
|
|
@ -24,6 +24,10 @@ public class DefaultSearchMetric implements SearchMetric {
|
||||||
|
|
||||||
private final Count emptyQueries;
|
private final Count emptyQueries;
|
||||||
|
|
||||||
|
private final Count failedQueries;
|
||||||
|
|
||||||
|
private final Count timeoutQueries;
|
||||||
|
|
||||||
private Long started;
|
private Long started;
|
||||||
|
|
||||||
private Long stopped;
|
private Long stopped;
|
||||||
|
@ -34,6 +38,8 @@ public class DefaultSearchMetric implements SearchMetric {
|
||||||
queries = new CountMetric();
|
queries = new CountMetric();
|
||||||
succeededQueries = new CountMetric();
|
succeededQueries = new CountMetric();
|
||||||
emptyQueries = new CountMetric();
|
emptyQueries = new CountMetric();
|
||||||
|
failedQueries = new CountMetric();
|
||||||
|
timeoutQueries = new CountMetric();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -72,6 +78,16 @@ public class DefaultSearchMetric implements SearchMetric {
|
||||||
return emptyQueries;
|
return emptyQueries;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Count getFailedQueries() {
|
||||||
|
return failedQueries;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Count getTimeoutQueries() {
|
||||||
|
return timeoutQueries;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public long elapsed() {
|
public long elapsed() {
|
||||||
return started != null ? ((stopped != null ? stopped : System.nanoTime()) - started) : -1L;
|
return started != null ? ((stopped != null ? stopped : System.nanoTime()) - started) : -1L;
|
||||||
|
|
|
@ -16,13 +16,10 @@ import org.xbib.elx.node.NodeAdminClientProvider;
|
||||||
import org.xbib.elx.node.NodeBulkClient;
|
import org.xbib.elx.node.NodeBulkClient;
|
||||||
import org.xbib.elx.node.NodeBulkClientProvider;
|
import org.xbib.elx.node.NodeBulkClientProvider;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.time.format.DateTimeFormatter;
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Locale;
|
|
||||||
import java.util.regex.Pattern;
|
|
||||||
|
|
||||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||||
import static org.junit.jupiter.api.Assertions.assertNull;
|
import static org.junit.jupiter.api.Assertions.assertNull;
|
||||||
|
@ -54,21 +51,28 @@ class IndexPruneTest {
|
||||||
.put("index.number_of_replicas", 0)
|
.put("index.number_of_replicas", 0)
|
||||||
.build();
|
.build();
|
||||||
bulkClient.newIndex("test_prune1", settings);
|
bulkClient.newIndex("test_prune1", settings);
|
||||||
adminClient.shiftIndex("test_prune", "test_prune1", Collections.emptyList());
|
IndexDefinition indexDefinition = new DefaultIndexDefinition();
|
||||||
|
indexDefinition.setIndex("test_prune");
|
||||||
|
indexDefinition.setFullIndexName("test_prune1");
|
||||||
|
indexDefinition.setShift(true);
|
||||||
|
adminClient.shiftIndex(indexDefinition, Collections.emptyList());
|
||||||
bulkClient.newIndex("test_prune2", settings);
|
bulkClient.newIndex("test_prune2", settings);
|
||||||
adminClient.shiftIndex("test_prune", "test_prune2", Collections.emptyList());
|
indexDefinition.setFullIndexName("test_prune2");
|
||||||
|
indexDefinition.setShift(true);
|
||||||
|
adminClient.shiftIndex(indexDefinition, Collections.emptyList());
|
||||||
bulkClient.newIndex("test_prune3", settings);
|
bulkClient.newIndex("test_prune3", settings);
|
||||||
adminClient.shiftIndex("test_prune", "test_prune3", Collections.emptyList());
|
indexDefinition.setFullIndexName("test_prune3");
|
||||||
|
indexDefinition.setShift(true);
|
||||||
|
adminClient.shiftIndex(indexDefinition, Collections.emptyList());
|
||||||
bulkClient.newIndex("test_prune4", settings);
|
bulkClient.newIndex("test_prune4", settings);
|
||||||
adminClient.shiftIndex("test_prune", "test_prune4", Collections.emptyList());
|
indexDefinition.setFullIndexName("test_prune4");
|
||||||
|
indexDefinition.setShift(true);
|
||||||
|
adminClient.shiftIndex(indexDefinition, Collections.emptyList());
|
||||||
IndexRetention indexRetention = new DefaultIndexRetention();
|
IndexRetention indexRetention = new DefaultIndexRetention();
|
||||||
indexRetention.setDelta(2);
|
indexRetention.setDelta(2);
|
||||||
indexRetention.setMinToKeep(2);
|
indexRetention.setMinToKeep(2);
|
||||||
IndexDefinition indexDefinition = new DefaultIndexDefinition();
|
|
||||||
indexDefinition.setIndex("test_prune");
|
indexDefinition.setIndex("test_prune");
|
||||||
indexDefinition.setFullIndexName("test_prune4");
|
indexDefinition.setFullIndexName("test_prune4");
|
||||||
indexDefinition.setDateTimeFormatter(DateTimeFormatter.ofPattern("yyyyMMdd", Locale.getDefault()));
|
|
||||||
indexDefinition.setDateTimePattern(Pattern.compile("^(.*?)(\\d+)$"));
|
|
||||||
indexDefinition.setRetention(indexRetention);
|
indexDefinition.setRetention(indexRetention);
|
||||||
indexDefinition.setEnabled(true);
|
indexDefinition.setEnabled(true);
|
||||||
indexDefinition.setPrune(true);
|
indexDefinition.setPrune(true);
|
||||||
|
|
|
@ -8,8 +8,10 @@ import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.index.query.QueryBuilders;
|
import org.elasticsearch.index.query.QueryBuilders;
|
||||||
import org.junit.jupiter.api.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
import org.junit.jupiter.api.extension.ExtendWith;
|
import org.junit.jupiter.api.extension.ExtendWith;
|
||||||
|
import org.xbib.elx.api.IndexDefinition;
|
||||||
import org.xbib.elx.api.IndexShiftResult;
|
import org.xbib.elx.api.IndexShiftResult;
|
||||||
import org.xbib.elx.common.ClientBuilder;
|
import org.xbib.elx.common.ClientBuilder;
|
||||||
|
import org.xbib.elx.common.DefaultIndexDefinition;
|
||||||
import org.xbib.elx.node.NodeAdminClient;
|
import org.xbib.elx.node.NodeAdminClient;
|
||||||
import org.xbib.elx.node.NodeAdminClientProvider;
|
import org.xbib.elx.node.NodeAdminClientProvider;
|
||||||
import org.xbib.elx.node.NodeBulkClient;
|
import org.xbib.elx.node.NodeBulkClient;
|
||||||
|
@ -54,8 +56,11 @@ class IndexShiftTest {
|
||||||
}
|
}
|
||||||
bulkClient.flush();
|
bulkClient.flush();
|
||||||
bulkClient.waitForResponses(30L, TimeUnit.SECONDS);
|
bulkClient.waitForResponses(30L, TimeUnit.SECONDS);
|
||||||
IndexShiftResult indexShiftResult =
|
IndexDefinition indexDefinition = new DefaultIndexDefinition();
|
||||||
adminClient.shiftIndex("test", "test_shift", Arrays.asList("a", "b", "c"));
|
indexDefinition.setIndex("test");
|
||||||
|
indexDefinition.setFullIndexName("test_shift");
|
||||||
|
indexDefinition.setShift(true);
|
||||||
|
IndexShiftResult indexShiftResult = adminClient.shiftIndex(indexDefinition, Arrays.asList("a", "b", "c"));
|
||||||
assertTrue(indexShiftResult.getNewAliases().contains("a"));
|
assertTrue(indexShiftResult.getNewAliases().contains("a"));
|
||||||
assertTrue(indexShiftResult.getNewAliases().contains("b"));
|
assertTrue(indexShiftResult.getNewAliases().contains("b"));
|
||||||
assertTrue(indexShiftResult.getNewAliases().contains("c"));
|
assertTrue(indexShiftResult.getNewAliases().contains("c"));
|
||||||
|
@ -78,7 +83,8 @@ class IndexShiftTest {
|
||||||
}
|
}
|
||||||
bulkClient.flush();
|
bulkClient.flush();
|
||||||
bulkClient.waitForResponses(30L, TimeUnit.SECONDS);
|
bulkClient.waitForResponses(30L, TimeUnit.SECONDS);
|
||||||
indexShiftResult = adminClient.shiftIndex("test", "test_shift2", Arrays.asList("d", "e", "f"),
|
indexDefinition.setFullIndexName("test_shift2");
|
||||||
|
indexShiftResult = adminClient.shiftIndex(indexDefinition, Arrays.asList("d", "e", "f"),
|
||||||
(request, index, alias) -> request.addAliasAction(new IndicesAliasesRequest.AliasActions(AliasAction.Type.ADD,
|
(request, index, alias) -> request.addAliasAction(new IndicesAliasesRequest.AliasActions(AliasAction.Type.ADD,
|
||||||
index, alias).filter(QueryBuilders.termQuery("my_key", alias)))
|
index, alias).filter(QueryBuilders.termQuery("my_key", alias)))
|
||||||
);
|
);
|
||||||
|
|
|
@ -24,9 +24,9 @@ class SearchTest {
|
||||||
|
|
||||||
private static final Logger logger = LogManager.getLogger(SearchTest.class.getName());
|
private static final Logger logger = LogManager.getLogger(SearchTest.class.getName());
|
||||||
|
|
||||||
private static final Long ACTIONS = 100L;
|
private static final Long ACTIONS = 100000L;
|
||||||
|
|
||||||
private static final Long MAX_ACTIONS_PER_REQUEST = 10L;
|
private static final Long MAX_ACTIONS_PER_REQUEST = 100L;
|
||||||
|
|
||||||
private final TestExtension.Helper helper;
|
private final TestExtension.Helper helper;
|
||||||
|
|
||||||
|
@ -36,21 +36,22 @@ class SearchTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
void testDocStream() throws Exception {
|
void testDocStream() throws Exception {
|
||||||
|
long numactions = ACTIONS;
|
||||||
try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client)
|
try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client)
|
||||||
.setBulkClientProvider(NodeBulkClientProvider.class)
|
.setBulkClientProvider(NodeBulkClientProvider.class)
|
||||||
.put(helper.getNodeSettings())
|
.put(helper.getNodeSettings())
|
||||||
.put(Parameters.MAX_ACTIONS_PER_REQUEST.name(), MAX_ACTIONS_PER_REQUEST)
|
.put(Parameters.MAX_ACTIONS_PER_REQUEST.name(), MAX_ACTIONS_PER_REQUEST)
|
||||||
.build()) {
|
.build()) {
|
||||||
bulkClient.newIndex("test");
|
bulkClient.newIndex("test");
|
||||||
for (int i = 0; i < ACTIONS; i++) {
|
for (int i = 0; i < numactions; i++) {
|
||||||
bulkClient.index("test", null, false,
|
bulkClient.index("test", null, false,
|
||||||
"{ \"name\" : \"" + helper.randomString(32) + "\"}");
|
"{ \"name\" : \"" + helper.randomString(32) + "\"}");
|
||||||
}
|
}
|
||||||
bulkClient.flush();
|
bulkClient.flush();
|
||||||
bulkClient.waitForResponses(30L, TimeUnit.SECONDS);
|
bulkClient.waitForResponses(30L, TimeUnit.SECONDS);
|
||||||
bulkClient.refreshIndex("test");
|
bulkClient.refreshIndex("test");
|
||||||
assertEquals(ACTIONS, bulkClient.getSearchableDocs("test"));
|
assertEquals(numactions, bulkClient.getSearchableDocs("test"));
|
||||||
assertEquals(ACTIONS, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount());
|
assertEquals(numactions, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount());
|
||||||
if (bulkClient.getBulkController().getLastBulkError() != null) {
|
if (bulkClient.getBulkController().getLastBulkError() != null) {
|
||||||
logger.error("error", bulkClient.getBulkController().getLastBulkError());
|
logger.error("error", bulkClient.getBulkController().getLastBulkError());
|
||||||
}
|
}
|
||||||
|
@ -60,24 +61,39 @@ class SearchTest {
|
||||||
.setSearchClientProvider(NodeSearchClientProvider.class)
|
.setSearchClientProvider(NodeSearchClientProvider.class)
|
||||||
.put(helper.getNodeSettings())
|
.put(helper.getNodeSettings())
|
||||||
.build()) {
|
.build()) {
|
||||||
|
// test stream count
|
||||||
Stream<SearchHit> stream = searchClient.search(qb -> qb
|
Stream<SearchHit> stream = searchClient.search(qb -> qb
|
||||||
.setIndices("test")
|
.setIndices("test")
|
||||||
.setQuery(QueryBuilders.matchAllQuery()),
|
.setQuery(QueryBuilders.matchAllQuery()),
|
||||||
TimeValue.timeValueMinutes(1), 10);
|
TimeValue.timeValueMillis(100), 570);
|
||||||
long count = stream.count();
|
long count = stream.count();
|
||||||
assertEquals(ACTIONS, count);
|
assertEquals(numactions, count);
|
||||||
|
assertEquals(0L, searchClient.getSearchMetric().getFailedQueries().getCount());
|
||||||
|
assertEquals(0L, searchClient.getSearchMetric().getTimeoutQueries().getCount());
|
||||||
|
assertEquals(1L, searchClient.getSearchMetric().getEmptyQueries().getCount());
|
||||||
|
// test stream docs
|
||||||
|
stream = searchClient.search(qb -> qb
|
||||||
|
.setIndices("test")
|
||||||
|
.setQuery(QueryBuilders.matchAllQuery()),
|
||||||
|
TimeValue.timeValueMillis(10), 79);
|
||||||
|
final AtomicInteger hitcount = new AtomicInteger();
|
||||||
|
stream.forEach(hit -> hitcount.incrementAndGet());
|
||||||
|
assertEquals(numactions, hitcount.get());
|
||||||
|
assertEquals(0L, searchClient.getSearchMetric().getFailedQueries().getCount());
|
||||||
|
assertEquals(0L, searchClient.getSearchMetric().getTimeoutQueries().getCount());
|
||||||
|
assertEquals(2L, searchClient.getSearchMetric().getEmptyQueries().getCount());
|
||||||
|
// test stream doc ids
|
||||||
Stream<String> ids = searchClient.getIds(qb -> qb
|
Stream<String> ids = searchClient.getIds(qb -> qb
|
||||||
.setIndices("test")
|
.setIndices("test")
|
||||||
.setQuery(QueryBuilders.matchAllQuery()));
|
.setQuery(QueryBuilders.matchAllQuery()));
|
||||||
final AtomicInteger idcount = new AtomicInteger();
|
final AtomicInteger idcount = new AtomicInteger();
|
||||||
ids.forEach(id -> {
|
ids.forEach(id -> {
|
||||||
logger.info(id);
|
|
||||||
idcount.incrementAndGet();
|
idcount.incrementAndGet();
|
||||||
});
|
});
|
||||||
assertEquals(ACTIONS, idcount.get());
|
assertEquals(numactions, idcount.get());
|
||||||
assertEquals(11, searchClient.getSearchMetric().getQueries().getCount());
|
assertEquals(0L, searchClient.getSearchMetric().getFailedQueries().getCount());
|
||||||
assertEquals(9, searchClient.getSearchMetric().getSucceededQueries().getCount());
|
assertEquals(0L, searchClient.getSearchMetric().getTimeoutQueries().getCount());
|
||||||
assertEquals(2, searchClient.getSearchMetric().getEmptyQueries().getCount());
|
assertEquals(3L, searchClient.getSearchMetric().getEmptyQueries().getCount());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -55,21 +55,28 @@ class IndexPruneTest {
|
||||||
.put("index.number_of_replicas", 0)
|
.put("index.number_of_replicas", 0)
|
||||||
.build();
|
.build();
|
||||||
bulkClient.newIndex("test_prune1", settings);
|
bulkClient.newIndex("test_prune1", settings);
|
||||||
adminClient.shiftIndex("test_prune", "test_prune1", Collections.emptyList());
|
IndexDefinition indexDefinition = new DefaultIndexDefinition();
|
||||||
|
indexDefinition.setIndex("test_prune");
|
||||||
|
indexDefinition.setFullIndexName("test_prune1");
|
||||||
|
indexDefinition.setShift(true);
|
||||||
|
adminClient.shiftIndex(indexDefinition, Collections.emptyList());
|
||||||
bulkClient.newIndex("test_prune2", settings);
|
bulkClient.newIndex("test_prune2", settings);
|
||||||
adminClient.shiftIndex("test_prune", "test_prune2", Collections.emptyList());
|
indexDefinition.setFullIndexName("test_prune2");
|
||||||
|
indexDefinition.setShift(true);
|
||||||
|
adminClient.shiftIndex(indexDefinition, Collections.emptyList());
|
||||||
bulkClient.newIndex("test_prune3", settings);
|
bulkClient.newIndex("test_prune3", settings);
|
||||||
adminClient.shiftIndex("test_prune", "test_prune3", Collections.emptyList());
|
indexDefinition.setFullIndexName("test_prune3");
|
||||||
|
indexDefinition.setShift(true);
|
||||||
|
adminClient.shiftIndex(indexDefinition, Collections.emptyList());
|
||||||
bulkClient.newIndex("test_prune4", settings);
|
bulkClient.newIndex("test_prune4", settings);
|
||||||
adminClient.shiftIndex("test_prune", "test_prune4", Collections.emptyList());
|
indexDefinition.setFullIndexName("test_prune4");
|
||||||
|
indexDefinition.setShift(true);
|
||||||
|
adminClient.shiftIndex(indexDefinition, Collections.emptyList());
|
||||||
IndexRetention indexRetention = new DefaultIndexRetention();
|
IndexRetention indexRetention = new DefaultIndexRetention();
|
||||||
indexRetention.setDelta(2);
|
indexRetention.setDelta(2);
|
||||||
indexRetention.setMinToKeep(2);
|
indexRetention.setMinToKeep(2);
|
||||||
IndexDefinition indexDefinition = new DefaultIndexDefinition();
|
|
||||||
indexDefinition.setIndex("test_prune");
|
indexDefinition.setIndex("test_prune");
|
||||||
indexDefinition.setFullIndexName("test_prune4");
|
indexDefinition.setFullIndexName("test_prune4");
|
||||||
indexDefinition.setDateTimeFormatter(DateTimeFormatter.ofPattern("yyyyMMdd", Locale.getDefault()));
|
|
||||||
indexDefinition.setDateTimePattern(Pattern.compile("^(.*?)(\\d+)$"));
|
|
||||||
indexDefinition.setRetention(indexRetention);
|
indexDefinition.setRetention(indexRetention);
|
||||||
indexDefinition.setEnabled(true);
|
indexDefinition.setEnabled(true);
|
||||||
indexDefinition.setPrune(true);
|
indexDefinition.setPrune(true);
|
||||||
|
|
|
@ -8,8 +8,10 @@ import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.index.query.QueryBuilders;
|
import org.elasticsearch.index.query.QueryBuilders;
|
||||||
import org.junit.jupiter.api.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
import org.junit.jupiter.api.extension.ExtendWith;
|
import org.junit.jupiter.api.extension.ExtendWith;
|
||||||
|
import org.xbib.elx.api.IndexDefinition;
|
||||||
import org.xbib.elx.api.IndexShiftResult;
|
import org.xbib.elx.api.IndexShiftResult;
|
||||||
import org.xbib.elx.common.ClientBuilder;
|
import org.xbib.elx.common.ClientBuilder;
|
||||||
|
import org.xbib.elx.common.DefaultIndexDefinition;
|
||||||
import org.xbib.elx.transport.TransportAdminClient;
|
import org.xbib.elx.transport.TransportAdminClient;
|
||||||
import org.xbib.elx.transport.TransportAdminClientProvider;
|
import org.xbib.elx.transport.TransportAdminClientProvider;
|
||||||
import org.xbib.elx.transport.TransportBulkClient;
|
import org.xbib.elx.transport.TransportBulkClient;
|
||||||
|
@ -56,8 +58,12 @@ class IndexShiftTest {
|
||||||
}
|
}
|
||||||
bulkClient.flush();
|
bulkClient.flush();
|
||||||
bulkClient.waitForResponses(30L, TimeUnit.SECONDS);
|
bulkClient.waitForResponses(30L, TimeUnit.SECONDS);
|
||||||
|
IndexDefinition indexDefinition = new DefaultIndexDefinition();
|
||||||
|
indexDefinition.setIndex("test");
|
||||||
|
indexDefinition.setFullIndexName("test_shift");
|
||||||
|
indexDefinition.setShift(true);
|
||||||
IndexShiftResult indexShiftResult =
|
IndexShiftResult indexShiftResult =
|
||||||
adminClient.shiftIndex("test", "test_shift", Arrays.asList("a", "b", "c"));
|
adminClient.shiftIndex(indexDefinition, Arrays.asList("a", "b", "c"));
|
||||||
assertTrue(indexShiftResult.getNewAliases().contains("a"));
|
assertTrue(indexShiftResult.getNewAliases().contains("a"));
|
||||||
assertTrue(indexShiftResult.getNewAliases().contains("b"));
|
assertTrue(indexShiftResult.getNewAliases().contains("b"));
|
||||||
assertTrue(indexShiftResult.getNewAliases().contains("c"));
|
assertTrue(indexShiftResult.getNewAliases().contains("c"));
|
||||||
|
@ -80,7 +86,8 @@ class IndexShiftTest {
|
||||||
}
|
}
|
||||||
bulkClient.flush();
|
bulkClient.flush();
|
||||||
bulkClient.waitForResponses(30L, TimeUnit.SECONDS);
|
bulkClient.waitForResponses(30L, TimeUnit.SECONDS);
|
||||||
indexShiftResult = adminClient.shiftIndex("test", "test_shift2", Arrays.asList("d", "e", "f"),
|
indexDefinition.setFullIndexName("test_shift2");
|
||||||
|
indexShiftResult = adminClient.shiftIndex(indexDefinition, Arrays.asList("d", "e", "f"),
|
||||||
(request, index, alias) -> request.addAliasAction(new IndicesAliasesRequest.AliasActions(AliasAction.Type.ADD,
|
(request, index, alias) -> request.addAliasAction(new IndicesAliasesRequest.AliasActions(AliasAction.Type.ADD,
|
||||||
index, alias).filter(QueryBuilders.termQuery("my_key", alias)))
|
index, alias).filter(QueryBuilders.termQuery("my_key", alias)))
|
||||||
);
|
);
|
||||||
|
|
|
@ -26,9 +26,9 @@ class SearchTest {
|
||||||
|
|
||||||
private static final Logger logger = LogManager.getLogger(SearchTest.class.getName());
|
private static final Logger logger = LogManager.getLogger(SearchTest.class.getName());
|
||||||
|
|
||||||
private static final Long ACTIONS = 100L;
|
private static final Long ACTIONS = 100000L;
|
||||||
|
|
||||||
private static final Long MAX_ACTIONS_PER_REQUEST = 10L;
|
private static final Long MAX_ACTIONS_PER_REQUEST = 100L;
|
||||||
|
|
||||||
private final TestExtension.Helper helper;
|
private final TestExtension.Helper helper;
|
||||||
|
|
||||||
|
@ -63,21 +63,39 @@ class SearchTest {
|
||||||
.setSearchClientProvider(TransportSearchClientProvider.class)
|
.setSearchClientProvider(TransportSearchClientProvider.class)
|
||||||
.put(helper.getTransportSettings())
|
.put(helper.getTransportSettings())
|
||||||
.build()) {
|
.build()) {
|
||||||
|
// test stream count
|
||||||
Stream<SearchHit> stream = searchClient.search(qb -> qb
|
Stream<SearchHit> stream = searchClient.search(qb -> qb
|
||||||
.setIndices("test")
|
.setIndices("test")
|
||||||
.setQuery(QueryBuilders.matchAllQuery()),
|
.setQuery(QueryBuilders.matchAllQuery()),
|
||||||
TimeValue.timeValueMinutes(1), 10);
|
TimeValue.timeValueMillis(100), 570);
|
||||||
long count = stream.count();
|
long count = stream.count();
|
||||||
assertEquals(numactions, count);
|
assertEquals(numactions, count);
|
||||||
|
assertEquals(0L, searchClient.getSearchMetric().getFailedQueries().getCount());
|
||||||
|
assertEquals(0L, searchClient.getSearchMetric().getTimeoutQueries().getCount());
|
||||||
|
assertEquals(1L, searchClient.getSearchMetric().getEmptyQueries().getCount());
|
||||||
|
// test stream docs
|
||||||
|
stream = searchClient.search(qb -> qb
|
||||||
|
.setIndices("test")
|
||||||
|
.setQuery(QueryBuilders.matchAllQuery()),
|
||||||
|
TimeValue.timeValueMillis(10), 79);
|
||||||
|
final AtomicInteger hitcount = new AtomicInteger();
|
||||||
|
stream.forEach(hit -> hitcount.incrementAndGet());
|
||||||
|
assertEquals(numactions, hitcount.get());
|
||||||
|
assertEquals(0L, searchClient.getSearchMetric().getFailedQueries().getCount());
|
||||||
|
assertEquals(0L, searchClient.getSearchMetric().getTimeoutQueries().getCount());
|
||||||
|
assertEquals(2L, searchClient.getSearchMetric().getEmptyQueries().getCount());
|
||||||
|
// test stream doc ids
|
||||||
Stream<String> ids = searchClient.getIds(qb -> qb
|
Stream<String> ids = searchClient.getIds(qb -> qb
|
||||||
.setIndices("test")
|
.setIndices("test")
|
||||||
.setQuery(QueryBuilders.matchAllQuery()));
|
.setQuery(QueryBuilders.matchAllQuery()));
|
||||||
final AtomicInteger idcount = new AtomicInteger();
|
final AtomicInteger idcount = new AtomicInteger();
|
||||||
ids.forEach(id -> {
|
ids.forEach(id -> {
|
||||||
logger.info(id);
|
|
||||||
idcount.incrementAndGet();
|
idcount.incrementAndGet();
|
||||||
});
|
});
|
||||||
assertEquals(numactions, idcount.get());
|
assertEquals(numactions, idcount.get());
|
||||||
|
assertEquals(0L, searchClient.getSearchMetric().getFailedQueries().getCount());
|
||||||
|
assertEquals(0L, searchClient.getSearchMetric().getTimeoutQueries().getCount());
|
||||||
|
assertEquals(3L, searchClient.getSearchMetric().getEmptyQueries().getCount());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
group = org.xbib
|
group = org.xbib
|
||||||
name = elx
|
name = elx
|
||||||
version = 2.2.1.28
|
version = 2.2.1.29
|
||||||
|
|
||||||
gradle.wrapper.version = 6.6.1
|
gradle.wrapper.version = 6.6.1
|
||||||
xbib-metrics.version = 2.1.0
|
xbib-metrics.version = 2.1.0
|
||||||
|
|
Loading…
Reference in a new issue