update search API, now with wrapper API, add multi get tests, add HTTP MultiGet action
This commit is contained in:
parent
f2e4d27750
commit
2701c2b11d
19 changed files with 206 additions and 71 deletions
|
@ -1,9 +1,10 @@
|
|||
package org.xbib.elx.api;
|
||||
|
||||
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
|
||||
import org.elasticsearch.index.query.QueryBuilder;
|
||||
|
||||
@FunctionalInterface
|
||||
public interface IndexAliasAdder {
|
||||
|
||||
void addIndexAlias(IndicesAliasesRequest request, String index, String alias);
|
||||
QueryBuilder addAliasOnField(String index, String alias);
|
||||
|
||||
}
|
||||
|
|
|
@ -1,13 +1,9 @@
|
|||
package org.xbib.elx.api;
|
||||
|
||||
import org.elasticsearch.action.get.GetRequestBuilder;
|
||||
import org.elasticsearch.action.get.GetResponse;
|
||||
import org.elasticsearch.action.get.MultiGetRequestBuilder;
|
||||
import org.elasticsearch.action.get.MultiGetResponse;
|
||||
import org.elasticsearch.action.search.SearchRequestBuilder;
|
||||
import org.elasticsearch.action.search.SearchResponse;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.search.SearchHit;
|
||||
import java.util.Optional;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.stream.Stream;
|
||||
|
@ -18,13 +14,13 @@ public interface SearchClient extends BasicClient {
|
|||
|
||||
SearchMetric getSearchMetric();
|
||||
|
||||
Optional<GetResponse> get(Consumer<GetRequestBuilder> getRequestBuilder);
|
||||
Optional<SearchDocument> get(Consumer<GetRequestBuilder> getRequestBuilder);
|
||||
|
||||
Optional<MultiGetResponse> multiGet(Consumer<MultiGetRequestBuilder> multiGetRequestBuilder);
|
||||
Stream<SearchDocument> multiGet(Consumer<MultiGetRequestBuilder> multiGetRequestBuilder);
|
||||
|
||||
Optional<SearchResponse> search(Consumer<SearchRequestBuilder> searchRequestBuilder);
|
||||
Optional<SearchResult> search(Consumer<SearchRequestBuilder> searchRequestBuilder);
|
||||
|
||||
Stream<SearchHit> search(Consumer<SearchRequestBuilder> searchRequestBuilder,
|
||||
Stream<SearchDocument> search(Consumer<SearchRequestBuilder> searchRequestBuilder,
|
||||
TimeValue scrollTime, int scrollSize);
|
||||
|
||||
Stream<String> getIds(Consumer<SearchRequestBuilder> queryBuilder);
|
||||
|
|
|
@ -282,10 +282,14 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements
|
|||
if (oldAliasMap == null || !oldAliasMap.containsKey(additionalAlias)) {
|
||||
// index alias adder only active on extra aliases, and if alias is new
|
||||
if (adder != null) {
|
||||
adder.addIndexAlias(indicesAliasesRequest, fullIndexName, additionalAlias);
|
||||
indicesAliasesRequest.addAliasAction(IndicesAliasesRequest.AliasActions.add()
|
||||
.index(fullIndexName)
|
||||
.alias(additionalAlias)
|
||||
.filter(adder.addAliasOnField(fullIndexName, additionalAlias)));
|
||||
} else {
|
||||
indicesAliasesRequest.addAliasAction(IndicesAliasesRequest.AliasActions.add()
|
||||
.index(fullIndexName).alias(additionalAlias));
|
||||
.index(fullIndexName)
|
||||
.alias(additionalAlias));
|
||||
}
|
||||
newAliases.add(additionalAlias);
|
||||
} else {
|
||||
|
|
|
@ -18,9 +18,10 @@ import org.elasticsearch.action.search.SearchScrollRequestBuilder;
|
|||
import org.elasticsearch.action.search.ShardSearchFailure;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.search.SearchHit;
|
||||
import org.xbib.elx.api.SearchClient;
|
||||
import org.xbib.elx.api.SearchDocument;
|
||||
import org.xbib.elx.api.SearchMetric;
|
||||
import org.xbib.elx.api.SearchResult;
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.Comparator;
|
||||
|
@ -72,7 +73,7 @@ public abstract class AbstractSearchClient extends AbstractBasicClient implement
|
|||
}
|
||||
|
||||
@Override
|
||||
public Optional<GetResponse> get(Consumer<GetRequestBuilder> getRequestBuilderConsumer) {
|
||||
public Optional<SearchDocument> get(Consumer<GetRequestBuilder> getRequestBuilderConsumer) {
|
||||
GetRequestBuilder getRequestBuilder = new GetRequestBuilder(client, GetAction.INSTANCE);
|
||||
getRequestBuilderConsumer.accept(getRequestBuilder);
|
||||
ActionFuture<GetResponse> actionFuture = getRequestBuilder.execute();
|
||||
|
@ -94,24 +95,24 @@ public abstract class AbstractSearchClient extends AbstractBasicClient implement
|
|||
searchMetric.getEmptyQueries().inc();
|
||||
}
|
||||
}
|
||||
return getResponse.isExists() ? Optional.of(getResponse) : Optional.empty();
|
||||
return getResponse.isExists() ? Optional.of(new GetDocument(getResponse)) : Optional.empty();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Optional<MultiGetResponse> multiGet(Consumer<MultiGetRequestBuilder> multiGetRequestBuilderConsumer) {
|
||||
public Stream<SearchDocument> multiGet(Consumer<MultiGetRequestBuilder> multiGetRequestBuilderConsumer) {
|
||||
MultiGetRequestBuilder multiGetRequestBuilder = new MultiGetRequestBuilder(client, MultiGetAction.INSTANCE);
|
||||
multiGetRequestBuilderConsumer.accept(multiGetRequestBuilder);
|
||||
ActionFuture<MultiGetResponse> actionFuture = multiGetRequestBuilder.execute();
|
||||
if (searchMetric != null) {
|
||||
searchMetric.getCurrentQueries().inc();
|
||||
}
|
||||
MultiGetResponse multiGetItemResponse = actionFuture.actionGet();
|
||||
MultiGetResponse multiGetResponse = actionFuture.actionGet();
|
||||
if (searchMetric != null) {
|
||||
searchMetric.getCurrentQueries().dec();
|
||||
searchMetric.getQueries().inc();
|
||||
searchMetric.markTotalQueries(1);
|
||||
}
|
||||
boolean isempty = multiGetItemResponse.getResponses().length == 0;
|
||||
boolean isempty = multiGetResponse.getResponses().length == 0;
|
||||
if (isempty) {
|
||||
if (searchMetric != null) {
|
||||
searchMetric.getEmptyQueries().inc();
|
||||
|
@ -121,12 +122,16 @@ public abstract class AbstractSearchClient extends AbstractBasicClient implement
|
|||
searchMetric.getSucceededQueries().inc();
|
||||
}
|
||||
}
|
||||
return isempty ? Optional.empty() : Optional.of(multiGetItemResponse);
|
||||
return isempty ? Stream.of() : Arrays.stream(multiGetResponse.getResponses())
|
||||
.filter(r -> !r.isFailed())
|
||||
.map(MultiGetDocument::new);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Optional<SearchResponse> search(Consumer<SearchRequestBuilder> queryBuilder) {
|
||||
public Optional<SearchResult> search(Consumer<SearchRequestBuilder> queryBuilder) {
|
||||
SearchRequestBuilder searchRequestBuilder = new SearchRequestBuilder(client, SearchAction.INSTANCE);
|
||||
searchRequestBuilder.setTrackTotalHits(true);
|
||||
searchRequestBuilder.setTrackScores(true);
|
||||
queryBuilder.accept(searchRequestBuilder);
|
||||
ActionFuture<SearchResponse> actionFuture = searchRequestBuilder.execute();
|
||||
if (searchMetric != null) {
|
||||
|
@ -158,13 +163,17 @@ public abstract class AbstractSearchClient extends AbstractBasicClient implement
|
|||
searchMetric.getSucceededQueries().inc();
|
||||
}
|
||||
}
|
||||
return isempty ? Optional.empty() : Optional.of(searchResponse);
|
||||
return isempty ?
|
||||
Optional.empty() :
|
||||
Optional.of(new DefaultSearchResult(searchResponse.getHits()));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Stream<SearchHit> search(Consumer<SearchRequestBuilder> queryBuilder,
|
||||
public Stream<SearchDocument> search(Consumer<SearchRequestBuilder> queryBuilder,
|
||||
TimeValue scrollTime, int scrollSize) {
|
||||
SearchRequestBuilder searchRequestBuilder = new SearchRequestBuilder(client, SearchAction.INSTANCE);
|
||||
searchRequestBuilder.setTrackTotalHits(true);
|
||||
searchRequestBuilder.setTrackScores(true);
|
||||
queryBuilder.accept(searchRequestBuilder);
|
||||
searchRequestBuilder.setScroll(scrollTime).setSize(scrollSize);
|
||||
ActionFuture<SearchResponse> actionFuture = searchRequestBuilder.execute();
|
||||
|
@ -239,12 +248,12 @@ public abstract class AbstractSearchClient extends AbstractBasicClient implement
|
|||
return StreamSupport.stream(TakeWhileSpliterator.over(responseStream.spliterator(),
|
||||
condition, lastAction), false)
|
||||
.onClose(responseStream::close)
|
||||
.flatMap(searchResponse -> Arrays.stream(searchResponse.getHits().getHits()));
|
||||
.flatMap(searchResponse -> new DefaultSearchResult(searchResponse.getHits()).getDocuments().stream());
|
||||
}
|
||||
|
||||
@Override
|
||||
public Stream<String> getIds(Consumer<SearchRequestBuilder> queryBuilder) {
|
||||
return search(queryBuilder, TimeValue.timeValueMinutes(1), 1000).map(SearchHit::getId);
|
||||
return search(queryBuilder, TimeValue.timeValueMinutes(1), 1000).map(SearchDocument::getId);
|
||||
}
|
||||
|
||||
private static class TakeWhileSpliterator<T> implements Spliterator<T> {
|
||||
|
|
|
@ -71,9 +71,6 @@ public class DefaultBulkListener implements BulkListener {
|
|||
}
|
||||
int n = 0;
|
||||
for (BulkItemResponse itemResponse : response.getItems()) {
|
||||
if (bulkMetric != null) {
|
||||
bulkMetric.getCurrentIngest().dec(itemResponse.getIndex(), itemResponse.getType(), itemResponse.getId());
|
||||
}
|
||||
if (itemResponse.isFailed()) {
|
||||
n++;
|
||||
if (bulkMetric != null) {
|
||||
|
|
|
@ -30,6 +30,6 @@ public class DefaultSearchDocument implements SearchDocument {
|
|||
|
||||
@Override
|
||||
public Map<String, Object> getFields() {
|
||||
return searchHit.sourceAsMap();
|
||||
return searchHit.getSourceAsMap();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -17,7 +17,7 @@ public class DefaultSearchResult implements SearchResult {
|
|||
}
|
||||
@Override
|
||||
public long getTotal() {
|
||||
return searchHits.getTotalHits();
|
||||
return searchHits.getTotalHits().value;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -0,0 +1,34 @@
|
|||
package org.xbib.elx.common;
|
||||
|
||||
import org.elasticsearch.action.get.GetResponse;
|
||||
import org.xbib.elx.api.SearchDocument;
|
||||
import java.util.Map;
|
||||
|
||||
public class GetDocument implements SearchDocument {
|
||||
|
||||
private final GetResponse getResponse;
|
||||
|
||||
public GetDocument(GetResponse getResponse) {
|
||||
this.getResponse = getResponse;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getIndex() {
|
||||
return getResponse.getIndex();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getId() {
|
||||
return getResponse.getId();
|
||||
}
|
||||
|
||||
@Override
|
||||
public float getScore() {
|
||||
return -1f;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, Object> getFields() {
|
||||
return getResponse.getSourceAsMap();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,34 @@
|
|||
package org.xbib.elx.common;
|
||||
|
||||
import org.elasticsearch.action.get.MultiGetItemResponse;
|
||||
import org.xbib.elx.api.SearchDocument;
|
||||
import java.util.Map;
|
||||
|
||||
public class MultiGetDocument implements SearchDocument {
|
||||
|
||||
private final MultiGetItemResponse getResponse;
|
||||
|
||||
public MultiGetDocument(MultiGetItemResponse getResponse) {
|
||||
this.getResponse = getResponse;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getIndex() {
|
||||
return getResponse.getResponse().getIndex();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getId() {
|
||||
return getResponse.getResponse().getId();
|
||||
}
|
||||
|
||||
@Override
|
||||
public float getScore() {
|
||||
return -1f;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, Object> getFields() {
|
||||
return getResponse.getResponse().getSourceAsMap();
|
||||
}
|
||||
}
|
|
@ -57,7 +57,7 @@ class SearchTest {
|
|||
.setIndices("pages")
|
||||
.setQuery(queryStringBuilder)
|
||||
.addSort("rowcount", SortOrder.DESC)
|
||||
.setFrom(i * 10)
|
||||
.setFrom(0)
|
||||
.setSize(10);
|
||||
SearchResponse searchResponse = searchRequestBuilder.execute().actionGet();
|
||||
assertTrue(searchResponse.getHits().getTotalHits().value > 0);
|
||||
|
|
|
@ -0,0 +1,34 @@
|
|||
package org.xbib.elx.http.action.get;
|
||||
|
||||
import org.elasticsearch.action.ActionType;
|
||||
import org.elasticsearch.action.get.MultiGetAction;
|
||||
import org.elasticsearch.action.get.MultiGetRequest;
|
||||
import org.elasticsearch.action.get.MultiGetResponse;
|
||||
import org.elasticsearch.common.CheckedFunction;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.xcontent.XContentHelper;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.common.xcontent.XContentType;
|
||||
import org.xbib.elx.http.HttpAction;
|
||||
import org.xbib.netty.http.client.api.Request;
|
||||
import org.xbib.netty.http.common.HttpResponse;
|
||||
import java.io.IOException;
|
||||
|
||||
public class HttpMultiGetAction extends HttpAction<MultiGetRequest, MultiGetResponse> {
|
||||
|
||||
@Override
|
||||
public ActionType<MultiGetResponse> getActionInstance() {
|
||||
return MultiGetAction.INSTANCE;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Request.Builder createHttpRequest(String url, MultiGetRequest request) throws IOException {
|
||||
BytesReference source = XContentHelper.toXContent(request, XContentType.JSON, false);
|
||||
return newGetRequest(url, "/_mget", source);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected CheckedFunction<XContentParser, MultiGetResponse, IOException> entityParser(HttpResponse httpResponse) {
|
||||
return MultiGetResponse::fromXContent;
|
||||
}
|
||||
}
|
|
@ -20,3 +20,4 @@ org.xbib.elx.http.action.search.HttpSearchScrollAction
|
|||
org.xbib.elx.http.action.main.HttpMainAction
|
||||
org.xbib.elx.http.action.get.HttpExistsAction
|
||||
org.xbib.elx.http.action.get.HttpGetAction
|
||||
org.xbib.elx.http.action.get.HttpMultiGetAction
|
|
@ -3,7 +3,6 @@ package org.xbib.elx.http.test;
|
|||
import org.apache.logging.log4j.Level;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
|
||||
import org.elasticsearch.index.query.QueryBuilders;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
|
@ -83,9 +82,7 @@ class IndexShiftTest {
|
|||
bulkClient.waitForResponses(30L, TimeUnit.SECONDS);
|
||||
indexDefinition.setShift(true);
|
||||
indexShiftResult = adminClient.shiftIndex(indexDefinition, Arrays.asList("d", "e", "f"),
|
||||
(request, index, alias) -> request.addAliasAction(IndicesAliasesRequest.AliasActions.add()
|
||||
.index(index).alias(alias).filter(QueryBuilders.termQuery("my_key", alias)))
|
||||
);
|
||||
(index, alias) -> QueryBuilders.termQuery("my_key", alias));
|
||||
assertTrue(indexShiftResult.getNewAliases().contains("d"));
|
||||
assertTrue(indexShiftResult.getNewAliases().contains("e"));
|
||||
assertTrue(indexShiftResult.getNewAliases().contains("f"));
|
||||
|
|
|
@ -6,18 +6,20 @@ import org.apache.logging.log4j.LogManager;
|
|||
import org.apache.logging.log4j.Logger;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.index.query.QueryBuilders;
|
||||
import org.elasticsearch.search.SearchHit;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
import org.xbib.elx.api.IndexDefinition;
|
||||
import org.xbib.elx.api.SearchDocument;
|
||||
import org.xbib.elx.common.ClientBuilder;
|
||||
import org.xbib.elx.common.DefaultIndexDefinition;
|
||||
import org.xbib.elx.http.HttpBulkClient;
|
||||
import org.xbib.elx.http.HttpBulkClientProvider;
|
||||
import org.xbib.elx.http.HttpSearchClient;
|
||||
import org.xbib.elx.http.HttpSearchClientProvider;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
@ExtendWith(TestExtension.class)
|
||||
|
@ -61,27 +63,48 @@ class SearchTest {
|
|||
.setSearchClientProvider(HttpSearchClientProvider.class)
|
||||
.put(helper.getClientSettings())
|
||||
.build()) {
|
||||
Stream<SearchHit> stream = searchClient.search(qb -> qb
|
||||
Stream<SearchDocument> stream = searchClient.search(qb -> qb
|
||||
.setIndices(indexDefinition.getFullIndexName())
|
||||
.setQuery(QueryBuilders.matchAllQuery()),
|
||||
TimeValue.timeValueMillis(100), 579);
|
||||
long count = stream.count();
|
||||
assertEquals(numactions, count);
|
||||
Stream<String> ids = searchClient.getIds(qb -> qb
|
||||
if (searchClient.isSearchMetricEnabled()) {
|
||||
assertEquals(0L, searchClient.getSearchMetric().getFailedQueries().getCount());
|
||||
assertEquals(0L, searchClient.getSearchMetric().getTimeoutQueries().getCount());
|
||||
assertEquals(1L, searchClient.getSearchMetric().getEmptyQueries().getCount());
|
||||
}
|
||||
stream = searchClient.search(qb -> qb
|
||||
.setIndices(indexDefinition.getFullIndexName())
|
||||
.setQuery(QueryBuilders.matchAllQuery()));
|
||||
.setQuery(QueryBuilders.matchAllQuery()),
|
||||
TimeValue.timeValueMillis(10), 79);
|
||||
final AtomicInteger hitcount = new AtomicInteger();
|
||||
stream.forEach(hit -> hitcount.incrementAndGet());
|
||||
assertEquals(numactions, hitcount.get());
|
||||
if (searchClient.isSearchMetricEnabled()) {
|
||||
assertEquals(0L, searchClient.getSearchMetric().getFailedQueries().getCount());
|
||||
assertEquals(0L, searchClient.getSearchMetric().getTimeoutQueries().getCount());
|
||||
assertEquals(2L, searchClient.getSearchMetric().getEmptyQueries().getCount());
|
||||
}
|
||||
List<String> ids = searchClient.getIds(qb -> qb
|
||||
.setIndices(indexDefinition.getFullIndexName())
|
||||
.setQuery(QueryBuilders.matchAllQuery())).collect(Collectors.toList());
|
||||
final AtomicInteger idcount = new AtomicInteger();
|
||||
ids.forEach(id -> {
|
||||
idcount.incrementAndGet();
|
||||
});
|
||||
ids.forEach(id -> idcount.incrementAndGet());
|
||||
assertEquals(numactions, idcount.get());
|
||||
if (searchClient.isSearchMetricEnabled()) {
|
||||
assertEquals(275, searchClient.getSearchMetric().getQueries().getCount());
|
||||
assertEquals(273, searchClient.getSearchMetric().getSucceededQueries().getCount());
|
||||
assertEquals(2, searchClient.getSearchMetric().getEmptyQueries().getCount());
|
||||
assertEquals(0, searchClient.getSearchMetric().getFailedQueries().getCount());
|
||||
assertEquals(0, searchClient.getSearchMetric().getTimeoutQueries().getCount());
|
||||
assertEquals(1542L, searchClient.getSearchMetric().getQueries().getCount());
|
||||
assertEquals(1539L, searchClient.getSearchMetric().getSucceededQueries().getCount());
|
||||
assertEquals(3L, searchClient.getSearchMetric().getEmptyQueries().getCount());
|
||||
assertEquals(0L, searchClient.getSearchMetric().getFailedQueries().getCount());
|
||||
assertEquals(0L, searchClient.getSearchMetric().getTimeoutQueries().getCount());
|
||||
}
|
||||
stream = searchClient.multiGet(mgrb -> {
|
||||
for (String id : ids) {
|
||||
mgrb.add(indexDefinition.getFullIndexName(), indexDefinition.getType(), id);
|
||||
}
|
||||
});
|
||||
assertEquals(numactions, stream.count());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -2,7 +2,6 @@ package org.xbib.elx.node.test;
|
|||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
|
||||
import org.elasticsearch.index.query.QueryBuilders;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
|
@ -83,9 +82,7 @@ class IndexShiftTest {
|
|||
bulkClient.waitForResponses(30L, TimeUnit.SECONDS);
|
||||
indexDefinition.setShift(true);
|
||||
indexShiftResult = adminClient.shiftIndex(indexDefinition, Arrays.asList("d", "e", "f"),
|
||||
(request, index, alias) -> request.addAliasAction(IndicesAliasesRequest.AliasActions.add()
|
||||
.index(index).alias(alias).filter(QueryBuilders.termQuery("my_key", alias)))
|
||||
);
|
||||
(index, alias) -> QueryBuilders.termQuery("my_key", alias));
|
||||
assertTrue(indexShiftResult.getNewAliases().contains("d"));
|
||||
assertTrue(indexShiftResult.getNewAliases().contains("e"));
|
||||
assertTrue(indexShiftResult.getNewAliases().contains("f"));
|
||||
|
|
|
@ -6,10 +6,10 @@ import org.apache.logging.log4j.LogManager;
|
|||
import org.apache.logging.log4j.Logger;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.index.query.QueryBuilders;
|
||||
import org.elasticsearch.search.SearchHit;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
import org.xbib.elx.api.IndexDefinition;
|
||||
import org.xbib.elx.api.SearchDocument;
|
||||
import org.xbib.elx.common.ClientBuilder;
|
||||
import org.xbib.elx.common.DefaultIndexDefinition;
|
||||
import org.xbib.elx.node.NodeBulkClient;
|
||||
|
@ -17,8 +17,10 @@ import org.xbib.elx.node.NodeBulkClientProvider;
|
|||
import org.xbib.elx.node.NodeSearchClient;
|
||||
import org.xbib.elx.node.NodeSearchClientProvider;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
@ExtendWith(TestExtension.class)
|
||||
|
@ -64,8 +66,7 @@ class SearchTest {
|
|||
.setSearchClientProvider(NodeSearchClientProvider.class)
|
||||
.put(helper.getClientSettings())
|
||||
.build()) {
|
||||
// test stream count
|
||||
Stream<SearchHit> stream = searchClient.search(qb -> qb
|
||||
Stream<SearchDocument> stream = searchClient.search(qb -> qb
|
||||
.setIndices(indexDefinition.getFullIndexName())
|
||||
.setQuery(QueryBuilders.matchAllQuery()),
|
||||
TimeValue.timeValueMillis(100), 579);
|
||||
|
@ -76,7 +77,6 @@ class SearchTest {
|
|||
assertEquals(0L, searchClient.getSearchMetric().getTimeoutQueries().getCount());
|
||||
assertEquals(1L, searchClient.getSearchMetric().getEmptyQueries().getCount());
|
||||
}
|
||||
// test stream docs
|
||||
stream = searchClient.search(qb -> qb
|
||||
.setIndices(indexDefinition.getFullIndexName())
|
||||
.setQuery(QueryBuilders.matchAllQuery()),
|
||||
|
@ -89,10 +89,9 @@ class SearchTest {
|
|||
assertEquals(0L, searchClient.getSearchMetric().getTimeoutQueries().getCount());
|
||||
assertEquals(2L, searchClient.getSearchMetric().getEmptyQueries().getCount());
|
||||
}
|
||||
// test stream doc ids
|
||||
Stream<String> ids = searchClient.getIds(qb -> qb
|
||||
List<String> ids = searchClient.getIds(qb -> qb
|
||||
.setIndices(indexDefinition.getFullIndexName())
|
||||
.setQuery(QueryBuilders.matchAllQuery()));
|
||||
.setQuery(QueryBuilders.matchAllQuery())).collect(Collectors.toList());
|
||||
final AtomicInteger idcount = new AtomicInteger();
|
||||
ids.forEach(id -> idcount.incrementAndGet());
|
||||
assertEquals(numactions, idcount.get());
|
||||
|
@ -103,6 +102,12 @@ class SearchTest {
|
|||
assertEquals(0L, searchClient.getSearchMetric().getFailedQueries().getCount());
|
||||
assertEquals(0L, searchClient.getSearchMetric().getTimeoutQueries().getCount());
|
||||
}
|
||||
stream = searchClient.multiGet(mgrb -> {
|
||||
for (String id : ids) {
|
||||
mgrb.add(indexDefinition.getFullIndexName(), indexDefinition.getType(), id);
|
||||
}
|
||||
});
|
||||
assertEquals(numactions, stream.count());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -2,7 +2,6 @@ package org.xbib.elx.transport.test;
|
|||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
|
||||
import org.elasticsearch.index.query.QueryBuilders;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
|
@ -80,9 +79,7 @@ class IndexShiftTest {
|
|||
bulkClient.waitForResponses(30L, TimeUnit.SECONDS);
|
||||
indexDefinition.setShift(true);
|
||||
indexShiftResult = adminClient.shiftIndex(indexDefinition, Arrays.asList("d", "e", "f"),
|
||||
(request, index, alias) -> request.addAliasAction(IndicesAliasesRequest.AliasActions.add()
|
||||
.index(index).alias(alias).filter(QueryBuilders.termQuery("my_key", alias)))
|
||||
);
|
||||
(index, alias) -> QueryBuilders.termQuery("my_key", alias));
|
||||
assertTrue(indexShiftResult.getNewAliases().contains("d"));
|
||||
assertTrue(indexShiftResult.getNewAliases().contains("e"));
|
||||
assertTrue(indexShiftResult.getNewAliases().contains("f"));
|
||||
|
|
|
@ -6,10 +6,10 @@ import org.apache.logging.log4j.LogManager;
|
|||
import org.apache.logging.log4j.Logger;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.index.query.QueryBuilders;
|
||||
import org.elasticsearch.search.SearchHit;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
import org.xbib.elx.api.IndexDefinition;
|
||||
import org.xbib.elx.api.SearchDocument;
|
||||
import org.xbib.elx.common.ClientBuilder;
|
||||
import org.xbib.elx.common.DefaultIndexDefinition;
|
||||
import org.xbib.elx.transport.TransportBulkClient;
|
||||
|
@ -17,8 +17,10 @@ import org.xbib.elx.transport.TransportBulkClientProvider;
|
|||
import org.xbib.elx.transport.TransportSearchClient;
|
||||
import org.xbib.elx.transport.TransportSearchClientProvider;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
@ExtendWith(TestExtension.class)
|
||||
|
@ -63,8 +65,7 @@ class SearchTest {
|
|||
.setSearchClientProvider(TransportSearchClientProvider.class)
|
||||
.put(helper.getClientSettings())
|
||||
.build()) {
|
||||
// test stream count
|
||||
Stream<SearchHit> stream = searchClient.search(qb -> qb
|
||||
Stream<SearchDocument> stream = searchClient.search(qb -> qb
|
||||
.setIndices(indexDefinition.getFullIndexName())
|
||||
.setQuery(QueryBuilders.matchAllQuery()),
|
||||
TimeValue.timeValueMillis(100), 579);
|
||||
|
@ -75,7 +76,6 @@ class SearchTest {
|
|||
assertEquals(0L, searchClient.getSearchMetric().getTimeoutQueries().getCount());
|
||||
assertEquals(1L, searchClient.getSearchMetric().getEmptyQueries().getCount());
|
||||
}
|
||||
// test stream docs
|
||||
stream = searchClient.search(qb -> qb
|
||||
.setIndices(indexDefinition.getFullIndexName())
|
||||
.setQuery(QueryBuilders.matchAllQuery()),
|
||||
|
@ -89,9 +89,9 @@ class SearchTest {
|
|||
assertEquals(2L, searchClient.getSearchMetric().getEmptyQueries().getCount());
|
||||
}
|
||||
// test stream doc ids
|
||||
Stream<String> ids = searchClient.getIds(qb -> qb
|
||||
List<String> ids = searchClient.getIds(qb -> qb
|
||||
.setIndices(indexDefinition.getFullIndexName())
|
||||
.setQuery(QueryBuilders.matchAllQuery()));
|
||||
.setQuery(QueryBuilders.matchAllQuery())).collect(Collectors.toList());
|
||||
final AtomicInteger idcount = new AtomicInteger();
|
||||
ids.forEach(id -> idcount.incrementAndGet());
|
||||
assertEquals(numactions, idcount.get());
|
||||
|
@ -102,6 +102,12 @@ class SearchTest {
|
|||
assertEquals(0L, searchClient.getSearchMetric().getFailedQueries().getCount());
|
||||
assertEquals(0L, searchClient.getSearchMetric().getTimeoutQueries().getCount());
|
||||
}
|
||||
stream = searchClient.multiGet(mgrb -> {
|
||||
for (String id : ids) {
|
||||
mgrb.add(indexDefinition.getFullIndexName(), indexDefinition.getType(), id);
|
||||
}
|
||||
});
|
||||
assertEquals(numactions, stream.count());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,9 +1,9 @@
|
|||
group = org.xbib
|
||||
name = elx
|
||||
version = 7.10.2.12
|
||||
version = 7.10.2.13
|
||||
|
||||
gradle.wrapper.version = 6.6.1
|
||||
xbib-metrics.version = 2.1.0
|
||||
xbib-metrics.version = 2.2.0
|
||||
xbib-time.version = 2.1.0
|
||||
elasticsearch.version = 7.10.2
|
||||
# ES 7.10.2 uses Jackson 2.10.4
|
||||
|
|
Loading…
Reference in a new issue