diff --git a/elx-api/src/main/java/org/xbib/elx/api/BasicClient.java b/elx-api/src/main/java/org/xbib/elx/api/BasicClient.java index cdd05e5..6188d3e 100644 --- a/elx-api/src/main/java/org/xbib/elx/api/BasicClient.java +++ b/elx-api/src/main/java/org/xbib/elx/api/BasicClient.java @@ -8,7 +8,7 @@ import java.util.concurrent.TimeUnit; public interface BasicClient extends Closeable { - void init(Settings settings); + boolean init(Settings settings, String info); void putClusterSetting(String key, Object value, long timeout, TimeUnit timeUnit); diff --git a/elx-api/src/main/java/org/xbib/elx/api/BulkProcessor.java b/elx-api/src/main/java/org/xbib/elx/api/BulkProcessor.java index 74df713..a678e1b 100644 --- a/elx-api/src/main/java/org/xbib/elx/api/BulkProcessor.java +++ b/elx-api/src/main/java/org/xbib/elx/api/BulkProcessor.java @@ -30,4 +30,6 @@ public interface BulkProcessor extends Closeable, Flushable { void setMaxBulkVolume(long bulkSize); long getMaxBulkVolume(); + + boolean isClosed(); } diff --git a/elx-api/src/main/java/org/xbib/elx/api/IndexAliasAdder.java b/elx-api/src/main/java/org/xbib/elx/api/IndexAliasAdder.java index 4e2dca9..fadce63 100644 --- a/elx-api/src/main/java/org/xbib/elx/api/IndexAliasAdder.java +++ b/elx-api/src/main/java/org/xbib/elx/api/IndexAliasAdder.java @@ -1,9 +1,9 @@ package org.xbib.elx.api; -import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest; +import org.elasticsearch.index.query.QueryBuilder; @FunctionalInterface public interface IndexAliasAdder { - void addIndexAlias(IndicesAliasesRequest request, String index, String alias); + QueryBuilder addAliasOnField(String index, String alias); } diff --git a/elx-api/src/main/java/org/xbib/elx/api/SearchClient.java b/elx-api/src/main/java/org/xbib/elx/api/SearchClient.java index 666d56d..61fd4d6 100644 --- a/elx-api/src/main/java/org/xbib/elx/api/SearchClient.java +++ b/elx-api/src/main/java/org/xbib/elx/api/SearchClient.java @@ -1,13 +1,9 @@ package org.xbib.elx.api; import org.elasticsearch.action.get.GetRequestBuilder; -import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.get.MultiGetRequestBuilder; -import org.elasticsearch.action.get.MultiGetResponse; import org.elasticsearch.action.search.SearchRequestBuilder; -import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.search.SearchHit; import java.util.Optional; import java.util.function.Consumer; import java.util.stream.Stream; @@ -18,14 +14,14 @@ public interface SearchClient extends BasicClient { SearchMetric getSearchMetric(); - Optional get(Consumer getRequestBuilder); + Optional get(Consumer getRequestBuilder); - Optional multiGet(Consumer multiGetRequestBuilder); + Stream multiGet(Consumer multiGetRequestBuilder); - Optional search(Consumer searchRequestBuilder); + Optional search(Consumer searchRequestBuilder); - Stream search(Consumer searchRequestBuilder, - TimeValue scrollTime, int scrollSize); + Stream search(Consumer searchRequestBuilder, + TimeValue scrollTime, int scrollSize); Stream getIds(Consumer queryBuilder); } diff --git a/elx-api/src/main/java/org/xbib/elx/api/SearchMetric.java b/elx-api/src/main/java/org/xbib/elx/api/SearchMetric.java index e190a51..ccc1ed8 100644 --- a/elx-api/src/main/java/org/xbib/elx/api/SearchMetric.java +++ b/elx-api/src/main/java/org/xbib/elx/api/SearchMetric.java @@ -30,4 +30,6 @@ public interface SearchMetric extends Closeable { void start(); void stop(); + + boolean isClosed(); } diff --git a/elx-api/src/main/java/org/xbib/elx/api/SearchResult.java b/elx-api/src/main/java/org/xbib/elx/api/SearchResult.java index 7ea0dd1..e240d50 100644 --- a/elx-api/src/main/java/org/xbib/elx/api/SearchResult.java +++ b/elx-api/src/main/java/org/xbib/elx/api/SearchResult.java @@ -1,10 +1,16 @@ package org.xbib.elx.api; +import org.elasticsearch.search.aggregations.Aggregations; + import java.util.List; public interface SearchResult { long getTotal(); + long getTook(); + List getDocuments(); + + Aggregations getAggregations(); } diff --git a/elx-common/src/main/java/org/xbib/elx/common/AbstractAdminClient.java b/elx-common/src/main/java/org/xbib/elx/common/AbstractAdminClient.java index e9a2789..50693ba 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/AbstractAdminClient.java +++ b/elx-common/src/main/java/org/xbib/elx/common/AbstractAdminClient.java @@ -279,7 +279,8 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements if (oldAliasMap == null || !oldAliasMap.containsKey(additionalAlias)) { // index alias adder only active on extra aliases, and if alias is new if (adder != null) { - adder.addIndexAlias(indicesAliasesRequest, fullIndexName, additionalAlias); + indicesAliasesRequest.addAliasAction(new IndicesAliasesRequest.AliasActions(AliasAction.Type.ADD, + fullIndexName, additionalAlias).filter(adder.addAliasOnField(fullIndexName, additionalAlias))); } else { indicesAliasesRequest.addAliasAction(new IndicesAliasesRequest.AliasActions(AliasAction.Type.ADD, fullIndexName, additionalAlias)); diff --git a/elx-common/src/main/java/org/xbib/elx/common/AbstractBasicClient.java b/elx-common/src/main/java/org/xbib/elx/common/AbstractBasicClient.java index 25e8699..a7448bf 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/AbstractBasicClient.java +++ b/elx-common/src/main/java/org/xbib/elx/common/AbstractBasicClient.java @@ -4,6 +4,7 @@ import org.apache.logging.log4j.Level; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.ElasticsearchTimeoutException; +import org.elasticsearch.Version; import org.elasticsearch.action.admin.cluster.health.ClusterHealthAction; import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; @@ -50,11 +51,10 @@ public abstract class AbstractBasicClient implements BasicClient { private final ScheduledExecutorService executorService; - private final AtomicBoolean closed; + protected final AtomicBoolean closed; public AbstractBasicClient() { - this.executorService = Executors.newScheduledThreadPool(2, - new DaemonThreadFactory("elx")); + this.executorService = Executors.newScheduledThreadPool(2, new DaemonThreadFactory("elx")); closed = new AtomicBoolean(false); } @@ -74,19 +74,31 @@ public abstract class AbstractBasicClient implements BasicClient { } @Override - public void init(Settings settings) { - this.settings = settings; + public boolean init(Settings settings, String infoString) { if (closed.compareAndSet(false, true)) { + this.settings = settings; + logger.log(Level.INFO, String.format("Elx: %s on %s %s %s Java: %s %s %s %s ES: %s %s", + System.getProperty("user.name"), + System.getProperty("os.name"), + System.getProperty("os.arch"), + System.getProperty("os.version"), + System.getProperty("java.version"), + System.getProperty("java.vm.version"), + System.getProperty("java.vm.vendor"), + System.getProperty("java.vm.name"), + Version.CURRENT, + infoString)); logger.log(Level.INFO, "initializing with settings = " + settings.toDelimitedString(',')); setClient(createClient(settings)); + return true; } + return false; } @Override public void close() throws IOException { - ensureClientIsPresent(); if (closed.compareAndSet(false, true)) { - if (executorService != null) { + if (!executorService.isShutdown()) { executorService.shutdownNow(); } closeClient(settings); diff --git a/elx-common/src/main/java/org/xbib/elx/common/AbstractBulkClient.java b/elx-common/src/main/java/org/xbib/elx/common/AbstractBulkClient.java index 3798ee3..05f788d 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/AbstractBulkClient.java +++ b/elx-common/src/main/java/org/xbib/elx/common/AbstractBulkClient.java @@ -23,9 +23,7 @@ import org.xbib.elx.api.BulkProcessor; import org.xbib.elx.api.IndexDefinition; import java.io.IOException; import java.nio.charset.StandardCharsets; -import java.util.Map; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; public abstract class AbstractBulkClient extends AbstractBasicClient implements BulkClient { @@ -33,19 +31,17 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements private BulkProcessor bulkProcessor; - private final AtomicBoolean closed; - public AbstractBulkClient() { super(); - closed = new AtomicBoolean(true); } @Override - public void init(Settings settings) { - if (closed.compareAndSet(true, false)) { - super.init(settings); + public boolean init(Settings settings, String info) { + if (super.init(settings, info)) { bulkProcessor = new DefaultBulkProcessor(this, settings); + return true; } + return false; } @Override @@ -62,15 +58,13 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements @Override public void close() throws IOException { - if (closed.compareAndSet(false, true)) { + if (!bulkProcessor.isClosed()) { + logger.info("closing bulk processor"); ensureClientIsPresent(); - if (bulkProcessor != null) { - logger.info("closing bulk processor"); - bulkProcessor.close(); - } - closeClient(settings); - super.close(); + bulkProcessor.close(); } + closeClient(settings); + super.close(); } @Override diff --git a/elx-common/src/main/java/org/xbib/elx/common/AbstractSearchClient.java b/elx-common/src/main/java/org/xbib/elx/common/AbstractSearchClient.java index f77c7d3..c6e4f71 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/AbstractSearchClient.java +++ b/elx-common/src/main/java/org/xbib/elx/common/AbstractSearchClient.java @@ -18,9 +18,11 @@ import org.elasticsearch.action.search.SearchScrollRequestBuilder; import org.elasticsearch.action.search.ShardSearchFailure; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.search.SearchHit; import org.xbib.elx.api.SearchClient; +import org.xbib.elx.api.SearchDocument; import org.xbib.elx.api.SearchMetric; +import org.xbib.elx.api.SearchResult; + import java.io.IOException; import java.util.Arrays; import java.util.Comparator; @@ -34,34 +36,30 @@ import java.util.stream.StreamSupport; public abstract class AbstractSearchClient extends AbstractBasicClient implements SearchClient { - private final AtomicBoolean closed; - private SearchMetric searchMetric; public AbstractSearchClient() { super(); - this.closed = new AtomicBoolean(true); } @Override - public void init(Settings settings) { - if (closed.compareAndSet(true, false)) { - super.init(settings); + public boolean init(Settings settings, String info) { + if (super.init(settings, info)) { if (settings.getAsBoolean(Parameters.SEARCH_METRIC_ENABLED.getName(), Parameters.SEARCH_METRIC_ENABLED.getBoolean())) { this.searchMetric = new DefaultSearchMetric(this, settings); searchMetric.init(settings); } + return true; } + return false; } @Override public void close() throws IOException { - if (closed.compareAndSet(false, true)) { - super.close(); - if (searchMetric != null) { - searchMetric.close(); - } + super.close(); + if (searchMetric != null && !searchMetric.isClosed()) { + searchMetric.close(); } } @@ -76,7 +74,7 @@ public abstract class AbstractSearchClient extends AbstractBasicClient implement } @Override - public Optional get(Consumer getRequestBuilderConsumer) { + public Optional get(Consumer getRequestBuilderConsumer) { GetRequestBuilder getRequestBuilder = new GetRequestBuilder(client, GetAction.INSTANCE); getRequestBuilderConsumer.accept(getRequestBuilder); ActionFuture actionFuture = getRequestBuilder.execute(); @@ -98,24 +96,24 @@ public abstract class AbstractSearchClient extends AbstractBasicClient implement searchMetric.getEmptyQueries().inc(); } } - return getResponse.isExists() ? Optional.of(getResponse) : Optional.empty(); + return getResponse.isExists() ? Optional.of(new GetDocument(getResponse)) : Optional.empty(); } @Override - public Optional multiGet(Consumer multiGetRequestBuilderConsumer) { + public Stream multiGet(Consumer multiGetRequestBuilderConsumer) { MultiGetRequestBuilder multiGetRequestBuilder = new MultiGetRequestBuilder(client, MultiGetAction.INSTANCE); multiGetRequestBuilderConsumer.accept(multiGetRequestBuilder); ActionFuture actionFuture = multiGetRequestBuilder.execute(); if (searchMetric != null) { searchMetric.getCurrentQueries().inc(); } - MultiGetResponse multiGetItemResponse = actionFuture.actionGet(); + MultiGetResponse multiGetResponse = actionFuture.actionGet(); if (searchMetric != null) { searchMetric.getCurrentQueries().dec(); searchMetric.getQueries().inc(); searchMetric.markTotalQueries(1); } - boolean isempty = multiGetItemResponse.getResponses().length == 0; + boolean isempty = multiGetResponse.getResponses().length == 0; if (isempty) { if (searchMetric != null) { searchMetric.getEmptyQueries().inc(); @@ -125,11 +123,13 @@ public abstract class AbstractSearchClient extends AbstractBasicClient implement searchMetric.getSucceededQueries().inc(); } } - return isempty ? Optional.empty() : Optional.of(multiGetItemResponse); + return isempty ? Stream.of() : Arrays.stream(multiGetResponse.getResponses()) + .filter(r -> !r.isFailed()) + .map(MultiGetDocument::new); } @Override - public Optional search(Consumer queryBuilder) { + public Optional search(Consumer queryBuilder) { SearchRequestBuilder searchRequestBuilder = new SearchRequestBuilder(client, SearchAction.INSTANCE); queryBuilder.accept(searchRequestBuilder); ActionFuture actionFuture = searchRequestBuilder.execute(); @@ -162,12 +162,16 @@ public abstract class AbstractSearchClient extends AbstractBasicClient implement searchMetric.getSucceededQueries().inc(); } } - return isempty ? Optional.empty() : Optional.of(searchResponse); + return isempty ? + Optional.empty() : + Optional.of(new DefaultSearchResult(searchResponse.getHits(), + searchResponse.getAggregations(), + searchResponse.getTook().millis())); } @Override - public Stream search(Consumer queryBuilder, - TimeValue scrollTime, int scrollSize) { + public Stream search(Consumer queryBuilder, + TimeValue scrollTime, int scrollSize) { SearchRequestBuilder searchRequestBuilder = new SearchRequestBuilder(client, SearchAction.INSTANCE); queryBuilder.accept(searchRequestBuilder); searchRequestBuilder.setScroll(scrollTime).setSize(scrollSize); @@ -229,12 +233,15 @@ public abstract class AbstractSearchClient extends AbstractBasicClient implement return StreamSupport.stream(TakeWhileSpliterator.over(responseStream.spliterator(), condition, lastAction), false) .onClose(responseStream::close) - .flatMap(searchResponse -> Arrays.stream(searchResponse.getHits().getHits())); + .flatMap(searchResponse -> + new DefaultSearchResult(searchResponse.getHits(), + searchResponse.getAggregations(), + searchResponse.getTook().millis()).getDocuments().stream()); } @Override public Stream getIds(Consumer queryBuilder) { - return search(queryBuilder, TimeValue.timeValueMinutes(1), 1000).map(SearchHit::getId); + return search(queryBuilder, TimeValue.timeValueMinutes(1), 1000).map(SearchDocument::getId); } private static class TakeWhileSpliterator implements Spliterator { diff --git a/elx-common/src/main/java/org/xbib/elx/common/ClientBuilder.java b/elx-common/src/main/java/org/xbib/elx/common/ClientBuilder.java index 581f04f..5214dc7 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/ClientBuilder.java +++ b/elx-common/src/main/java/org/xbib/elx/common/ClientBuilder.java @@ -145,13 +145,12 @@ public class ClientBuilder { @SuppressWarnings("unchecked") public C build() throws IOException { Settings settings = settingsBuilder.build(); - logger.log(Level.INFO, "settings = " + settings.toDelimitedString(',')); if (adminClientProvider != null) { for (AdminClientProvider provider : ServiceLoader.load(AdminClientProvider.class, classLoader)) { if (provider.getClass().isAssignableFrom(adminClientProvider)) { C c = (C) provider.getClient(); c.setClient(client); - c.init(settings); + c.init(settings, null); return c; } } @@ -161,7 +160,7 @@ public class ClientBuilder { if (provider.getClass().isAssignableFrom(bulkClientProvider)) { C c = (C) provider.getClient(); c.setClient(client); - c.init(settings); + c.init(settings, null); return c; } } @@ -171,7 +170,7 @@ public class ClientBuilder { if (provider.getClass().isAssignableFrom(searchClientProvider)) { C c = (C) provider.getClient(); c.setClient(client); - c.init(settings); + c.init(settings, null); return c; } } diff --git a/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkListener.java b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkListener.java index 68d3170..23fed98 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkListener.java +++ b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkListener.java @@ -71,9 +71,6 @@ public class DefaultBulkListener implements BulkListener { } int n = 0; for (BulkItemResponse itemResponse : response.getItems()) { - if (bulkMetric != null) { - bulkMetric.getCurrentIngest().dec(itemResponse.getIndex(), itemResponse.getType(), itemResponse.getId()); - } if (itemResponse.isFailed()) { n++; if (bulkMetric != null) { diff --git a/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkProcessor.java b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkProcessor.java index e02a316..60d517e 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkProcessor.java +++ b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkProcessor.java @@ -60,6 +60,8 @@ public class DefaultBulkProcessor implements BulkProcessor { public DefaultBulkProcessor(BulkClient bulkClient, Settings settings) { this.bulkClient = bulkClient; + this.closed = new AtomicBoolean(false); + this.enabled = new AtomicBoolean(false); int maxActionsPerRequest = settings.getAsInt(Parameters.BULK_MAX_ACTIONS_PER_REQUEST.getName(), Parameters.BULK_MAX_ACTIONS_PER_REQUEST.getInteger()); String flushIntervalStr = settings.get(Parameters.BULK_FLUSH_INTERVAL.getName(), @@ -82,17 +84,13 @@ public class DefaultBulkProcessor implements BulkProcessor { this.bulkVolume = maxVolumePerRequest.getBytes(); } this.bulkRequest = new BulkRequest(); - this.closed = new AtomicBoolean(false); - this.enabled = new AtomicBoolean(false); this.executionIdGen = new AtomicLong(); this.permits = settings.getAsInt(Parameters.BULK_PERMITS.getName(), Parameters.BULK_PERMITS.getInteger()); if (permits < 1) { throw new IllegalArgumentException("must not be less 1 permits for bulk indexing"); } this.semaphore = new ResizeableSemaphore(permits); - if (logger.isInfoEnabled()) { - logger.info("bulk processor now active"); - } + logger.info("bulk processor now active"); setEnabled(true); } @@ -121,6 +119,11 @@ public class DefaultBulkProcessor implements BulkProcessor { return bulkVolume; } + @Override + public boolean isClosed() { + return closed.get(); + } + @Override public ScheduledExecutorService getScheduler() { return bulkClient.getScheduler(); diff --git a/elx-common/src/main/java/org/xbib/elx/common/DefaultSearchMetric.java b/elx-common/src/main/java/org/xbib/elx/common/DefaultSearchMetric.java index 7611e1b..24cff43 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/DefaultSearchMetric.java +++ b/elx-common/src/main/java/org/xbib/elx/common/DefaultSearchMetric.java @@ -12,6 +12,7 @@ import org.xbib.metrics.common.CountMetric; import org.xbib.metrics.common.Meter; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; public class DefaultSearchMetric implements SearchMetric { @@ -37,8 +38,11 @@ public class DefaultSearchMetric implements SearchMetric { private Long stopped; + private final AtomicBoolean closed; + public DefaultSearchMetric(SearchClient searchClient, Settings settings) { + this.closed = new AtomicBoolean(true); totalQuery = new Meter(searchClient.getScheduler()); currentQuery = new CountMetric(); queries = new CountMetric(); @@ -55,7 +59,9 @@ public class DefaultSearchMetric implements SearchMetric { @Override public void init(Settings settings) { - start(); + if (closed.compareAndSet(true, false)) { + start(); + } } @Override @@ -117,10 +123,17 @@ public class DefaultSearchMetric implements SearchMetric { this.future.cancel(true); } + @Override + public boolean isClosed() { + return closed.get(); + } + @Override public void close() { - stop(); - totalQuery.shutdown(); + if (closed.compareAndSet(false, true)) { + stop(); + totalQuery.shutdown(); + } } private void log() { diff --git a/elx-common/src/main/java/org/xbib/elx/common/DefaultSearchResult.java b/elx-common/src/main/java/org/xbib/elx/common/DefaultSearchResult.java index c445085..7626c6e 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/DefaultSearchResult.java +++ b/elx-common/src/main/java/org/xbib/elx/common/DefaultSearchResult.java @@ -2,6 +2,7 @@ package org.xbib.elx.common; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHits; +import org.elasticsearch.search.aggregations.Aggregations; import org.xbib.elx.api.SearchDocument; import org.xbib.elx.api.SearchResult; @@ -12,12 +13,16 @@ public class DefaultSearchResult implements SearchResult { private final SearchHits searchHits; - public DefaultSearchResult(SearchHits searchHits) { + private final Aggregations aggregations; + + private final long took; + + public DefaultSearchResult(SearchHits searchHits, + Aggregations aggregations, + long took) { this.searchHits = searchHits; - } - @Override - public long getTotal() { - return searchHits.getTotalHits(); + this.aggregations = aggregations; + this.took = took; } @Override @@ -28,4 +33,19 @@ public class DefaultSearchResult implements SearchResult { } return list; } + + @Override + public Aggregations getAggregations() { + return aggregations; + } + + @Override + public long getTotal() { + return searchHits.getTotalHits(); + } + + @Override + public long getTook() { + return took; + } } diff --git a/elx-common/src/main/java/org/xbib/elx/common/GetDocument.java b/elx-common/src/main/java/org/xbib/elx/common/GetDocument.java new file mode 100644 index 0000000..4d47d6b --- /dev/null +++ b/elx-common/src/main/java/org/xbib/elx/common/GetDocument.java @@ -0,0 +1,34 @@ +package org.xbib.elx.common; + +import org.elasticsearch.action.get.GetResponse; +import org.xbib.elx.api.SearchDocument; +import java.util.Map; + +public class GetDocument implements SearchDocument { + + private final GetResponse getResponse; + + public GetDocument(GetResponse getResponse) { + this.getResponse = getResponse; + } + + @Override + public String getIndex() { + return getResponse.getIndex(); + } + + @Override + public String getId() { + return getResponse.getId(); + } + + @Override + public float getScore() { + return -1f; + } + + @Override + public Map getFields() { + return getResponse.getSourceAsMap(); + } +} diff --git a/elx-common/src/main/java/org/xbib/elx/common/MockAdminClient.java b/elx-common/src/main/java/org/xbib/elx/common/MockAdminClient.java index 66619fd..dc31d62 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/MockAdminClient.java +++ b/elx-common/src/main/java/org/xbib/elx/common/MockAdminClient.java @@ -14,7 +14,8 @@ public class MockAdminClient extends AbstractAdminClient { } @Override - public void init(Settings settings) { + public boolean init(Settings settings, String info) { + return true; } @Override diff --git a/elx-common/src/main/java/org/xbib/elx/common/MockBulkClient.java b/elx-common/src/main/java/org/xbib/elx/common/MockBulkClient.java index 0856fe5..63cd3a0 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/MockBulkClient.java +++ b/elx-common/src/main/java/org/xbib/elx/common/MockBulkClient.java @@ -18,7 +18,8 @@ public class MockBulkClient extends AbstractBulkClient { } @Override - public void init(Settings settings) { + public boolean init(Settings settings, String info) { + return true; } @Override diff --git a/elx-common/src/main/java/org/xbib/elx/common/MockSearchClient.java b/elx-common/src/main/java/org/xbib/elx/common/MockSearchClient.java index 82be6ef..c8e8cce 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/MockSearchClient.java +++ b/elx-common/src/main/java/org/xbib/elx/common/MockSearchClient.java @@ -14,7 +14,8 @@ public class MockSearchClient extends AbstractSearchClient { } @Override - public void init(Settings settings) { + public boolean init(Settings settings, String info) { + return true; } @Override diff --git a/elx-common/src/main/java/org/xbib/elx/common/MultiGetDocument.java b/elx-common/src/main/java/org/xbib/elx/common/MultiGetDocument.java new file mode 100644 index 0000000..346fabc --- /dev/null +++ b/elx-common/src/main/java/org/xbib/elx/common/MultiGetDocument.java @@ -0,0 +1,34 @@ +package org.xbib.elx.common; + +import org.elasticsearch.action.get.MultiGetItemResponse; +import org.xbib.elx.api.SearchDocument; +import java.util.Map; + +public class MultiGetDocument implements SearchDocument { + + private final MultiGetItemResponse getResponse; + + public MultiGetDocument(MultiGetItemResponse getResponse) { + this.getResponse = getResponse; + } + + @Override + public String getIndex() { + return getResponse.getResponse().getIndex(); + } + + @Override + public String getId() { + return getResponse.getResponse().getId(); + } + + @Override + public float getScore() { + return -1f; + } + + @Override + public Map getFields() { + return getResponse.getResponse().getSourceAsMap(); + } +} diff --git a/elx-common/src/test/java/org/xbib/elx/common/test/AliasTest.java b/elx-common/src/test/java/org/xbib/elx/common/test/AliasTest.java index 0aaefbf..bee677a 100644 --- a/elx-common/src/test/java/org/xbib/elx/common/test/AliasTest.java +++ b/elx-common/src/test/java/org/xbib/elx/common/test/AliasTest.java @@ -52,11 +52,9 @@ class AliasTest { new IndicesAliasesRequest.AliasActions(AliasAction.Type.ADD, indices, aliases); indicesAliasesRequest.addAliasAction(aliasAction); client.execute(IndicesAliasesAction.INSTANCE, indicesAliasesRequest).actionGet(); - // get alias GetAliasesRequest getAliasesRequest = new GetAliasesRequest(Strings.EMPTY_ARRAY); long t0 = System.nanoTime(); - GetAliasesResponse getAliasesResponse = - client.execute(GetAliasesAction.INSTANCE, getAliasesRequest).actionGet(); + GetAliasesResponse getAliasesResponse = client.execute(GetAliasesAction.INSTANCE, getAliasesRequest).actionGet(); long t1 = (System.nanoTime() - t0) / 1000000; logger.info("{} time(ms) = {}", getAliasesResponse.getAliases(), t1); assertTrue(t1 >= 0); diff --git a/elx-common/src/test/java/org/xbib/elx/common/test/SimpleTest.java b/elx-common/src/test/java/org/xbib/elx/common/test/SimpleTest.java index 757f1af..e0a709a 100644 --- a/elx-common/src/test/java/org/xbib/elx/common/test/SimpleTest.java +++ b/elx-common/src/test/java/org/xbib/elx/common/test/SimpleTest.java @@ -17,6 +17,7 @@ import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; + import static org.junit.jupiter.api.Assertions.assertEquals; @ExtendWith(TestExtension.class) diff --git a/elx-node/src/test/java/org/xbib/elx/node/test/IndexShiftTest.java b/elx-node/src/test/java/org/xbib/elx/node/test/IndexShiftTest.java index 9546d7c..01ac28d 100644 --- a/elx-node/src/test/java/org/xbib/elx/node/test/IndexShiftTest.java +++ b/elx-node/src/test/java/org/xbib/elx/node/test/IndexShiftTest.java @@ -2,8 +2,6 @@ package org.xbib.elx.node.test; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest; -import org.elasticsearch.cluster.metadata.AliasAction; import org.elasticsearch.index.query.QueryBuilders; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -82,9 +80,7 @@ class IndexShiftTest { bulkClient.waitForResponses(30L, TimeUnit.SECONDS); indexDefinition.setShift(true); indexShiftResult = adminClient.shiftIndex(indexDefinition, Arrays.asList("d", "e", "f"), - (request, index, alias) -> request.addAliasAction(new IndicesAliasesRequest.AliasActions(AliasAction.Type.ADD, - index, alias).filter(QueryBuilders.termQuery("my_key", alias))) - ); + (index, alias) -> QueryBuilders.termQuery("my_key", alias)); assertTrue(indexShiftResult.getNewAliases().contains("d")); assertTrue(indexShiftResult.getNewAliases().contains("e")); assertTrue(indexShiftResult.getNewAliases().contains("f")); diff --git a/elx-node/src/test/java/org/xbib/elx/node/test/SearchTest.java b/elx-node/src/test/java/org/xbib/elx/node/test/SearchTest.java index e4e8507..620ea2e 100644 --- a/elx-node/src/test/java/org/xbib/elx/node/test/SearchTest.java +++ b/elx-node/src/test/java/org/xbib/elx/node/test/SearchTest.java @@ -7,10 +7,10 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.query.QueryBuilders; -import org.elasticsearch.search.SearchHit; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.xbib.elx.api.IndexDefinition; +import org.xbib.elx.api.SearchDocument; import org.xbib.elx.common.ClientBuilder; import org.xbib.elx.common.DefaultIndexDefinition; import org.xbib.elx.node.NodeBulkClient; @@ -64,8 +64,7 @@ class SearchTest { .setSearchClientProvider(NodeSearchClientProvider.class) .put(helper.getClientSettings()) .build()) { - // test stream count - Stream stream = searchClient.search(qb -> qb + Stream stream = searchClient.search(qb -> qb .setIndices(indexDefinition.getFullIndexName()) .setQuery(QueryBuilders.matchAllQuery()), TimeValue.timeValueMillis(100), 579); diff --git a/elx-transport/src/main/java/org/xbib/elx/transport/TransportAdminClient.java b/elx-transport/src/main/java/org/xbib/elx/transport/TransportAdminClient.java index ff5a88b..72b7c89 100644 --- a/elx-transport/src/main/java/org/xbib/elx/transport/TransportAdminClient.java +++ b/elx-transport/src/main/java/org/xbib/elx/transport/TransportAdminClient.java @@ -3,6 +3,7 @@ package org.xbib.elx.transport; import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.common.settings.Settings; +import org.jboss.netty.util.Version; import org.xbib.elx.common.AbstractAdminClient; /** @@ -23,9 +24,12 @@ public class TransportAdminClient extends AbstractAdminClient { } @Override - public void init(Settings settings) { - super.init(settings); - helper.init((TransportClient) getClient(), settings); + public boolean init(Settings settings, String info) { + if (super.init(settings, "Netty: " + Version.ID)) { + helper.init((TransportClient) getClient(), settings); + return true; + } + return false; } @Override diff --git a/elx-transport/src/main/java/org/xbib/elx/transport/TransportBulkClient.java b/elx-transport/src/main/java/org/xbib/elx/transport/TransportBulkClient.java index f7f9166..75fa88a 100644 --- a/elx-transport/src/main/java/org/xbib/elx/transport/TransportBulkClient.java +++ b/elx-transport/src/main/java/org/xbib/elx/transport/TransportBulkClient.java @@ -23,9 +23,12 @@ public class TransportBulkClient extends AbstractBulkClient { } @Override - public void init(Settings settings) { - super.init(settings); - helper.init((TransportClient) getClient(), settings); + public boolean init(Settings settings, String info) { + if (super.init(settings, info)) { + helper.init((TransportClient) getClient(), settings); + return true; + } + return false; } @Override diff --git a/elx-transport/src/main/java/org/xbib/elx/transport/TransportSearchClient.java b/elx-transport/src/main/java/org/xbib/elx/transport/TransportSearchClient.java index eb39534..60b9d0a 100644 --- a/elx-transport/src/main/java/org/xbib/elx/transport/TransportSearchClient.java +++ b/elx-transport/src/main/java/org/xbib/elx/transport/TransportSearchClient.java @@ -23,9 +23,12 @@ public class TransportSearchClient extends AbstractSearchClient { } @Override - public void init(Settings settings) { - super.init(settings); - helper.init((TransportClient) getClient(), settings); + public boolean init(Settings settings, String info) { + if (super.init(settings, info)) { + helper.init((TransportClient) getClient(), settings); + return true; + } + return false; } @Override diff --git a/elx-transport/src/test/java/org/xbib/elx/transport/test/IndexShiftTest.java b/elx-transport/src/test/java/org/xbib/elx/transport/test/IndexShiftTest.java index 8f87880..aa938c7 100644 --- a/elx-transport/src/test/java/org/xbib/elx/transport/test/IndexShiftTest.java +++ b/elx-transport/src/test/java/org/xbib/elx/transport/test/IndexShiftTest.java @@ -2,8 +2,6 @@ package org.xbib.elx.transport.test; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest; -import org.elasticsearch.cluster.metadata.AliasAction; import org.elasticsearch.index.query.QueryBuilders; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -82,9 +80,7 @@ class IndexShiftTest { bulkClient.waitForResponses(30L, TimeUnit.SECONDS); indexDefinition.setShift(true); indexShiftResult = adminClient.shiftIndex(indexDefinition, Arrays.asList("d", "e", "f"), - (request, index, alias) -> request.addAliasAction(new IndicesAliasesRequest.AliasActions(AliasAction.Type.ADD, - index, alias).filter(QueryBuilders.termQuery("my_key", alias))) - ); + (index, alias) -> QueryBuilders.termQuery("my_key", alias)); assertTrue(indexShiftResult.getNewAliases().contains("d")); assertTrue(indexShiftResult.getNewAliases().contains("e")); assertTrue(indexShiftResult.getNewAliases().contains("f")); diff --git a/elx-transport/src/test/java/org/xbib/elx/transport/test/SearchTest.java b/elx-transport/src/test/java/org/xbib/elx/transport/test/SearchTest.java index 7d7a926..6b61368 100644 --- a/elx-transport/src/test/java/org/xbib/elx/transport/test/SearchTest.java +++ b/elx-transport/src/test/java/org/xbib/elx/transport/test/SearchTest.java @@ -4,10 +4,10 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.query.QueryBuilders; -import org.elasticsearch.search.SearchHit; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.xbib.elx.api.IndexDefinition; +import org.xbib.elx.api.SearchDocument; import org.xbib.elx.common.ClientBuilder; import org.xbib.elx.common.DefaultIndexDefinition; import org.xbib.elx.transport.TransportBulkClient; @@ -67,7 +67,7 @@ class SearchTest { .put(helper.getClientSettings()) .build()) { // test stream count - Stream stream = searchClient.search(qb -> qb + Stream stream = searchClient.search(qb -> qb .setIndices(indexDefinition.getFullIndexName()) .setQuery(QueryBuilders.matchAllQuery()), TimeValue.timeValueMillis(100), 579); diff --git a/gradle.properties b/gradle.properties index f0ad23f..2cf1ceb 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,13 +1,13 @@ group = org.xbib name = elx -version = 2.2.1.49 +version = 2.2.1.50 gradle.wrapper.version = 6.6.1 -xbib-metrics.version = 2.1.0 +xbib-metrics.version = 2.2.0 xbib-time.version = 2.1.0 xbib-guice.version = 4.4.2 -xbib-guava.version = 28.1 -xbib-netty-http.version = 4.1.63.4 +xbib-guava.version = 30.1 +xbib-netty-http.version = 4.1.65.0 elasticsearch.version = 2.2.1 jackson.version = 2.11.4 jna.version = 5.8.0