align with es7102, introduce search document API

This commit is contained in:
Jörg Prante 2021-06-08 18:09:37 +02:00
parent 5f8dd4d2fe
commit fe6434167c
30 changed files with 236 additions and 113 deletions

View file

@ -8,7 +8,7 @@ import java.util.concurrent.TimeUnit;
public interface BasicClient extends Closeable { public interface BasicClient extends Closeable {
void init(Settings settings); boolean init(Settings settings, String info);
void putClusterSetting(String key, Object value, long timeout, TimeUnit timeUnit); void putClusterSetting(String key, Object value, long timeout, TimeUnit timeUnit);

View file

@ -30,4 +30,6 @@ public interface BulkProcessor extends Closeable, Flushable {
void setMaxBulkVolume(long bulkSize); void setMaxBulkVolume(long bulkSize);
long getMaxBulkVolume(); long getMaxBulkVolume();
boolean isClosed();
} }

View file

@ -1,9 +1,9 @@
package org.xbib.elx.api; package org.xbib.elx.api;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest; import org.elasticsearch.index.query.QueryBuilder;
@FunctionalInterface @FunctionalInterface
public interface IndexAliasAdder { public interface IndexAliasAdder {
void addIndexAlias(IndicesAliasesRequest request, String index, String alias); QueryBuilder addAliasOnField(String index, String alias);
} }

View file

@ -1,13 +1,9 @@
package org.xbib.elx.api; package org.xbib.elx.api;
import org.elasticsearch.action.get.GetRequestBuilder; import org.elasticsearch.action.get.GetRequestBuilder;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.get.MultiGetRequestBuilder; import org.elasticsearch.action.get.MultiGetRequestBuilder;
import org.elasticsearch.action.get.MultiGetResponse;
import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.search.SearchHit;
import java.util.Optional; import java.util.Optional;
import java.util.function.Consumer; import java.util.function.Consumer;
import java.util.stream.Stream; import java.util.stream.Stream;
@ -18,14 +14,14 @@ public interface SearchClient extends BasicClient {
SearchMetric getSearchMetric(); SearchMetric getSearchMetric();
Optional<GetResponse> get(Consumer<GetRequestBuilder> getRequestBuilder); Optional<SearchDocument> get(Consumer<GetRequestBuilder> getRequestBuilder);
Optional<MultiGetResponse> multiGet(Consumer<MultiGetRequestBuilder> multiGetRequestBuilder); Stream<SearchDocument> multiGet(Consumer<MultiGetRequestBuilder> multiGetRequestBuilder);
Optional<SearchResponse> search(Consumer<SearchRequestBuilder> searchRequestBuilder); Optional<SearchResult> search(Consumer<SearchRequestBuilder> searchRequestBuilder);
Stream<SearchHit> search(Consumer<SearchRequestBuilder> searchRequestBuilder, Stream<SearchDocument> search(Consumer<SearchRequestBuilder> searchRequestBuilder,
TimeValue scrollTime, int scrollSize); TimeValue scrollTime, int scrollSize);
Stream<String> getIds(Consumer<SearchRequestBuilder> queryBuilder); Stream<String> getIds(Consumer<SearchRequestBuilder> queryBuilder);
} }

View file

@ -30,4 +30,6 @@ public interface SearchMetric extends Closeable {
void start(); void start();
void stop(); void stop();
boolean isClosed();
} }

View file

@ -1,10 +1,16 @@
package org.xbib.elx.api; package org.xbib.elx.api;
import org.elasticsearch.search.aggregations.Aggregations;
import java.util.List; import java.util.List;
public interface SearchResult { public interface SearchResult {
long getTotal(); long getTotal();
long getTook();
List<SearchDocument> getDocuments(); List<SearchDocument> getDocuments();
Aggregations getAggregations();
} }

View file

@ -279,7 +279,8 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements
if (oldAliasMap == null || !oldAliasMap.containsKey(additionalAlias)) { if (oldAliasMap == null || !oldAliasMap.containsKey(additionalAlias)) {
// index alias adder only active on extra aliases, and if alias is new // index alias adder only active on extra aliases, and if alias is new
if (adder != null) { if (adder != null) {
adder.addIndexAlias(indicesAliasesRequest, fullIndexName, additionalAlias); indicesAliasesRequest.addAliasAction(new IndicesAliasesRequest.AliasActions(AliasAction.Type.ADD,
fullIndexName, additionalAlias).filter(adder.addAliasOnField(fullIndexName, additionalAlias)));
} else { } else {
indicesAliasesRequest.addAliasAction(new IndicesAliasesRequest.AliasActions(AliasAction.Type.ADD, indicesAliasesRequest.addAliasAction(new IndicesAliasesRequest.AliasActions(AliasAction.Type.ADD,
fullIndexName, additionalAlias)); fullIndexName, additionalAlias));

View file

@ -4,6 +4,7 @@ import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchTimeoutException; import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthAction; import org.elasticsearch.action.admin.cluster.health.ClusterHealthAction;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
@ -50,11 +51,10 @@ public abstract class AbstractBasicClient implements BasicClient {
private final ScheduledExecutorService executorService; private final ScheduledExecutorService executorService;
private final AtomicBoolean closed; protected final AtomicBoolean closed;
public AbstractBasicClient() { public AbstractBasicClient() {
this.executorService = Executors.newScheduledThreadPool(2, this.executorService = Executors.newScheduledThreadPool(2, new DaemonThreadFactory("elx"));
new DaemonThreadFactory("elx"));
closed = new AtomicBoolean(false); closed = new AtomicBoolean(false);
} }
@ -74,19 +74,31 @@ public abstract class AbstractBasicClient implements BasicClient {
} }
@Override @Override
public void init(Settings settings) { public boolean init(Settings settings, String infoString) {
this.settings = settings;
if (closed.compareAndSet(false, true)) { if (closed.compareAndSet(false, true)) {
this.settings = settings;
logger.log(Level.INFO, String.format("Elx: %s on %s %s %s Java: %s %s %s %s ES: %s %s",
System.getProperty("user.name"),
System.getProperty("os.name"),
System.getProperty("os.arch"),
System.getProperty("os.version"),
System.getProperty("java.version"),
System.getProperty("java.vm.version"),
System.getProperty("java.vm.vendor"),
System.getProperty("java.vm.name"),
Version.CURRENT,
infoString));
logger.log(Level.INFO, "initializing with settings = " + settings.toDelimitedString(',')); logger.log(Level.INFO, "initializing with settings = " + settings.toDelimitedString(','));
setClient(createClient(settings)); setClient(createClient(settings));
return true;
} }
return false;
} }
@Override @Override
public void close() throws IOException { public void close() throws IOException {
ensureClientIsPresent();
if (closed.compareAndSet(false, true)) { if (closed.compareAndSet(false, true)) {
if (executorService != null) { if (!executorService.isShutdown()) {
executorService.shutdownNow(); executorService.shutdownNow();
} }
closeClient(settings); closeClient(settings);

View file

@ -23,9 +23,7 @@ import org.xbib.elx.api.BulkProcessor;
import org.xbib.elx.api.IndexDefinition; import org.xbib.elx.api.IndexDefinition;
import java.io.IOException; import java.io.IOException;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
public abstract class AbstractBulkClient extends AbstractBasicClient implements BulkClient { public abstract class AbstractBulkClient extends AbstractBasicClient implements BulkClient {
@ -33,19 +31,17 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements
private BulkProcessor bulkProcessor; private BulkProcessor bulkProcessor;
private final AtomicBoolean closed;
public AbstractBulkClient() { public AbstractBulkClient() {
super(); super();
closed = new AtomicBoolean(true);
} }
@Override @Override
public void init(Settings settings) { public boolean init(Settings settings, String info) {
if (closed.compareAndSet(true, false)) { if (super.init(settings, info)) {
super.init(settings);
bulkProcessor = new DefaultBulkProcessor(this, settings); bulkProcessor = new DefaultBulkProcessor(this, settings);
return true;
} }
return false;
} }
@Override @Override
@ -62,15 +58,13 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements
@Override @Override
public void close() throws IOException { public void close() throws IOException {
if (closed.compareAndSet(false, true)) { if (!bulkProcessor.isClosed()) {
logger.info("closing bulk processor");
ensureClientIsPresent(); ensureClientIsPresent();
if (bulkProcessor != null) { bulkProcessor.close();
logger.info("closing bulk processor");
bulkProcessor.close();
}
closeClient(settings);
super.close();
} }
closeClient(settings);
super.close();
} }
@Override @Override

View file

@ -18,9 +18,11 @@ import org.elasticsearch.action.search.SearchScrollRequestBuilder;
import org.elasticsearch.action.search.ShardSearchFailure; import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.search.SearchHit;
import org.xbib.elx.api.SearchClient; import org.xbib.elx.api.SearchClient;
import org.xbib.elx.api.SearchDocument;
import org.xbib.elx.api.SearchMetric; import org.xbib.elx.api.SearchMetric;
import org.xbib.elx.api.SearchResult;
import java.io.IOException; import java.io.IOException;
import java.util.Arrays; import java.util.Arrays;
import java.util.Comparator; import java.util.Comparator;
@ -34,34 +36,30 @@ import java.util.stream.StreamSupport;
public abstract class AbstractSearchClient extends AbstractBasicClient implements SearchClient { public abstract class AbstractSearchClient extends AbstractBasicClient implements SearchClient {
private final AtomicBoolean closed;
private SearchMetric searchMetric; private SearchMetric searchMetric;
public AbstractSearchClient() { public AbstractSearchClient() {
super(); super();
this.closed = new AtomicBoolean(true);
} }
@Override @Override
public void init(Settings settings) { public boolean init(Settings settings, String info) {
if (closed.compareAndSet(true, false)) { if (super.init(settings, info)) {
super.init(settings);
if (settings.getAsBoolean(Parameters.SEARCH_METRIC_ENABLED.getName(), if (settings.getAsBoolean(Parameters.SEARCH_METRIC_ENABLED.getName(),
Parameters.SEARCH_METRIC_ENABLED.getBoolean())) { Parameters.SEARCH_METRIC_ENABLED.getBoolean())) {
this.searchMetric = new DefaultSearchMetric(this, settings); this.searchMetric = new DefaultSearchMetric(this, settings);
searchMetric.init(settings); searchMetric.init(settings);
} }
return true;
} }
return false;
} }
@Override @Override
public void close() throws IOException { public void close() throws IOException {
if (closed.compareAndSet(false, true)) { super.close();
super.close(); if (searchMetric != null && !searchMetric.isClosed()) {
if (searchMetric != null) { searchMetric.close();
searchMetric.close();
}
} }
} }
@ -76,7 +74,7 @@ public abstract class AbstractSearchClient extends AbstractBasicClient implement
} }
@Override @Override
public Optional<GetResponse> get(Consumer<GetRequestBuilder> getRequestBuilderConsumer) { public Optional<SearchDocument> get(Consumer<GetRequestBuilder> getRequestBuilderConsumer) {
GetRequestBuilder getRequestBuilder = new GetRequestBuilder(client, GetAction.INSTANCE); GetRequestBuilder getRequestBuilder = new GetRequestBuilder(client, GetAction.INSTANCE);
getRequestBuilderConsumer.accept(getRequestBuilder); getRequestBuilderConsumer.accept(getRequestBuilder);
ActionFuture<GetResponse> actionFuture = getRequestBuilder.execute(); ActionFuture<GetResponse> actionFuture = getRequestBuilder.execute();
@ -98,24 +96,24 @@ public abstract class AbstractSearchClient extends AbstractBasicClient implement
searchMetric.getEmptyQueries().inc(); searchMetric.getEmptyQueries().inc();
} }
} }
return getResponse.isExists() ? Optional.of(getResponse) : Optional.empty(); return getResponse.isExists() ? Optional.of(new GetDocument(getResponse)) : Optional.empty();
} }
@Override @Override
public Optional<MultiGetResponse> multiGet(Consumer<MultiGetRequestBuilder> multiGetRequestBuilderConsumer) { public Stream<SearchDocument> multiGet(Consumer<MultiGetRequestBuilder> multiGetRequestBuilderConsumer) {
MultiGetRequestBuilder multiGetRequestBuilder = new MultiGetRequestBuilder(client, MultiGetAction.INSTANCE); MultiGetRequestBuilder multiGetRequestBuilder = new MultiGetRequestBuilder(client, MultiGetAction.INSTANCE);
multiGetRequestBuilderConsumer.accept(multiGetRequestBuilder); multiGetRequestBuilderConsumer.accept(multiGetRequestBuilder);
ActionFuture<MultiGetResponse> actionFuture = multiGetRequestBuilder.execute(); ActionFuture<MultiGetResponse> actionFuture = multiGetRequestBuilder.execute();
if (searchMetric != null) { if (searchMetric != null) {
searchMetric.getCurrentQueries().inc(); searchMetric.getCurrentQueries().inc();
} }
MultiGetResponse multiGetItemResponse = actionFuture.actionGet(); MultiGetResponse multiGetResponse = actionFuture.actionGet();
if (searchMetric != null) { if (searchMetric != null) {
searchMetric.getCurrentQueries().dec(); searchMetric.getCurrentQueries().dec();
searchMetric.getQueries().inc(); searchMetric.getQueries().inc();
searchMetric.markTotalQueries(1); searchMetric.markTotalQueries(1);
} }
boolean isempty = multiGetItemResponse.getResponses().length == 0; boolean isempty = multiGetResponse.getResponses().length == 0;
if (isempty) { if (isempty) {
if (searchMetric != null) { if (searchMetric != null) {
searchMetric.getEmptyQueries().inc(); searchMetric.getEmptyQueries().inc();
@ -125,11 +123,13 @@ public abstract class AbstractSearchClient extends AbstractBasicClient implement
searchMetric.getSucceededQueries().inc(); searchMetric.getSucceededQueries().inc();
} }
} }
return isempty ? Optional.empty() : Optional.of(multiGetItemResponse); return isempty ? Stream.of() : Arrays.stream(multiGetResponse.getResponses())
.filter(r -> !r.isFailed())
.map(MultiGetDocument::new);
} }
@Override @Override
public Optional<SearchResponse> search(Consumer<SearchRequestBuilder> queryBuilder) { public Optional<SearchResult> search(Consumer<SearchRequestBuilder> queryBuilder) {
SearchRequestBuilder searchRequestBuilder = new SearchRequestBuilder(client, SearchAction.INSTANCE); SearchRequestBuilder searchRequestBuilder = new SearchRequestBuilder(client, SearchAction.INSTANCE);
queryBuilder.accept(searchRequestBuilder); queryBuilder.accept(searchRequestBuilder);
ActionFuture<SearchResponse> actionFuture = searchRequestBuilder.execute(); ActionFuture<SearchResponse> actionFuture = searchRequestBuilder.execute();
@ -162,12 +162,16 @@ public abstract class AbstractSearchClient extends AbstractBasicClient implement
searchMetric.getSucceededQueries().inc(); searchMetric.getSucceededQueries().inc();
} }
} }
return isempty ? Optional.empty() : Optional.of(searchResponse); return isempty ?
Optional.empty() :
Optional.of(new DefaultSearchResult(searchResponse.getHits(),
searchResponse.getAggregations(),
searchResponse.getTook().millis()));
} }
@Override @Override
public Stream<SearchHit> search(Consumer<SearchRequestBuilder> queryBuilder, public Stream<SearchDocument> search(Consumer<SearchRequestBuilder> queryBuilder,
TimeValue scrollTime, int scrollSize) { TimeValue scrollTime, int scrollSize) {
SearchRequestBuilder searchRequestBuilder = new SearchRequestBuilder(client, SearchAction.INSTANCE); SearchRequestBuilder searchRequestBuilder = new SearchRequestBuilder(client, SearchAction.INSTANCE);
queryBuilder.accept(searchRequestBuilder); queryBuilder.accept(searchRequestBuilder);
searchRequestBuilder.setScroll(scrollTime).setSize(scrollSize); searchRequestBuilder.setScroll(scrollTime).setSize(scrollSize);
@ -229,12 +233,15 @@ public abstract class AbstractSearchClient extends AbstractBasicClient implement
return StreamSupport.stream(TakeWhileSpliterator.over(responseStream.spliterator(), return StreamSupport.stream(TakeWhileSpliterator.over(responseStream.spliterator(),
condition, lastAction), false) condition, lastAction), false)
.onClose(responseStream::close) .onClose(responseStream::close)
.flatMap(searchResponse -> Arrays.stream(searchResponse.getHits().getHits())); .flatMap(searchResponse ->
new DefaultSearchResult(searchResponse.getHits(),
searchResponse.getAggregations(),
searchResponse.getTook().millis()).getDocuments().stream());
} }
@Override @Override
public Stream<String> getIds(Consumer<SearchRequestBuilder> queryBuilder) { public Stream<String> getIds(Consumer<SearchRequestBuilder> queryBuilder) {
return search(queryBuilder, TimeValue.timeValueMinutes(1), 1000).map(SearchHit::getId); return search(queryBuilder, TimeValue.timeValueMinutes(1), 1000).map(SearchDocument::getId);
} }
private static class TakeWhileSpliterator<T> implements Spliterator<T> { private static class TakeWhileSpliterator<T> implements Spliterator<T> {

View file

@ -145,13 +145,12 @@ public class ClientBuilder {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public <C extends BasicClient> C build() throws IOException { public <C extends BasicClient> C build() throws IOException {
Settings settings = settingsBuilder.build(); Settings settings = settingsBuilder.build();
logger.log(Level.INFO, "settings = " + settings.toDelimitedString(','));
if (adminClientProvider != null) { if (adminClientProvider != null) {
for (AdminClientProvider provider : ServiceLoader.load(AdminClientProvider.class, classLoader)) { for (AdminClientProvider provider : ServiceLoader.load(AdminClientProvider.class, classLoader)) {
if (provider.getClass().isAssignableFrom(adminClientProvider)) { if (provider.getClass().isAssignableFrom(adminClientProvider)) {
C c = (C) provider.getClient(); C c = (C) provider.getClient();
c.setClient(client); c.setClient(client);
c.init(settings); c.init(settings, null);
return c; return c;
} }
} }
@ -161,7 +160,7 @@ public class ClientBuilder {
if (provider.getClass().isAssignableFrom(bulkClientProvider)) { if (provider.getClass().isAssignableFrom(bulkClientProvider)) {
C c = (C) provider.getClient(); C c = (C) provider.getClient();
c.setClient(client); c.setClient(client);
c.init(settings); c.init(settings, null);
return c; return c;
} }
} }
@ -171,7 +170,7 @@ public class ClientBuilder {
if (provider.getClass().isAssignableFrom(searchClientProvider)) { if (provider.getClass().isAssignableFrom(searchClientProvider)) {
C c = (C) provider.getClient(); C c = (C) provider.getClient();
c.setClient(client); c.setClient(client);
c.init(settings); c.init(settings, null);
return c; return c;
} }
} }

View file

@ -71,9 +71,6 @@ public class DefaultBulkListener implements BulkListener {
} }
int n = 0; int n = 0;
for (BulkItemResponse itemResponse : response.getItems()) { for (BulkItemResponse itemResponse : response.getItems()) {
if (bulkMetric != null) {
bulkMetric.getCurrentIngest().dec(itemResponse.getIndex(), itemResponse.getType(), itemResponse.getId());
}
if (itemResponse.isFailed()) { if (itemResponse.isFailed()) {
n++; n++;
if (bulkMetric != null) { if (bulkMetric != null) {

View file

@ -60,6 +60,8 @@ public class DefaultBulkProcessor implements BulkProcessor {
public DefaultBulkProcessor(BulkClient bulkClient, Settings settings) { public DefaultBulkProcessor(BulkClient bulkClient, Settings settings) {
this.bulkClient = bulkClient; this.bulkClient = bulkClient;
this.closed = new AtomicBoolean(false);
this.enabled = new AtomicBoolean(false);
int maxActionsPerRequest = settings.getAsInt(Parameters.BULK_MAX_ACTIONS_PER_REQUEST.getName(), int maxActionsPerRequest = settings.getAsInt(Parameters.BULK_MAX_ACTIONS_PER_REQUEST.getName(),
Parameters.BULK_MAX_ACTIONS_PER_REQUEST.getInteger()); Parameters.BULK_MAX_ACTIONS_PER_REQUEST.getInteger());
String flushIntervalStr = settings.get(Parameters.BULK_FLUSH_INTERVAL.getName(), String flushIntervalStr = settings.get(Parameters.BULK_FLUSH_INTERVAL.getName(),
@ -82,17 +84,13 @@ public class DefaultBulkProcessor implements BulkProcessor {
this.bulkVolume = maxVolumePerRequest.getBytes(); this.bulkVolume = maxVolumePerRequest.getBytes();
} }
this.bulkRequest = new BulkRequest(); this.bulkRequest = new BulkRequest();
this.closed = new AtomicBoolean(false);
this.enabled = new AtomicBoolean(false);
this.executionIdGen = new AtomicLong(); this.executionIdGen = new AtomicLong();
this.permits = settings.getAsInt(Parameters.BULK_PERMITS.getName(), Parameters.BULK_PERMITS.getInteger()); this.permits = settings.getAsInt(Parameters.BULK_PERMITS.getName(), Parameters.BULK_PERMITS.getInteger());
if (permits < 1) { if (permits < 1) {
throw new IllegalArgumentException("must not be less 1 permits for bulk indexing"); throw new IllegalArgumentException("must not be less 1 permits for bulk indexing");
} }
this.semaphore = new ResizeableSemaphore(permits); this.semaphore = new ResizeableSemaphore(permits);
if (logger.isInfoEnabled()) { logger.info("bulk processor now active");
logger.info("bulk processor now active");
}
setEnabled(true); setEnabled(true);
} }
@ -121,6 +119,11 @@ public class DefaultBulkProcessor implements BulkProcessor {
return bulkVolume; return bulkVolume;
} }
@Override
public boolean isClosed() {
return closed.get();
}
@Override @Override
public ScheduledExecutorService getScheduler() { public ScheduledExecutorService getScheduler() {
return bulkClient.getScheduler(); return bulkClient.getScheduler();

View file

@ -12,6 +12,7 @@ import org.xbib.metrics.common.CountMetric;
import org.xbib.metrics.common.Meter; import org.xbib.metrics.common.Meter;
import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
public class DefaultSearchMetric implements SearchMetric { public class DefaultSearchMetric implements SearchMetric {
@ -37,8 +38,11 @@ public class DefaultSearchMetric implements SearchMetric {
private Long stopped; private Long stopped;
private final AtomicBoolean closed;
public DefaultSearchMetric(SearchClient searchClient, public DefaultSearchMetric(SearchClient searchClient,
Settings settings) { Settings settings) {
this.closed = new AtomicBoolean(true);
totalQuery = new Meter(searchClient.getScheduler()); totalQuery = new Meter(searchClient.getScheduler());
currentQuery = new CountMetric(); currentQuery = new CountMetric();
queries = new CountMetric(); queries = new CountMetric();
@ -55,7 +59,9 @@ public class DefaultSearchMetric implements SearchMetric {
@Override @Override
public void init(Settings settings) { public void init(Settings settings) {
start(); if (closed.compareAndSet(true, false)) {
start();
}
} }
@Override @Override
@ -117,10 +123,17 @@ public class DefaultSearchMetric implements SearchMetric {
this.future.cancel(true); this.future.cancel(true);
} }
@Override
public boolean isClosed() {
return closed.get();
}
@Override @Override
public void close() { public void close() {
stop(); if (closed.compareAndSet(false, true)) {
totalQuery.shutdown(); stop();
totalQuery.shutdown();
}
} }
private void log() { private void log() {

View file

@ -2,6 +2,7 @@ package org.xbib.elx.common;
import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits; import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.aggregations.Aggregations;
import org.xbib.elx.api.SearchDocument; import org.xbib.elx.api.SearchDocument;
import org.xbib.elx.api.SearchResult; import org.xbib.elx.api.SearchResult;
@ -12,12 +13,16 @@ public class DefaultSearchResult implements SearchResult {
private final SearchHits searchHits; private final SearchHits searchHits;
public DefaultSearchResult(SearchHits searchHits) { private final Aggregations aggregations;
private final long took;
public DefaultSearchResult(SearchHits searchHits,
Aggregations aggregations,
long took) {
this.searchHits = searchHits; this.searchHits = searchHits;
} this.aggregations = aggregations;
@Override this.took = took;
public long getTotal() {
return searchHits.getTotalHits();
} }
@Override @Override
@ -28,4 +33,19 @@ public class DefaultSearchResult implements SearchResult {
} }
return list; return list;
} }
@Override
public Aggregations getAggregations() {
return aggregations;
}
@Override
public long getTotal() {
return searchHits.getTotalHits();
}
@Override
public long getTook() {
return took;
}
} }

View file

@ -0,0 +1,34 @@
package org.xbib.elx.common;
import org.elasticsearch.action.get.GetResponse;
import org.xbib.elx.api.SearchDocument;
import java.util.Map;
public class GetDocument implements SearchDocument {
private final GetResponse getResponse;
public GetDocument(GetResponse getResponse) {
this.getResponse = getResponse;
}
@Override
public String getIndex() {
return getResponse.getIndex();
}
@Override
public String getId() {
return getResponse.getId();
}
@Override
public float getScore() {
return -1f;
}
@Override
public Map<String, Object> getFields() {
return getResponse.getSourceAsMap();
}
}

View file

@ -14,7 +14,8 @@ public class MockAdminClient extends AbstractAdminClient {
} }
@Override @Override
public void init(Settings settings) { public boolean init(Settings settings, String info) {
return true;
} }
@Override @Override

View file

@ -18,7 +18,8 @@ public class MockBulkClient extends AbstractBulkClient {
} }
@Override @Override
public void init(Settings settings) { public boolean init(Settings settings, String info) {
return true;
} }
@Override @Override

View file

@ -14,7 +14,8 @@ public class MockSearchClient extends AbstractSearchClient {
} }
@Override @Override
public void init(Settings settings) { public boolean init(Settings settings, String info) {
return true;
} }
@Override @Override

View file

@ -0,0 +1,34 @@
package org.xbib.elx.common;
import org.elasticsearch.action.get.MultiGetItemResponse;
import org.xbib.elx.api.SearchDocument;
import java.util.Map;
public class MultiGetDocument implements SearchDocument {
private final MultiGetItemResponse getResponse;
public MultiGetDocument(MultiGetItemResponse getResponse) {
this.getResponse = getResponse;
}
@Override
public String getIndex() {
return getResponse.getResponse().getIndex();
}
@Override
public String getId() {
return getResponse.getResponse().getId();
}
@Override
public float getScore() {
return -1f;
}
@Override
public Map<String, Object> getFields() {
return getResponse.getResponse().getSourceAsMap();
}
}

View file

@ -52,11 +52,9 @@ class AliasTest {
new IndicesAliasesRequest.AliasActions(AliasAction.Type.ADD, indices, aliases); new IndicesAliasesRequest.AliasActions(AliasAction.Type.ADD, indices, aliases);
indicesAliasesRequest.addAliasAction(aliasAction); indicesAliasesRequest.addAliasAction(aliasAction);
client.execute(IndicesAliasesAction.INSTANCE, indicesAliasesRequest).actionGet(); client.execute(IndicesAliasesAction.INSTANCE, indicesAliasesRequest).actionGet();
// get alias
GetAliasesRequest getAliasesRequest = new GetAliasesRequest(Strings.EMPTY_ARRAY); GetAliasesRequest getAliasesRequest = new GetAliasesRequest(Strings.EMPTY_ARRAY);
long t0 = System.nanoTime(); long t0 = System.nanoTime();
GetAliasesResponse getAliasesResponse = GetAliasesResponse getAliasesResponse = client.execute(GetAliasesAction.INSTANCE, getAliasesRequest).actionGet();
client.execute(GetAliasesAction.INSTANCE, getAliasesRequest).actionGet();
long t1 = (System.nanoTime() - t0) / 1000000; long t1 = (System.nanoTime() - t0) / 1000000;
logger.info("{} time(ms) = {}", getAliasesResponse.getAliases(), t1); logger.info("{} time(ms) = {}", getAliasesResponse.getAliases(), t1);
assertTrue(t1 >= 0); assertTrue(t1 >= 0);

View file

@ -17,6 +17,7 @@ import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.extension.ExtendWith;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
@ExtendWith(TestExtension.class) @ExtendWith(TestExtension.class)

View file

@ -2,8 +2,6 @@ package org.xbib.elx.node.test;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
import org.elasticsearch.cluster.metadata.AliasAction;
import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.query.QueryBuilders;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.extension.ExtendWith;
@ -82,9 +80,7 @@ class IndexShiftTest {
bulkClient.waitForResponses(30L, TimeUnit.SECONDS); bulkClient.waitForResponses(30L, TimeUnit.SECONDS);
indexDefinition.setShift(true); indexDefinition.setShift(true);
indexShiftResult = adminClient.shiftIndex(indexDefinition, Arrays.asList("d", "e", "f"), indexShiftResult = adminClient.shiftIndex(indexDefinition, Arrays.asList("d", "e", "f"),
(request, index, alias) -> request.addAliasAction(new IndicesAliasesRequest.AliasActions(AliasAction.Type.ADD, (index, alias) -> QueryBuilders.termQuery("my_key", alias));
index, alias).filter(QueryBuilders.termQuery("my_key", alias)))
);
assertTrue(indexShiftResult.getNewAliases().contains("d")); assertTrue(indexShiftResult.getNewAliases().contains("d"));
assertTrue(indexShiftResult.getNewAliases().contains("e")); assertTrue(indexShiftResult.getNewAliases().contains("e"));
assertTrue(indexShiftResult.getNewAliases().contains("f")); assertTrue(indexShiftResult.getNewAliases().contains("f"));

View file

@ -7,10 +7,10 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.extension.ExtendWith;
import org.xbib.elx.api.IndexDefinition; import org.xbib.elx.api.IndexDefinition;
import org.xbib.elx.api.SearchDocument;
import org.xbib.elx.common.ClientBuilder; import org.xbib.elx.common.ClientBuilder;
import org.xbib.elx.common.DefaultIndexDefinition; import org.xbib.elx.common.DefaultIndexDefinition;
import org.xbib.elx.node.NodeBulkClient; import org.xbib.elx.node.NodeBulkClient;
@ -64,8 +64,7 @@ class SearchTest {
.setSearchClientProvider(NodeSearchClientProvider.class) .setSearchClientProvider(NodeSearchClientProvider.class)
.put(helper.getClientSettings()) .put(helper.getClientSettings())
.build()) { .build()) {
// test stream count Stream<SearchDocument> stream = searchClient.search(qb -> qb
Stream<SearchHit> stream = searchClient.search(qb -> qb
.setIndices(indexDefinition.getFullIndexName()) .setIndices(indexDefinition.getFullIndexName())
.setQuery(QueryBuilders.matchAllQuery()), .setQuery(QueryBuilders.matchAllQuery()),
TimeValue.timeValueMillis(100), 579); TimeValue.timeValueMillis(100), 579);

View file

@ -3,6 +3,7 @@ package org.xbib.elx.transport;
import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.jboss.netty.util.Version;
import org.xbib.elx.common.AbstractAdminClient; import org.xbib.elx.common.AbstractAdminClient;
/** /**
@ -23,9 +24,12 @@ public class TransportAdminClient extends AbstractAdminClient {
} }
@Override @Override
public void init(Settings settings) { public boolean init(Settings settings, String info) {
super.init(settings); if (super.init(settings, "Netty: " + Version.ID)) {
helper.init((TransportClient) getClient(), settings); helper.init((TransportClient) getClient(), settings);
return true;
}
return false;
} }
@Override @Override

View file

@ -23,9 +23,12 @@ public class TransportBulkClient extends AbstractBulkClient {
} }
@Override @Override
public void init(Settings settings) { public boolean init(Settings settings, String info) {
super.init(settings); if (super.init(settings, info)) {
helper.init((TransportClient) getClient(), settings); helper.init((TransportClient) getClient(), settings);
return true;
}
return false;
} }
@Override @Override

View file

@ -23,9 +23,12 @@ public class TransportSearchClient extends AbstractSearchClient {
} }
@Override @Override
public void init(Settings settings) { public boolean init(Settings settings, String info) {
super.init(settings); if (super.init(settings, info)) {
helper.init((TransportClient) getClient(), settings); helper.init((TransportClient) getClient(), settings);
return true;
}
return false;
} }
@Override @Override

View file

@ -2,8 +2,6 @@ package org.xbib.elx.transport.test;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
import org.elasticsearch.cluster.metadata.AliasAction;
import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.query.QueryBuilders;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.extension.ExtendWith;
@ -82,9 +80,7 @@ class IndexShiftTest {
bulkClient.waitForResponses(30L, TimeUnit.SECONDS); bulkClient.waitForResponses(30L, TimeUnit.SECONDS);
indexDefinition.setShift(true); indexDefinition.setShift(true);
indexShiftResult = adminClient.shiftIndex(indexDefinition, Arrays.asList("d", "e", "f"), indexShiftResult = adminClient.shiftIndex(indexDefinition, Arrays.asList("d", "e", "f"),
(request, index, alias) -> request.addAliasAction(new IndicesAliasesRequest.AliasActions(AliasAction.Type.ADD, (index, alias) -> QueryBuilders.termQuery("my_key", alias));
index, alias).filter(QueryBuilders.termQuery("my_key", alias)))
);
assertTrue(indexShiftResult.getNewAliases().contains("d")); assertTrue(indexShiftResult.getNewAliases().contains("d"));
assertTrue(indexShiftResult.getNewAliases().contains("e")); assertTrue(indexShiftResult.getNewAliases().contains("e"));
assertTrue(indexShiftResult.getNewAliases().contains("f")); assertTrue(indexShiftResult.getNewAliases().contains("f"));

View file

@ -4,10 +4,10 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.extension.ExtendWith;
import org.xbib.elx.api.IndexDefinition; import org.xbib.elx.api.IndexDefinition;
import org.xbib.elx.api.SearchDocument;
import org.xbib.elx.common.ClientBuilder; import org.xbib.elx.common.ClientBuilder;
import org.xbib.elx.common.DefaultIndexDefinition; import org.xbib.elx.common.DefaultIndexDefinition;
import org.xbib.elx.transport.TransportBulkClient; import org.xbib.elx.transport.TransportBulkClient;
@ -67,7 +67,7 @@ class SearchTest {
.put(helper.getClientSettings()) .put(helper.getClientSettings())
.build()) { .build()) {
// test stream count // test stream count
Stream<SearchHit> stream = searchClient.search(qb -> qb Stream<SearchDocument> stream = searchClient.search(qb -> qb
.setIndices(indexDefinition.getFullIndexName()) .setIndices(indexDefinition.getFullIndexName())
.setQuery(QueryBuilders.matchAllQuery()), .setQuery(QueryBuilders.matchAllQuery()),
TimeValue.timeValueMillis(100), 579); TimeValue.timeValueMillis(100), 579);

View file

@ -1,13 +1,13 @@
group = org.xbib group = org.xbib
name = elx name = elx
version = 2.2.1.49 version = 2.2.1.50
gradle.wrapper.version = 6.6.1 gradle.wrapper.version = 6.6.1
xbib-metrics.version = 2.1.0 xbib-metrics.version = 2.2.0
xbib-time.version = 2.1.0 xbib-time.version = 2.1.0
xbib-guice.version = 4.4.2 xbib-guice.version = 4.4.2
xbib-guava.version = 28.1 xbib-guava.version = 30.1
xbib-netty-http.version = 4.1.63.4 xbib-netty-http.version = 4.1.65.0
elasticsearch.version = 2.2.1 elasticsearch.version = 2.2.1
jackson.version = 2.11.4 jackson.version = 2.11.4
jna.version = 5.8.0 jna.version = 5.8.0